Use a Condition Variable in the Multiprocessing Pool - Super Fast Python

You can share a multiprocessing.Condition in child worker processes in the multiprocessing pool by using a multiprocessing.Manager.

In this tutorial you will discover how to use a condition variable in the process pool in Python.

Let’s get started.

Need a Condition Variable in the Process Pool

The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.

A process pool can be configured when it is created, which will prepare the child workers.

A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.

multiprocessing — Process-based parallelism

We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().

Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().

When using the process pool, we may need to use a condition variable shared among all worker processes.

How can we use a condition variable in the process pool?

What is a Condition Variable?

In concurrency, a condition (also called a monitor) allows multiple processes (or threads) to be notified about some result.

It combines both a mutual exclusion lock (mutex) and a condition variable.

A mutex can be used to protect a critical section, but it cannot be used to alert other processes that a condition has changed or been met.

A condition can be acquired by a process (like a mutex) after which it can wait to be notified by another process that something has changed. While waiting, the process is blocked and releases the lock for other processes to acquire.

Another process can then acquire the condition, make a change, and notify one, all, or a subset of processes waiting on the condition that something has changed. A waiting process can then wake up (be scheduled by the operating system), re-acquire the condition (mutex), check any changed state, and perform the required actions.

This highlights that a condition makes use of a mutex internally (to acquire/release the condition), but it also offers additional features such as allowing processes to wait on the condition and to allow processes to notify other processes waiting on the condition.
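This acquire/wait/notify handshake can be sketched with a minimal example. The sketch below uses threads for brevity (the Condition API is the same for processes); the function and variable names are illustrative, not from any example later in this tutorial.

```python
import threading
import time

# consumer: waits on the condition until the producer publishes a value
def consumer(condition, shared, results):
    with condition:
        # re-check shared state in a loop to guard against spurious wakeups
        while 'value' not in shared:
            condition.wait()
        results.append(shared['value'])

def produce_and_consume():
    condition = threading.Condition()
    shared = {}
    results = []
    worker = threading.Thread(target=consumer, args=(condition, shared, results))
    worker.start()
    time.sleep(0.1)  # give the consumer time to start waiting
    # producer: acquire the condition, change the state, notify the waiter
    with condition:
        shared['value'] = 42
        condition.notify()
    worker.join()
    return results[0]

if __name__ == '__main__':
    print(produce_and_consume())  # prints 42
```

Note that the consumer re-checks the shared state before waiting, so the handshake is safe even if the producer notifies first.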

Now that we know what a condition is, let’s look at how we might use it in Python.

How Can We Use a Condition Variable

Python provides a condition variable via the multiprocessing.Condition class.

...

# create a new condition variable

condition = multiprocessing.Condition()

In order for a process to make use of the condition, it must acquire it and release it, like a mutex lock.

This can be achieved manually with the acquire() and release() functions.

For example, we can acquire the condition and then wait on the condition to be notified and finally release the condition as follows:

...

# acquire the condition

condition.acquire()

# wait to be notified

condition.wait()

# release the condition

condition.release()

An alternative to calling the acquire() and release() functions directly is to use the context manager, which will perform the acquire/release automatically for us, for example:

...

# acquire the condition

with condition:

# wait to be notified

condition.wait()

We can notify a single waiting process via the notify() function.

For example:

...

# acquire the condition

with condition:

# notify a waiting process

condition.notify()

Finally, we can notify all processes waiting on the condition via the notify_all() function.

...

# acquire the condition

with condition:

# notify all processes waiting on the condition

condition.notify_all()
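One hazard with a bare wait() is that a notification sent before a process begins waiting is lost. The wait_for() method avoids this by re-checking a predicate over shared state. A minimal sketch, using threads for brevity (multiprocessing.Condition offers the same wait_for() method; the names here are illustrative):

```python
import threading

# waiter: blocks until the predicate over shared state holds
def waiter(condition, state, results):
    with condition:
        # wait_for checks the predicate immediately and after each wakeup,
        # so a notification sent before wait() is never lost
        condition.wait_for(lambda: state['ready'])
        results.append('done')

def demo_wait_for():
    condition = threading.Condition()
    state = {'ready': False}
    results = []
    # signal BEFORE the waiter even starts waiting
    with condition:
        state['ready'] = True
        condition.notify_all()
    t = threading.Thread(target=waiter, args=(condition, state, results))
    t.start()
    t.join()
    return results

if __name__ == '__main__':
    print(demo_wait_for())  # prints ['done']
```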

You can learn more about how to use the process-based condition variable in the tutorial:

Next, let’s look at how we might use the condition variable in the process pool.



How to Use a Condition Variable in the Process Pool

We can create a multiprocessing.Condition instance and share it with the child worker processes in the process pool.

There are perhaps three ways we can share a multiprocessing.Condition instance with worker processes:

  1. By passing it as an argument when initializing the worker processes.
  2. By passing it as an argument to tasks executed by the pool.
  3. By using the ‘fork‘ start method, storing it in a global variable, then having child processes inherit the variable.

The third method, using the ‘fork‘ start method, will work and provides an easy way to share a condition variable with child worker processes.

You can learn more about inheriting global variables by child processes in the tutorial:

The problem is, the ‘fork‘ start method is not available on all platforms, e.g. it cannot be used on Windows.

Alternatively, if we naively pass a multiprocessing.Condition as an argument when initializing the process pool or to a task executed by the process pool, it will fail with an error such as:

Condition objects should only be shared between processes through inheritance

Instead, we must use a multiprocessing.Manager.

A multiprocessing.Manager creates a server process that is responsible for managing a centralized version of an object. It then provides proxy objects that can be used in other processes and that stay up to date with the single centralized object.

Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.

multiprocessing — Process-based parallelism

As such, using a multiprocessing.Manager is a useful way to centralize a synchronization primitive like a multiprocessing.Condition shared among multiple worker processes.

We can first create a multiprocessing.Manager using the context manager interface.

For example:

...

# create the manager

with Manager() as manager:

    # ...

We can then create a shared multiprocessing.Condition object using the manager.

This will return a proxy object for the multiprocessing.Condition object in the manager process that we can share among child worker processes directly or indirectly.

For example:

...

# create a shared object via the manager

condition = manager.Condition()

The proxy for the multiprocessing.Condition can then be passed to a child worker initialization function or to a task function executed by worker processes.
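For example, a minimal sketch of the initialization-function approach, where each worker stores the condition proxy in a module-level global at startup. The names init_worker and run_pool are illustrative, not part of the multiprocessing API:

```python
from multiprocessing import Manager
from multiprocessing.pool import Pool

# executed once in each new worker process: store the proxy globally
def init_worker(shared_condition):
    global condition
    condition = shared_condition

# tasks then use the inherited global rather than taking an argument
def task(identifier):
    with condition:
        condition.notify_all()
    return identifier

def run_pool():
    # create the manager and the shared condition proxy
    with Manager() as manager:
        cond = manager.Condition()
        # pass the proxy to every worker via the initializer
        with Pool(2, initializer=init_worker, initargs=(cond,)) as pool:
            return pool.map(task, range(4))

if __name__ == '__main__':
    print(run_pool())  # prints [0, 1, 2, 3]
```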

Now that we know how to share a multiprocessing.Condition with child worker processes in the process pool, let’s look at some worked examples.

Errors Sharing a Condition Variable With Child Workers

Before we look at an example of how to successfully share a multiprocessing.Condition with child worker processes, let’s look at some common failure cases.

Three common errors when attempting to share a multiprocessing.Condition with worker processes are:

  1. Asynchronous tasks fail silently.
  2. Asynchronous tasks fail by error callback.
  3. Synchronous tasks fail with an error.

Let’s take a closer look at each failure case in turn.

Using a Condition in the Process Pool Fails Silently

The first common failure case involves issuing tasks asynchronously to the process pool and having them fail silently.

In this example, we will define a task that must be notified before it can be started. It allows us to issue multiple concurrent tasks into the process pool all at once and have them wait until we are ready, then have all issued tasks start at the same time.

In this example, we define a task function that takes an integer identifier and a condition variable as an argument. It acquires the condition and waits to be notified before starting. Once started, it generates a random number, blocks for a moment then reports the generated number. A process pool is created and many tasks are issued asynchronously. The tasks start, but wait to be notified before continuing. The main process notifies all tasks, then waits for them to complete.

The asynchronous tasks issued to the process pool fail silently.

They fail because a multiprocessing.Condition is passed to the task, which raises an error. The error is silent because the example does not explicitly get the result from the tasks, as there is no result to get.

Firstly, we can define the target task function executed in the process pool.

The function takes an integer identifier as an argument as well as a shared condition variable. The task acquires the condition using the context manager interface, then reports a message while waiting to be notified. Once notified, it generates a random number between 0 and 1, blocks for a fraction of a second to simulate computational effort, then reports both its identifier and generated value.

The task() function below implements this.

# task executed in a worker process

def task(identifier, condition):

    # acquire the condition

    with condition:

        # report waiting message

        print(f'Task {identifier} waiting...', flush=True)

        # wait to be notified

        condition.wait()

    # generate a value

    value = random()

    # block for a moment

    sleep(value)

    # report a message

    print(f'Task {identifier} completed with {value}', flush=True)

Next, in the main process, we can define a condition variable.

...

# create the shared condition

condition = Condition()

We then define a process pool with 4 worker processes. In this case we use the context manager interface to ensure the process pool closes automatically once we are finished with it.

...

# create and configure the process pool

with Pool(4) as pool:

    # ...

You can learn more about the context manager interface in the tutorial:

We then define a list of task arguments, each item in the list representing a tuple of arguments for one call to the task() function, passing an integer and the shared condition variable.

...

# prepare task arguments

items = [(i, condition) for i in range(4)]

The items are then issued with calls to the task() function asynchronously via the starmap_async() function, which returns an AsyncResult object immediately.

You can learn more about issuing asynchronous tasks to the process pool with starmap_async() in the tutorial:

...

# issue tasks into the process pool

result = pool.starmap_async(task, items)

The main process then blocks for a moment to allow all issued tasks to start in the process pool and wait to be notified.

...

# wait a moment

sleep(1)

The main process then acquires the condition via the context manager interface, then reports a message and notifies all waiting tasks, allowing them to start.

...

# start all tasks at the same time

with condition:

    # report start message

    print(f'Tasks begin.', flush=True)

    # notify the tasks

    condition.notify_all()

Finally, the main process blocks until all tasks are completed.

...

# wait for all tasks to finish

result.wait()

# report done message

print(f'Tasks done.', flush=True)

Tying this together, the complete example is listed below.


# SuperFastPython.com

# example of using a condition in the process pool that fails silently

from random import random

from time import sleep

from multiprocessing import Condition

from multiprocessing.pool import Pool

# task executed in a worker process

def task(identifier, condition):

    # acquire the condition

    with condition:

        # report waiting message

        print(f'Task {identifier} waiting...', flush=True)

        # wait to be notified

        condition.wait()

    # generate a value

    value = random()

    # block for a moment

    sleep(value)

    # report a message

    print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point

if __name__ == '__main__':

    # create the shared condition

    condition = Condition()

    # create and configure the process pool

    with Pool(4) as pool:

        # prepare task arguments

        items = [(i, condition) for i in range(4)]

        # issue tasks into the process pool

        result = pool.starmap_async(task, items)

        # wait a moment

        sleep(1)

        # start all tasks at the same time

        with condition:

            # report start message

            print(f'Tasks begin.', flush=True)

            # notify the tasks

            condition.notify_all()

        # wait for all tasks to finish

        result.wait()

        # report done message

        print(f'Tasks done.', flush=True)

Running the example first creates the shared condition variable.

The process pool is then created with 4 worker processes.

The task arguments are prepared and 4 tasks are issued to the process pool. The main process then sleeps for a moment.

The tasks fail to execute and no messages are reported.

No error messages are reported and the tasks fail silently.

The main process continues on. It acquires the condition variable, notifies any waiting processes (of which there are none at this point), then waits for the issued tasks to complete. The tasks have already failed, therefore the main process finishes almost immediately.

The tasks failed because a multiprocessing.Condition was shared directly with the child worker processes. An error was raised by each call to the task() function. The error was stored in the returned AsyncResult object, but was not re-raised or checked for, making it appear that the tasks failed silently.
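One way to surface a stored failure without retrieving results is to check the AsyncResult after waiting, for example via its successful() method. A minimal sketch, assuming a deliberately failing task (the names here are illustrative):

```python
from multiprocessing.pool import Pool

# a task that always fails, standing in for the condition-passing error
def failing_task(identifier):
    raise RuntimeError(f'task {identifier} failed')

def check_result():
    with Pool(2) as pool:
        result = pool.map_async(failing_task, range(2))
        # wait() returns silently even though every task raised
        result.wait()
        # successful() reveals whether the tasks completed without error
        return result.successful()

if __name__ == '__main__':
    print(check_result())  # prints False
```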

Next, let’s look at how we might expose the error that is raised by issuing asynchronous tasks.

RuntimeError in Error Callback Using Condition in the Process Pool

Sharing a multiprocessing.Condition with tasks in the process pool via an argument to the task function will fail with a RuntimeError.

We can explore this error when issuing asynchronous tasks to the pool by using an error callback.

In this example we will update the previous example that fails silently to report the error caused by sharing a condition variable with child processes in the process pool.

This can be achieved by defining a function that takes an error argument and reports the error message directly.

# error callback function

def custom_error_callback(error):

    print(error, flush=True)

Next, we can configure the starmap_async() function to use the error callback function when an exception is raised executing a task in the process pool.

...

# issue tasks into the process pool

result = pool.starmap_async(task, items, error_callback=custom_error_callback)

And that’s it.

Tying this together, the complete example is listed below.


# SuperFastPython.com

# example of using a condition in the process pool that fails with error callback

from random import random

from time import sleep

from multiprocessing import Condition

from multiprocessing.pool import Pool

# error callback function

def custom_error_callback(error):

    print(error, flush=True)

# task executed in a worker process

def task(identifier, condition):

    # acquire the condition

    with condition:

        # report waiting message

        print(f'Task {identifier} waiting...', flush=True)

        # wait to be notified

        condition.wait()

    # generate a value

    value = random()

    # block for a moment

    sleep(value)

    # report a message

    print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point

if __name__ == '__main__':

    # create the shared condition

    condition = Condition()

    # create and configure the process pool

    with Pool(4) as pool:

        # prepare task arguments

        items = [(i, condition) for i in range(4)]

        # issue tasks into the process pool

        result = pool.starmap_async(task, items, error_callback=custom_error_callback)

        # wait a moment

        sleep(1)

        # start all tasks at the same time

        with condition:

            # report start message

            print(f'Tasks begin.', flush=True)

            # notify the tasks

            condition.notify_all()

        # wait for all tasks to finish

        result.wait()

        # report done message

        print(f'Tasks done.', flush=True)

Running the example first creates the shared condition variable.

The process pool is then created with 4 worker processes.

The task arguments are prepared and 4 tasks are issued to the process pool. The main process then blocks for a moment.

Each issued task fails with a RuntimeError. The first task to raise the error calls the error callback, which reports the error directly.

The main process unblocks, notifies the condition, waits for the tasks to finish and terminates the program.

This highlights that the process pool does not allow condition variables to be passed directly as task arguments.

Condition objects should only be shared between processes through inheritance

Tasks begin.

Tasks done.

Next, let’s look at how we might expose the error that is raised by issuing asynchronous tasks, then get the results of the issued tasks directly.

RuntimeError Using Condition in the Process Pool

We can update the above example to get the results of the issued tasks.

Our task() function does not have a return value, therefore there are no results to get. Nevertheless, we can call the get() function on the returned AsyncResult object. This allows any failure that occurred while running a task, such as an error or exception, to be re-raised.

Recall that the get() function on the AsyncResult will block until the task function returns.

For example:

...

# get each result

for item in result.get():

    pass

Tying this together, the complete example is listed below.


# SuperFastPython.com

# example of using a condition in the process pool that fails with an exception

from random import random

from time import sleep

from multiprocessing import Condition

from multiprocessing.pool import Pool

# task executed in a worker process

def task(identifier, condition):

    # acquire the condition

    with condition:

        # report waiting message

        print(f'Task {identifier} waiting...', flush=True)

        # wait to be notified

        condition.wait()

    # generate a value

    value = random()

    # block for a moment

    sleep(value)

    # report a message

    print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point

if __name__ == '__main__':

    # create the shared condition

    condition = Condition()

    # create and configure the process pool

    with Pool(4) as pool:

        # prepare task arguments

        items = [(i, condition) for i in range(4)]

        # issue tasks into the process pool

        result = pool.starmap_async(task, items)

        # wait a moment

        sleep(1)

        # start all tasks at the same time

        with condition:

            # report start message

            print(f'Tasks begin.', flush=True)

            # notify the tasks

            condition.notify_all()

        # get each result

        for item in result.get():

            pass

        # report done message

        print(f'Tasks done.', flush=True)

Running the example first creates the shared condition variable.

The process pool is then created with 4 worker processes.

The task arguments are prepared and 4 tasks are issued to the process pool. The main process then sleeps for a moment.

The tasks fail to execute and no messages are reported.

The main process unblocks, notifies the condition, then begins iterating through the return values for the issued tasks.

The first call to get() re-raises the RuntimeError raised in the task() function, terminating the program.

Tasks begin.

Traceback (most recent call last):

  ...

RuntimeError: Condition objects should only be shared between processes through inheritance

Now that we have confirmed that we cannot pass a normal multiprocessing.Condition to a task executed in the process pool, let’s look at how we might use a multiprocessing.Manager to fix the problem.

Manager to Use a Condition Variable in the Process Pool



We can explore how to use a multiprocessing.Manager to share a multiprocessing.Condition among child worker processes in the process pool.

This can be achieved by updating the first asynchronous example of issuing tasks that failed silently to create the multiprocessing.Condition using a multiprocessing.Manager.

First, a manager can be created using the context manager interface. This ensures that the manager is closed automatically once we are finished with it.

...

# create the manager

with Manager() as manager:

    # ...

Next, a multiprocessing.Condition can be created using the multiprocessing.Manager instance.

This will create and host a condition variable in a new server process and return a proxy object that can be shared among child worker processes and used to interface with the centralized condition variable instance.

...

# create the shared condition

condition = manager.Condition()

And that’s it.

The complete example is listed below.


# SuperFastPython.com

# example of using a condition in the process pool that uses a manager

from random import random

from time import sleep

from multiprocessing import Manager

from multiprocessing.pool import Pool

# task executed in a worker process

def task(identifier, condition):

    # acquire the condition

    with condition:

        # report waiting message

        print(f'Task {identifier} waiting...', flush=True)

        # wait to be notified

        condition.wait()

    # generate a value

    value = random()

    # block for a moment

    sleep(value)

    # report a message

    print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point

if __name__ == '__main__':

    # create the manager

    with Manager() as manager:

        # create the shared condition

        condition = manager.Condition()

        # create and configure the process pool

        with Pool(4) as pool:

            # prepare task arguments

            items = [(i, condition) for i in range(4)]

            # issue tasks into the process pool

            result = pool.starmap_async(task, items)

            # wait a moment

            sleep(1)

            # start all tasks at the same time

            with condition:

                # report start message

                print(f'Tasks begin.', flush=True)

                # notify the tasks

                condition.notify_all()

            # wait for all tasks to finish

            result.wait()

            # report done message

            print(f'Tasks done.', flush=True)

Running the example first creates the manager, then the shared condition variable.

The process pool is then created with 4 worker processes.

The task arguments are prepared and 4 tasks are issued to the process pool. The main process then sleeps for a moment.

Each task begins executing, acquiring the condition variable, reporting a message and waiting to be notified.

The main process continues on. It acquires the condition variable then notifies all processes waiting on the condition variable. It then waits for all issued tasks to complete.

The waiting tasks are notified and continue on. Each task generates a random value between 0 and 1, then blocks for a fraction of a second, then reports a message.

All tasks complete and the main process continues on, reports a final message and closes the application.

This highlights how we can share and use a condition variable among child worker processes in the process pool.

Task 0 waiting...

Task 2 waiting...

Task 1 waiting...

Task 3 waiting...

Tasks begin.

Task 0 completed with 0.032243443984690034

Task 3 completed with 0.15954361126406702

Task 1 completed with 0.7495177294818461

Task 2 completed with 0.7653446732794642

Tasks done.

Global Variable to Use a Condition in the Process Pool

An alternate approach to sharing a condition variable with workers in the process pool is to share the condition as a global variable.

This requires the use of the ‘fork‘ start method for creating new processes in Python.

A limitation of this approach is that the ‘fork‘ start method is not supported on all platforms. For example, ‘fork‘ is not supported on Windows.

In this example, we will update the previous example to use the ‘fork‘ start method and to share the multiprocessing.Condition with all workers in the process pool via a global variable.

Firstly, we update the task() function so that it does not take the condition variable as an argument, and instead assumes it is available via an inherited global variable.

The updated version of the task() function with this change is listed below.

# task executed in a worker process

def task(identifier):

    # acquire the condition via the inherited global variable

    with condition:

        # report waiting message

        print(f'Task {identifier} waiting...', flush=True)

        # wait to be notified

        condition.wait()

    # generate a value

    value = random()

    # block for a moment

    sleep(value)

    # report a message

    print(f'Task {identifier} completed with {value}', flush=True)

Next, in the main process, we can configure the program to use the ‘fork‘ start method when creating new child processes, such as those in the process pool.

...

# set the fork start method

set_start_method('fork')

You can learn more about setting the process start method in the tutorial:

Next, we can create the shared multiprocessing.Condition instance as per normal.

...

# create the shared condition

condition = Condition()

This will implicitly be a global variable.

A more explicit approach would be to declare a “condition” as a global variable, then assign it a new multiprocessing.Condition instance.

For example:

...

# declare the global variable

global condition

# assign the global variable

condition = multiprocessing.Condition()

This might be easier to read for newer Python programmers.

We can then create the process pool as per normal, then issue 4 tasks to the process pool asynchronously via the map_async() function.

...

# create and configure the process pool

with Pool(4) as pool:

    # issue tasks into the process pool

    result = pool.map_async(task, range(4))

We use the map_async() function instead of the starmap_async() function because our task() function only has one argument.
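The difference in how the two functions pass arguments can be sketched as follows (the helper functions are illustrative):

```python
from multiprocessing.pool import Pool

# takes a single argument, suitable for map_async()
def one_arg(x):
    return x * 10

# takes multiple arguments, suitable for starmap_async()
def two_args(x, y):
    return x + y

def demo():
    with Pool(2) as pool:
        # map_async passes each item as the single argument
        a = pool.map_async(one_arg, [1, 2, 3]).get()
        # starmap_async unpacks each tuple into multiple arguments
        b = pool.starmap_async(two_args, [(1, 2), (3, 4)]).get()
    return a, b

if __name__ == '__main__':
    print(demo())  # prints ([10, 20, 30], [3, 7])
```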

You can learn more about the map_async() function in the tutorial:

Finally, as before, the main process will block for a moment to allow all tasks to start and wait to be notified, then acquire the condition, notify all tasks, then wait for the tasks to complete.

...

# wait a moment

sleep(1)

# start all tasks at the same time

with condition:

    # report start message

    print(f'Tasks begin.', flush=True)

    # notify the tasks

    condition.notify_all()

# wait for all tasks to finish

result.wait()

# report done message

print(f'Tasks done.', flush=True)

Tying this together, the complete example is listed below.


# SuperFastPython.com

# example of using a condition in the process pool as a global variable

from random import random

from time import sleep

from multiprocessing import set_start_method

from multiprocessing import Condition

from multiprocessing.pool import Pool

# task executed in a worker process

def task(identifier):

    # acquire the condition via the inherited global variable

    with condition:

        # report waiting message

        print(f'Task {identifier} waiting...', flush=True)

        # wait to be notified

        condition.wait()

    # generate a value

    value = random()

    # block for a moment

    sleep(value)

    # report a message

    print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point

if __name__ == '__main__':

    # set the fork start method

    set_start_method('fork')

    # create the shared condition

    condition = Condition()

    # create and configure the process pool

    with Pool(4) as pool:

        # issue tasks into the process pool

        result = pool.map_async(task, range(4))

        # wait a moment

        sleep(1)

        # start all tasks at the same time

        with condition:

            # report start message

            print(f'Tasks begin.', flush=True)

            # notify the tasks

            condition.notify_all()

        # wait for all tasks to finish

        result.wait()

        # report done message

        print(f'Tasks done.', flush=True)

Running the example first creates the shared condition variable.

The process pool is then created with 4 worker processes.

The task arguments are prepared and 4 tasks are issued to the process pool. The main process then sleeps for a moment.

Each task begins executing, acquiring the condition variable, reporting a message and waiting to be notified.

The main process continues on. It acquires the condition variable then notifies all processes waiting on the condition variable. It then waits for all issued tasks to complete.

The waiting tasks are notified and continue on. Each task generates a random value between 0 and 1, then blocks for a fraction of a second, then reports a message.

All tasks complete and the main process continues on, reports a final message and closes the application.

This highlights an alternate way that we can share and use a condition variable among child worker processes in the process pool.

Task 0 waiting...

Task 1 waiting...

Task 2 waiting...

Task 3 waiting...

Tasks begin.

Task 0 completed with 0.1719506072263567

Task 2 completed with 0.3426125123900914

Task 3 completed with 0.7347756344488263

Task 1 completed with 0.9601410175104224

Tasks done.


Takeaways

You now know how to use a condition variable in the process pool in Python.

Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.

Photo by Tiaan van Zyl on Unsplash