The Wait Conditions example shows how to use QWaitCondition and QMutex to control access to a circular buffer shared by a producer thread and a consumer thread.
The producer writes data to the buffer until it reaches the end of the buffer, at which point it restarts from the beginning, overwriting existing data. The consumer thread reads the data as it is produced and writes it to the screen.
Wait conditions make it possible to have a higher level of concurrency than what is possible with mutexes alone. If accesses to the buffer were simply guarded by a QMutex, the consumer thread couldn't access the buffer at the same time as the producer thread. Yet, there is no harm in having both threads working on different parts of the buffer at the same time.
The example comprises two classes: Producer and Consumer. Both inherit from QThread. The circular buffer used for communicating between these two classes and the synchronization tools that protect it are global variables.
An alternative to using QWaitCondition and QMutex to solve the producer-consumer problem is to use QSemaphore. This is what the Semaphores example does.
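For comparison, the semaphore-based approach can be sketched roughly as follows. This is not the code of the Semaphores example: it uses standard C++ threads rather than QThread, and the Semaphore class is a hypothetical, hand-rolled stand-in for QSemaphore. The names freeBytes, usedBytes and runWithSemaphores are likewise assumptions made for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical minimal counting semaphore standing in for QSemaphore.
class Semaphore {
public:
    explicit Semaphore(int initial) : count_(initial) {}

    // Blocks until at least one resource is available, then takes it.
    void acquire() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

    // Returns one resource and wakes a waiting thread, if any.
    void release() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++count_;
        }
        cv_.notify_one();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    int count_;
};

// Transfers dataSize bytes through a circular buffer of bufferSize bytes
// and returns them in the order the consumer read them.
std::string runWithSemaphores(int dataSize, int bufferSize) {
    std::vector<char> buffer(bufferSize);
    Semaphore freeBytes(bufferSize);  // slots the producer may still fill
    Semaphore usedBytes(0);           // slots the consumer may read
    std::string out;

    std::thread producer([&] {
        for (int i = 0; i < dataSize; ++i) {
            freeBytes.acquire();
            buffer[i % bufferSize] = "ACGT"[i % 4];  // deterministic test pattern
            usedBytes.release();
        }
    });
    std::thread consumer([&] {
        for (int i = 0; i < dataSize; ++i) {
            usedBytes.acquire();
            out.push_back(buffer[i % bufferSize]);
            freeBytes.release();
        }
    });

    producer.join();
    consumer.join();
    return out;
}
```

Here the two semaphores play the role that numUsedBytes and the two wait conditions play in this example: one counts free slots, the other counts filled slots.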
Let's start by reviewing the circular buffer and the associated synchronization tools:
// The total number of bytes that will be produced and consumed
static const int DataSize = 100000;

// The size of the buffer that is used to deliver data from the producer to the consumer
static const int BufferSize = 1700;

// The buffer that is used to deliver data from the producer to the consumer
static char buffer[BufferSize];

// The wait condition that represents the state of a not-empty buffer
static QWaitCondition bufferNotEmpty;

// The wait condition that represents the state of a not-full buffer
static QWaitCondition bufferNotFull;

// The mutex that synchronizes the usage of the wait conditions
static QMutex mutex;

// The number of bytes that are used inside the shared buffer
static int numUsedBytes = 0;
DataSize is the amount of data that the producer will generate. To keep the example as simple as possible, we make it a constant. BufferSize is the size of the circular buffer. It is less than DataSize, meaning that at some point the producer will reach the end of the buffer and restart from the beginning.
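The wrap-around follows from indexing the buffer with the byte number modulo BufferSize. The small sketch below works through the arithmetic for the constants above; the helper names slotFor, fullPasses and bytesInLastPass are hypothetical and do not appear in the example.

```cpp
#include <cassert>

static const int DataSize = 100000;
static const int BufferSize = 1700;

// Hypothetical helper: the buffer slot used for the i-th byte produced.
int slotFor(int i) { return i % BufferSize; }

// Hypothetical helper: how many times the producer fills the whole buffer.
int fullPasses() { return DataSize / BufferSize; }

// Hypothetical helper: bytes written in the final, partial pass.
int bytesInLastPass() { return DataSize % BufferSize; }
```

With these values, byte 1699 lands in the last slot, byte 1700 wraps to slot 0, and the producer makes 58 full passes over the buffer followed by a partial pass of 1400 bytes.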
To synchronize the producer and the consumer, we need two wait conditions and one mutex. The bufferNotEmpty condition is signalled when the producer has generated some data, telling the consumer that it can start reading it. The bufferNotFull condition is signalled when the consumer has read some data, telling the producer that it can generate more. The numUsedBytes variable is the number of bytes in the buffer that contain data.
Together, the wait conditions, the mutex, and the numUsedBytes counter ensure that the producer is never more than BufferSize bytes ahead of the consumer, and that the consumer never reads data that the producer hasn't generated yet.
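The same invariant can be tried outside Qt. The following is a minimal sketch, not the example's code: it assumes std::mutex and std::condition_variable as stand-ins for QMutex and QWaitCondition, uses a smaller DataSize and BufferSize to keep runs short, and writes a deterministic pattern instead of random bytes so the result can be checked. The sketch namespace and the produce, consume and runOnce functions are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>

namespace sketch {

const int DataSize = 10000;  // assumption: smaller than the example's 100000
const int BufferSize = 64;   // assumption: smaller than the example's 1700

char buffer[BufferSize];
std::mutex mutex;
std::condition_variable bufferNotEmpty;
std::condition_variable bufferNotFull;
int numUsedBytes = 0;  // guarded by mutex

// Writes DataSize bytes, never running more than BufferSize bytes ahead.
void produce() {
    for (int i = 0; i < DataSize; ++i) {
        {
            std::unique_lock<std::mutex> lock(mutex);
            // Sleep until the consumer has freed at least one slot.
            bufferNotFull.wait(lock, [] { return numUsedBytes < BufferSize; });
        }
        buffer[i % BufferSize] = "ACGT"[i % 4];  // deterministic pattern
        {
            std::lock_guard<std::mutex> lock(mutex);
            ++numUsedBytes;
        }
        bufferNotEmpty.notify_all();
    }
}

// Reads DataSize bytes and returns them in the order they were read.
std::string consume() {
    std::string out;
    out.reserve(DataSize);
    for (int i = 0; i < DataSize; ++i) {
        {
            std::unique_lock<std::mutex> lock(mutex);
            // Sleep until the producer has written at least one byte.
            bufferNotEmpty.wait(lock, [] { return numUsedBytes > 0; });
        }
        out.push_back(buffer[i % BufferSize]);
        {
            std::lock_guard<std::mutex> lock(mutex);
            --numUsedBytes;
        }
        bufferNotFull.notify_all();
    }
    return out;
}

// Runs one producer and one consumer to completion.
std::string runOnce() {
    numUsedBytes = 0;
    std::string result;
    std::thread consumer([&result] { result = consume(); });
    std::thread producer(produce);
    producer.join();
    consumer.join();
    return result;
}

}  // namespace sketch
```

If the invariant holds, runOnce() returns exactly the pattern the producer wrote, in order, even though the buffer is far smaller than the amount of data transferred.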
Let's review the code for the Producer class:
/**
 * The Producer fills the shared buffer with data in a separate worker thread.
 */
class Producer : public QThread
{
public:
    void run()
    {
        // This code is executed in the worker thread

        // Initialize the random number generator
        qsrand(QTime(0, 0, 0).secsTo(QTime::currentTime()));

        for (int i = 0; i < DataSize; ++i) {
            mutex.lock();
            // Check whether we have written so many bytes that the shared buffer is completely filled.
            if (numUsedBytes == BufferSize) {
                /**
                 * In this case wait on the 'bufferNotFull' condition.
                 * This will block us until the Consumer calls wakeAll() on this wait condition.
                 */
                bufferNotFull.wait(&mutex);
            }
            mutex.unlock();

            // Write one byte to the shared buffer ...
            buffer[i % BufferSize] = "ACGT"[(int) qrand() % 4];

            mutex.lock();
            // ... increment the number of used bytes ...
            ++numUsedBytes;

            // ... and wake up the Consumer if it has been blocked.
            bufferNotEmpty.wakeAll();
            mutex.unlock();
        }
    }
};
The producer generates DataSize bytes of data. Before it writes a byte to the circular buffer, it must first check whether the buffer is full (i.e., numUsedBytes equals BufferSize). If the buffer is full, the thread waits on the bufferNotFull condition.
At the end, the producer increments numUsedBytes and signals that the condition bufferNotEmpty is true, since numUsedBytes is necessarily greater than 0.
We guard all accesses to the numUsedBytes variable with a mutex. In addition, the QWaitCondition::wait() function accepts a mutex as its argument. This mutex is unlocked before the thread is put to sleep and locked when the thread wakes up. Furthermore, the transition from the locked state to the wait state is atomic, to prevent race conditions from occurring.
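The unlock-while-sleeping behaviour can be observed with a small experiment. The sketch below assumes std::condition_variable as a stand-in for QWaitCondition (its wait() also releases the mutex while the thread sleeps and reacquires it before returning); the function waitReleasesMutex and its local variables are hypothetical names. The waiter re-checks its condition in a loop, which is needed with std::condition_variable because it may wake spuriously.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Returns true once the waiter has observed the flag set by the other thread.
// The other thread can only set the flag by locking the same mutex while the
// waiter is blocked inside wait(), which shows that wait() released it.
bool waitReleasesMutex() {
    std::mutex m;
    std::condition_variable cv;
    bool waiterStarted = false;  // guarded by m
    bool ready = false;          // guarded by m
    bool sawReady = false;       // written by the waiter, read after join()

    std::thread waiter([&] {
        std::unique_lock<std::mutex> lock(m);
        waiterStarted = true;
        cv.notify_all();
        // wait() unlocks m while sleeping and relocks it before returning.
        while (!ready) {
            cv.wait(lock);
        }
        sawReady = ready;
    });

    {
        std::unique_lock<std::mutex> lock(m);
        while (!waiterStarted) {
            cv.wait(lock);
        }
        // We hold m, the waiter has started, and ready is still false:
        // the waiter must therefore be sleeping inside wait().
        ready = true;
    }
    cv.notify_all();

    waiter.join();
    return sawReady;
}
```

If wait() kept the mutex locked while sleeping, the second thread could never set ready and the program would deadlock instead of returning.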
Let's turn to the Consumer class:
/**
 * The Consumer reads the data from the shared buffer in a separate worker thread and
 * publishes the read data to the UI.
 */
class Consumer : public QThread
{
    Q_OBJECT

public:
    void run()
    {
        for (int i = 0; i < DataSize; ++i) {
            mutex.lock();
            // Check whether we have read so many bytes that the shared buffer is empty.
            if (numUsedBytes == 0) {
                /**
                 * In this case wait on the 'bufferNotEmpty' condition.
                 * This will block us until the Producer calls wakeAll() on this wait condition.
                 */
                bufferNotEmpty.wait(&mutex);
            }
            mutex.unlock();

            // Read one byte from the shared buffer ...
            const QString text(buffer[i % BufferSize]);

            mutex.lock();
            // ... decrement the number of used bytes ...
            --numUsedBytes;

            // ... and wake up the Producer if it has been blocked.
            bufferNotFull.wakeAll();
            mutex.unlock();

            // Publish the result to the UI
            emit stringConsumed(text);
        }

        // Tell the UI that we have transferred all data
        emit stringConsumed("\n\nFinished");
    }

Q_SIGNALS:
    void stringConsumed(const QString &text);
};
The code is very similar to the producer. Before we read the byte, we check whether the buffer is empty (numUsedBytes is 0) instead of whether it's full and wait on the bufferNotEmpty condition if it's empty. After we've read the byte, we decrement numUsedBytes (instead of incrementing it), and we signal the bufferNotFull condition (instead of the bufferNotEmpty condition).
/**
 * This sample application shows how to solve the producer-consumer problem (http://en.wikipedia.org/wiki/Producer-consumer_problem)
 * with wait conditions under Qt.
 */
int main(int argc, char **argv)
{
    Application app(argc, argv);

    // Create the producer and consumer objects
    Producer producer;
    Consumer consumer;

    // Create the TextBuffer object
    TextBuffer textBuffer;

    // Append the new data to the TextBuffer whenever the Consumer has consumed some data
    QObject::connect(&consumer, SIGNAL(stringConsumed(const QString&)),
                     &textBuffer, SLOT(appendText(const QString&)),
                     Qt::BlockingQueuedConnection);

    // Load the UI description from main.qml
    QmlDocument *qml = QmlDocument::create("asset:///main.qml");

    // Make the TextBuffer object available to the UI as context property
    qml->setContextProperty("_textBuffer", &textBuffer);

    // Create the application scene
    AbstractPane *appPage = qml->createRootObject<AbstractPane>();
    Application::instance()->setScene(appPage);

    // Start the producer and consumer thread
    producer.start();
    consumer.start();

    return Application::exec();
}
So what happens when we run the program? Initially, the producer thread is the only one that can do anything; the consumer is blocked waiting for the bufferNotEmpty condition to be signalled (numUsedBytes is 0). Once the producer has put one byte in the buffer, numUsedBytes is 1 and the bufferNotEmpty condition is signalled. At that point, two things can happen: either the consumer thread takes over and reads that byte, or the producer gets to produce a second byte.
The producer-consumer model presented in this example makes it possible to write highly concurrent multithreaded applications. On a multiprocessor machine, the program is potentially up to twice as fast as the equivalent mutex-based program, since the two threads can be active at the same time on different parts of the buffer.
Be aware though that these benefits aren't always realized. Locking and unlocking a QMutex has a cost. In practice, it would probably be worthwhile to divide the buffer into chunks and to operate on chunks instead of individual bytes. The buffer size is also a parameter that must be selected carefully, based on experimentation.
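One way to picture the chunked variant is the sketch below. It is an illustration under stated assumptions, not part of the example: it uses std::mutex and std::condition_variable in place of QMutex and QWaitCondition, requires that the chunk size divide both the buffer size and the data size, and counts only the explicit lock acquisitions in the code (not the relocking done inside wait()). The names ChunkedResult and runChunked are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

struct ChunkedResult {
    std::string data;  // bytes in the order the consumer read them
    long lockCount;    // explicit mutex acquisitions by both threads
};

// Assumption: chunkSize divides both bufferSize and dataSize.
ChunkedResult runChunked(int dataSize, int bufferSize, int chunkSize) {
    assert(chunkSize > 0 && bufferSize % chunkSize == 0 && dataSize % chunkSize == 0);

    std::vector<char> buffer(bufferSize);
    std::mutex mutex;
    std::condition_variable bufferNotEmpty, bufferNotFull;
    int numUsedBytes = 0;  // guarded by mutex
    long producerLocks = 0, consumerLocks = 0;
    std::string out;

    std::thread producer([&] {
        for (int i = 0; i < dataSize; i += chunkSize) {
            {
                std::unique_lock<std::mutex> lock(mutex);
                ++producerLocks;
                // Wait until a whole chunk of free space is available.
                bufferNotFull.wait(lock, [&] { return numUsedBytes + chunkSize <= bufferSize; });
            }
            for (int j = i; j < i + chunkSize; ++j)
                buffer[j % bufferSize] = "ACGT"[j % 4];  // deterministic pattern
            {
                std::lock_guard<std::mutex> lock(mutex);
                ++producerLocks;
                numUsedBytes += chunkSize;
            }
            bufferNotEmpty.notify_all();
        }
    });

    std::thread consumer([&] {
        for (int i = 0; i < dataSize; i += chunkSize) {
            {
                std::unique_lock<std::mutex> lock(mutex);
                ++consumerLocks;
                // Wait until a whole chunk of data is available.
                bufferNotEmpty.wait(lock, [&] { return numUsedBytes >= chunkSize; });
            }
            for (int j = i; j < i + chunkSize; ++j)
                out.push_back(buffer[j % bufferSize]);
            {
                std::lock_guard<std::mutex> lock(mutex);
                ++consumerLocks;
                numUsedBytes -= chunkSize;
            }
            bufferNotFull.notify_all();
        }
    });

    producer.join();
    consumer.join();
    return ChunkedResult{out, producerLocks + consumerLocks};
}
```

With a chunk size of 1 this behaves like the byte-at-a-time version; with a chunk size of 16 the same data is transferred with one sixteenth of the explicit lock acquisitions. Whether that translates into a measurable speed-up depends on the platform and should be checked by experiment.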
To visualize the result, the data the Consumer processes is also appended to a TextBuffer, which is exported to QML and displayed there.