Threading Condition Variable in Python - Super Fast Python

You can use a thread condition object in Python via the threading.Condition class.

In this tutorial you will discover how to use a thread condition object in Python.

Let’s get started.

Need for A Condition Variable

A thread is a thread of execution in a computer program.

Every Python program has at least one thread of execution called the main thread. Both processes and threads are created and managed by the underlying operating system.

Sometimes we may need to create additional threads in our program in order to execute code concurrently.

Python provides the ability to create and manage new threads via the threading module and the threading.Thread class.

You can learn more about Python threads in the guude:

When writing concurrent programs we may need threads to wait for some condition within a critical section before continuing.

This could be achieved using a mutual exclusion lock to protect the critical section, but the threads waiting for the condition would have to spin (execute in a loop) repeatedly releasing/re-acquiring the mutex lock until the condition was met.

An alternative is to use a condition (also called a monitor) that builds upon a mutex and allows threads to wait and be notified.

What is a condition in concurrent programming and how can we use it in Python?

What is a Condition Variable

In concurrency, a condition (also called a monitor) allows multiple threads to be notified about some result.

It combines both a mutual exclusion lock (mutex) and a conditional variable.

A mutex can be used to protect a critical section, but it cannot be used to alert other threads that a condition has changed or been met.

A condition can be acquired by a thread (like a mutex) after which it can wait to be notified by another thread that something has changed. While waiting, the thread is blocked and releases the lock for other threads to acquire.

Another thread can then acquire the condition, make a change, and notify one, all, or a subset of threads waiting on the condition that something has changed. The waiting thread can then wake-up (be scheduled by the operating system), re-acquire the condition (mutex), perform checks on any changed state and perform required actions.

This highlights that a condition makes use of a mutex internally (to acquire/release the condition), but it also offers additional features such as allowing threads to wait on the condition and to allow threads to notify other threads waiting on the condition.

Now that we know what a condition is, let’s look at how we might use it in Python.

How to Use a Condition Variable

Python provides a condition via the threading.Condition class.

We can create a condition object and by default it will create a new reentrant mutex lock (threading.RLock class) by default which will be used internally.

...

# create a new condition

condition = threading.Condition()

We may have a reentrant mutex or a non-reentrant mutex that we wish to reuse in the condition for some good reason, in which case we can provide it to the constructor.

I don’t recommend this unless you know your use case has this requirement. The chance of getting into trouble is high.

...

# create a new condition with a custom lock

condition = threading.Condition(lock=my_lock)

In order for a thread to make use of the condition, it must acquire it and release it, like a mutex lock.

This can be achieved manually with the acquire() and release() functions.

For example, we can acquire the condition and then wait on the condition to be notified and finally release the condition as follows:

...

# acquire the condition

condition.acquire()

# wait to be notified

condition.wait()

# release the condition

condition.release()

An alternative to calling the acquire() and release() functions directly is to use the context manager, which will perform the acquire/release automatically for us, for example:

...

# acquire the condition

with condition:

# wait to be notified

condition.wait()

The wait() function will wait forever until notified by default. We can also pass a “timeout” argument which will allow the thread to stop blocking after a time limit in seconds.

For example:

...

# acquire the condition

with condition:

# wait to be notified

condition.wait(timeout=10)

The threading.Condition class also provides a wait_for() function that can be used to only unlock the waiting thread if a condition is met, such as calling a function that returns a boolean value.

The name of the function that returns a boolean value can be provided to the wait_for() function directly, and the function also takes a “timeout” argument in seconds.

...

# acquire the condition

with condition:

# wait to be notified and a function to return true

