
Higher level threading in C++

Published: 4/29/2021
Author: markboer
Categories: cpp, threading

This article was originally posted on my blog

Where Python is often called "batteries included", one of C++'s core principles is "don't pay for what you don't use". This too is visible in C++'s standard template library (STL): it is deliberately kept small, and the threading functionality it offers is a good example. Python's standard library contains everything from low-level threading synchronization primitives (threads, locks) to high-level ones (tasks, thread pools, futures). C++ offers only a single higher-level one: the std::future. Unfortunately, the std::async function that returns these futures is usually implemented as little more than a wrapper around std::thread, which incurs the overhead of thread creation and destruction on every call to std::async.

This is why I generally prefer thread pools, or a producer-consumer pattern. Both patterns require a queue that contains the jobs or units of work. Worker threads continually try to read items from this queue and process them. There are libraries that offer thread-safe queues and/or thread pools, such as Poco, Qt or Boost, but it's actually fairly simple to implement your own using only the STL. So let's implement one now!

Let's get started with the basic layout. We're creating a class called SynchronisationQueue with two methods, put and get, loosely based on Python's SimpleQueue class.

#include <mutex>
#include <queue>

template<typename T>
class SynchronisationQueue {
public:
    void put(const T& val);
    T get();
private:
    std::mutex mtx_;
    std::queue<T> queue_;
};

The put implementation is quite straightforward: we protect access to the queue with a lock guard and then simply push the item onto the queue. The get is a little more cumbersome. We need to check whether the queue contains an item; if it does, we can retrieve it and continue, but if it does not, we have to wait until another item is put on the queue that can then be returned. A simple, yet naive, implementation would be the following:

void put(const T& val)
{
    std::lock_guard<std::mutex> lck(mtx_);
    queue_.push(val);
}

T get()
{
    while (true)
    {
        {
            std::lock_guard<std::mutex> lck(mtx_);
            if (!queue_.empty())
            {
                T ret_val = std::move(queue_.front());
                queue_.pop();
                return ret_val;
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

This implementation has an obvious shortcoming: sleeping for X milliseconds wastes CPU cycles. This can be minimized by increasing the timeout, but that in turn has a negative impact on the throughput of the queue.

Here the std::condition_variable comes to the rescue. This is a synchronization primitive that can be used to block threads until another thread modifies the wait condition and notifies the waiting thread. The std::condition_variable has two types of member functions, each of which comes in different varieties based on your needs:

  • wait: This function is called from the waiting thread; in the case of a job queue, this would be the worker thread waiting for a new item to process.
  • notify: This function is called from the notifying thread, the thread that changes a condition and wants to tell waiting threads to wake up.

If we have a condition variable cv, the most basic usage would look as follows:

// thread 1 waits for the condition
while (!condition)
    cv.wait(lck);

// thread 2 notifies that thread 1 can continue
condition = true;
cv.notify_one();

cv.wait() does two things: it releases the lock and then sleeps until cv.notify_one() or cv.notify_all() is called from another thread. The condition variable then unblocks and reacquires the lock [1], so you can safely continue using the protected resource.

Now you might wonder why we keep checking the condition in a while loop. When there are multiple threads involved, there is no guarantee that between thread 2 setting condition = true and thread 1 waking up, a third thread doesn't set it back to false. However, even with just the two threads, there is a chance that the condition variable wakes up without being notified. This is called a spurious wakeup, and it is allowed by the C++ standard to give implementations of std::condition_variable some flexibility, though it is not clear how many implementations actually use this freedom [2]. So you always need to check the condition after the condition variable is notified, which is easiest done in a while loop.

So to sum this up, whenever you wish to use a condition variable you need the following three parts:

  1. A std::condition_variable
  2. A std::mutex
  3. A condition protected by the mutex (2)

In the case of the SynchronisationQueue the condition we will be using is the queue being empty. While it is empty we will wait until an element has been added to the queue by another thread. This thread will need to notify the waiting thread, signaling that it can continue and retrieve an item from the queue.

It is crucial that we do not alter the size of the queue without holding the lock, as this could lead to race conditions. Apart from the obvious issues with accessing a std::queue from multiple threads, there can also be an issue with the condition variable. If a thread were allowed to insert an item into the queue without holding the lock, an item could be inserted between the worker thread checking the condition (the queue being empty) and starting to wait on the condition variable. This could result in a state where the worker thread is waiting (potentially indefinitely) while there is in fact an item in the queue it could process. This is why you must always hold the lock while altering the condition, even when the condition could be changed atomically.

In the following example we have added the condition variable empty_queue_cv_ to the private member data of the SynchronisationQueue. We can rewrite the get method to make use of the condition variable:

T get()
{
    std::unique_lock<std::mutex> lck(mtx_);
    while (queue_.empty())
        empty_queue_cv_.wait(lck);
    T ret_val = std::move(queue_.front());
    queue_.pop();
    return ret_val;
}

The final step is to add a notification to the put method to awaken a waiting thread. Since put only adds a single item to the queue, a notify_one() will suffice. If we were to add a method that inserts multiple elements, like std::vector's range insert, we could use notify_all() to wake up all waiting threads.

void put(const T& val)
{
    std::lock_guard<std::mutex> lck(mtx_);
    queue_.push(val);
    empty_queue_cv_.notify_one();
}

The full SynchronisationQueue implementation can be found here. I have included several utility functions, such as get variants with and without a timeout.

As we've seen, using these basic building blocks it becomes possible to implement the higher-level synchronization primitives that other languages have built in, such as thread pools, semaphores, barriers and notifications. If you are looking for a challenge, you could implement your own thread pool using the building blocks discussed here; there are also several implementations available on GitHub you could have a look at.

Happy coding!


  1. This is also the reason we need to upgrade our lock_guard to a unique_lock; they are similar, but the latter allows you to release and reacquire the underlying mutex.

  2. See also: Spurious wakeup. The spurious wakeup is also the reason a condition variable always needs to be used together with a condition, even when this condition is just a boolean. 
