Concurrent Queue

Concurrent queue is very useful in concurrent programming. For example, task queue can be considered as a special kind of concurrent queue. CLUE implements a concurrent queue class, in header file <clue/concurrent_queue.hpp>.

template<T>
class concurrent_queue

Concurrent queue class. T is the element type.

This class has a default constructor, but it is not copyable or movable. The class provides the following member functions:

size_t size() const

Get the number of elements in the queue (at the point this method is being called).

bool empty() const

Get whether the queue is empty (contains no elements).

void synchronize()

Block until all updating (e.g. push or pop) are done.

void clear()

Clear the queue (pop all remaining elements).

void push(const T &x)

Push an element x to the back of the queue.

void push(T &&x)

Push an element x (by moving) to the back of the queue.

void emplace(Args&&... args)

Construct an element using the given arguments and push it to the back of the queue.

bool try_pop(T &dst)

If the queue is not empty, pop the element at the front, store it to dst, and return true. Otherwise, return false immediately.

T wait_pop()

Wait until the queue is non-empty, and pop the element at the front and return it.

If the queue is already non-empty, it pops the front element and returns it immediately.

Note

All updating methods, including push, emplace, try_pop, and wait_pop, are thread-safe. It is safe to call these methods in concurrent threads.

Example: The following example shows how to use concurrent_queue to implement a task queue. In this example, multiple concurrent producers generate items to be processed, and a consumer fetches them from a queue and process.

#include <clue/concurrent_queue.hpp>
#include <vector>
#include <thread>
#include <cstdio>

inline void process_item(double v) {
    std::printf("process item %g\n", v);
}

int main() {
    const size_t M = 2;  // # producers
    const size_t k = 10;  // # items per producer
    size_t remain_nitems = M * k;

    clue::concurrent_queue<double> Q;
    std::vector<std::thread> producers;

    // producers: generate items to be processed
    for (size_t t = 0; t < M; ++t) {
        producers.emplace_back([&Q,t,k](){
            for (size_t i = 0; i < k; ++i) {
                double v = i + 1;
                Q.push(v);
            }
        });
    }

    // consumer: process the items
    std::thread consumer([&](){
        while (remain_nitems > 0) {
            process_item(Q.wait_pop());
            -- remain_nitems;
        }
    });

    // wait for all threads to complete
    for (auto& th: producers) th.join();
    consumer.join();
}

Note

To emulate a typical task queue, one may also push functions as elements, and let the consumer invokes each function that it acquires from the queue.