condition.wait_for(all_data_collected

We also must acquire the condition in a thread if we wish to notify waiting threads. This too can be achieved directly with the acquire/release function calls or via the context manager.

We can notify a single waiting thread via the notify() function.

For example:

...

# acquire the condition

with condition:

# notify a waiting thread

condition.notify()

The notified thread will stop-blocking as soon as it can re-acquire the mutex within the condition. This will be attempted automatically as part of it’s call to wait() or wait_for(), you do not need to do anything extra.

If there are more than one thread waiting on the condition, we will not know which thread will be notified.

We can also notify a subset of waiting threads by setting the “n” argument to an integer number of threads to notify, for example:

...

# acquire the condition

with condition:

# notify 3 waiting threads

condition.notify(n=3)

Finally, we can notify all threads waiting on the condition via the notify_all() function.

...

# acquire the condition

with condition:

# notify all threads waiting on the condition

condition.notify_all()

A final reminder, a thread must acquire the condition before waiting on it or notifying waiting threads. A failure to acquire the condition (the lock within the condition) before performing these actions will result in a RuntimeError.

Now that we know how to use the threading.Condition class, let’s look at some worked examples.


Free Python Threading Course

Download your FREE threading PDF cheat sheet and get BONUS access to my free 7-day crash course on the threading API.

Discover how to use the Python threading module including how to create and start new threads and how to use a mutex locks and semaphores

Learn more
 


Example of Wait and Notify With a Condition

In this section we will explore using a threading.Condition to notify a waiting thread that something has happened.

We will use a new threading.Thread instance to prepare some data and notify a waiting thread, and in the main thread we will kick-off the new thread and use the condition to wait for the work to be completed.

First, we will define a target task function to execute in a new thread.

The function will take the condition object and a list in which it can deposit data. The function will block for a moment, add data to the list, then notify the waiting thread.

The complete target task function is listed below.

# target function to prepare some work

def task(condition, work_list):

    # block for a moment

    sleep(1)

    # add data to the work list

    work_list.append(33)

    # notify a waiting thread that the work is done

    print('Thread sending notification...')

    with condition:

        condition.notify()

In the main thread, first we can create the condition and a list in which we can place data.

...

# create a condition

condition = Condition()

# prepare the work list

work_list = list()

Next, we can start a new thread calling our target task function and wait to be notified of the result

...

# wait to be notified that the data is ready

print('Main thread waiting for data...')

with condition:

    # start a new thread to perform some work

    worker = Thread(target=task, args=(condition, work_list))

    worker.start()

    # wait to be notified

    condition.wait()

Note, we must start the new thread after we have acquired the mutex lock in the condition in this example.

If we did not acquire the lock first, it is possible that there would be a race condition. Specifically, if we started the new thread before acquiring the condition and waiting in the main thread, then it is possible for the new thread to execute and notify before the main thread has had a chance to start waiting. In which case the main thread would wait forever to be notified.

Finally, we can report the data once it is available.

...

# we know the data is ready

print(f'Got data: {work_list}')

Tying this together, the complete example is listed below.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

# SuperFastPython.com

# example of wait/notify with a condition

from time import sleep

from threading import Thread

from threading import Condition

# target function to prepare some work

def task(condition, work_list):

    # block for a moment

    sleep(1)

    # add data to the work list

    work_list.append(33)

    # notify a waiting thread that the work is done

    print('Thread sending notification...')

    with condition:

        condition.notify()

# create a condition

condition = Condition()

# prepare the work list

work_list = list()

# wait to be notified that the data is ready

print('Main thread waiting for data...')

with condition:

    # start a new thread to perform some work

    worker = Thread(target=task, args=(condition, work_list))

    worker.start()

    # wait to be notified

    condition.wait()

# we know the data is ready

print(f'Got data: {work_list}')

Running the example first creates the condition and the work list.

The new thread is defined and started. The thread blocks for a moment, adds data to the list then notifies the waiting thread.

Meanwhile the main thread waits to be notified by the new threads, then once notified it knows the data is ready and reports the results.

Main thread waiting for data...

Thread sending notification...

Got data: [33]

Next, let’s look at how we might notify all waiting threads.

Example of wait() and notify_all() With a Condition

We can explore how to notify all threads waiting on a condition.

In this example we will start a suite of threads that will wait on the condition to be notified before performing their processing and reporting a result. The main thread will block for a moment then notify all waiting threads that they can begin processing.

First, we can define a target task function that takes the condition as an argument along with a unique integer for identification.

The task function will acquire the condition and wait to be notified. Once notified it will generate a random value between 0 and 1, block for that fraction of a second then report the value.

The task function is listed below.

# target function

def task(condition, number):

    # wait to be notified

    print(f'Thread {number} waiting...')

    with condition:

        condition.wait()

    # block for a moment

    value = random()

    sleep(value)

    # report a result

    print(f'Thread {number} got {value}')

The main thread will first create the condition object instance.

...

# create a condition

condition = Condition()

Next, we can create five threads that will execute the task() function, passing in the condition instance and a unique integer from 0 to 4 for each thread instance.

...

# start a bunch of threads that will wait to be notified

for i in range(5):

    worker = Thread(target=task, args=(condition, i))

    worker.start()

Finally, the main thread will block for a second, then will notify all waiting threads and waits for the threads to complete.

...

# block for a moment

sleep(1)

# notify all waiting threads that they can run

with condition:

    # wait to be notified

    condition.notify_all()

# block until all non-daemon threads finish...

Tying this together, the complete example is listed below.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

# SuperFastPython.com

# example of wait/notify all with a condition

from time import sleep

from random import random

from threading import Thread

from threading import Condition

# target function

def task(condition, number):

    # wait to be notified

    print(f'Thread {number} waiting...')

    with condition:

        condition.wait()

    # block for a moment

    value = random()

    sleep(value)

    # report a result

    print(f'Thread {number} got {value}')

# create a condition

condition = Condition()

# start a bunch of threads that will wait to be notified

for i in range(5):

    worker = Thread(target=task, args=(condition, i))

    worker.start()

# block for a moment

sleep(1)

# notify all waiting threads that they can run

with condition:

    # wait to be notified

    condition.notify_all()

# block until all non-daemon threads finish...

Running the example first creates five threads that start running immediately and all acquire the condition and block waiting to be notified.

The main thread blocks for a moment then notifies all five waiting threads. The waiting threads wake-up, acquire the lock in the condition one at a time, perform their processing and report their result.

The program exits once all threads finish their processing.

Note, your specific results will differ given the use of random numbers in the example.

Thread 0 waiting...

Thread 1 waiting...

Thread 2 waiting...

Thread 3 waiting...

Thread 4 waiting...

Thread 4 got 0.19179135601921027

Thread 2 got 0.2889060889073257

Thread 1 got 0.7461055320153259

Thread 3 got 0.7859499073325357

Thread 0 got 0.9911966665209208

Next, let’s look at how we might wait for a specific result on the condition.


Python Threading Jump-Start

Loving The Tutorials?

Why not take the next step? Get the book.

Learn more
 


Example of wait_for() With a Condition

We can explore how to use the wait_for() function on the condition.

This function takes a callable, such as a function with no arguments or a lambda expression. The thread calling the wait_for() function will block until notified and the callable passed in as an argument returns a True value.

This might mean that the thread is notified many times by different threads, but will only unblock and continue execution once the condition in the callable is met.

In this example, will create a suite of worker threads, each of which will calculate a value and add it to a shared list and notify the waiting thread. The main thread will wait on the condition and will use a lambda expression in the wait_for() function to not continue on until a work list populated by the worker threads is fully populated.

First, we must define a target task function.

The function will compute a random value, block for a moment to simulate processing, add the value to the shared work list and notify the waiting thread.

A list will be shared between all work threads and therefore must be protected by a mutex lock to avoid a race condition in adding values to the list. This can be achieved with the mutex within the condition, meaning we must acquire the condition in order to use the list, making the list thread-safe.

The target task function is below.

# target function

def task(condition, work_list):

    # acquire the condition

    with condition:

        # block for a moment

        value = random()

        sleep(value)

        # add work to the list

        work_list.append(value)

        print(f'Thread added {value}')

        # notify the waiting thread

        condition.notify()

In the main thread, we will first create the condition and the shared list used to collect the work from each thread.

...

# create a condition

condition = Condition()

# define work list

work_list = list()

Next, we can start five threads and pass the condition object instance and work list as arguments to each thread.

...

# start a bunch of threads that will add work to the list

for i in range(5):

    worker = Thread(target=task, args=(condition, work_list))

    worker.start()

Finally, we will wait to be notified on the condition and use a lambda expression to ensure that we do not continue on until the work list contains five values, matching the number of worker threads.

Again, access to the list will occur only while we hold the lock for the condition, avoiding any possible race condition with the list itself.

...

# wait for all threads to add their work to the list

with condition:

    # wait to be notified

    condition.wait_for(lambda : len(work_list)==5)

    print(f'Done, got: {work_list}')

Tying this together, the complete example of using the condition wait_for() function is listed below.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

# SuperFastPython.com

# example of wait for with a condition

from time import sleep

from random import random

from threading import Thread

from threading import Condition

# target function

def task(condition, work_list):

    # acquire the condition

    with condition:

        # block for a moment

        value = random()

        sleep(value)

        # add work to the list

        work_list.append(value)

        print(f'Thread added {value}')

        # notify the waiting thread

        condition.notify()

# create a condition

condition = Condition()

# define work list

work_list = list()

# start a bunch of threads that will add work to the list

for i in range(5):

    worker = Thread(target=task, args=(condition, work_list))

    worker.start()

# wait for all threads to add their work to the list

with condition:

    # wait to be notified

    condition.wait_for(lambda : len(work_list)==5)

    print(f'Done, got: {work_list}')

Running the example first starts five threads, each of which will acquire the condition, generate a random value, add it to the shared work list and notify the main thread.

The main thread waits on the condition and is notified each time one of the new threads finishes, but does not actually continue on and print a message until the lambda callable returns True, that is once the number of values in the list matches the number of threads.

Note, your specific results will differ given the use of random numbers.

Thread added 0.25571585179107537

Thread added 0.8208840033855079

Thread added 0.967902983880266

Thread added 0.22873248054525108

Thread added 0.8632623296500798

Done, got: [0.25571585179107537, 0.8208840033855079, 0.967902983880266, 0.22873248054525108, 0.8632623296500798]

Further Reading

This section provides additional resources that you may find helpful.

Python Threading Books

I also recommend specific chapters in the following books:

  • Python Cookbook, David Beazley and Brian Jones, 2013.
    • See: Chapter 12: Concurrency
  • Effective Python, Brett Slatkin, 2019.
    • See: Chapter 7: Concurrency and Parallelism
  • Python in a Nutshell, Alex Martelli, et al., 2017.
    • See: Chapter: 14: Threads and Processes

Guides

APIs

References

Takeaways

You now know how to use a thread condition object in Python

Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.

Photo by Sugden Guy sugden on Unsplash