Use a Semaphore in the Multiprocessing Pool - Super Fast Python

You can share a multiprocessing.Semaphore in child worker processes in the multiprocessing pool by using a multiprocessing.Manager.

In this tutorial you will discover how to use a semaphore in the process pool in Python.

Let’s get started.

Need To Use a Semaphore With the Process Pool

The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.

A process pool can be configured when it is created, which will prepare the child workers.

A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.

multiprocessing — Process-based parallelism

We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().

Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().

When using the process pool, we may need to use a semaphore shared among all worker processes.

How can we use a semaphore in the process pool?

What is a Semaphore

A semaphore is a concurrency primitive that allows a limit on the number of processes (or threads) that can acquire a lock protecting a critical section.

It is an extension of a mutual exclusion (mutex) lock that adds a count for the number of processes that can acquire the lock before additional processes will block. Once the limit is reached, new processes can only acquire a position on the semaphore once an existing holder releases one.

Internally, the semaphore maintains a counter protected by a mutex lock that is decremented each time the semaphore is acquired and incremented each time it is released.

Semaphore: A synchronization object that has an associated nonnegative count. Two operations that are defined on a semaphore are known as wait (if the count is nonzero, decrement count; otherwise, block until count is positive) and post (increment count).

— PAGE 272, THE ART OF CONCURRENCY, 2009.

When a semaphore is created, the upper limit on the counter is set. If it is set to 1, then the semaphore will operate like a mutex lock.
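The counting behavior can be observed directly with non-blocking acquire attempts. Below is a minimal sketch; the limit of 2 is an arbitrary choice for illustration:

```python
from multiprocessing import Semaphore

# create a semaphore with two positions
semaphore = Semaphore(2)
# non-blocking acquires succeed while positions remain
print(semaphore.acquire(block=False))  # True: one position taken
print(semaphore.acquire(block=False))  # True: both positions taken
print(semaphore.acquire(block=False))  # False: no positions left
# releasing frees a position for the next acquire
semaphore.release()
print(semaphore.acquire(block=False))  # True: a position was freed
```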

A semaphore provides a useful concurrency tool for limiting the number of processes that can access a resource concurrently. Some examples include:

  • Limiting concurrent socket connections to a server.
  • Limiting concurrent file operations on a hard drive.
  • Limiting concurrent calculations.

Now that we know what a semaphore is, let’s look at how we might use it in Python.

We can create a semaphore via a multiprocessing.Semaphore instance and specify the number of child processes that can acquire the semaphore at a time.

For example:

...
# create a semaphore with a limit of 100
semaphore = multiprocessing.Semaphore(100)

The semaphore can be acquired by calling the acquire() function, for example:

...
# acquire the semaphore
semaphore.acquire()

By default, it is a blocking call, which means that the calling process will block until access becomes available on the semaphore.
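If blocking is not desired, acquire() also accepts block and timeout arguments, so a caller can give up if the semaphore does not become available in time. For example:

```python
from multiprocessing import Semaphore

# create a semaphore with a single position
semaphore = Semaphore(1)
# take the only position
semaphore.acquire()
# wait up to 0.1 seconds for another position, then give up
acquired = semaphore.acquire(timeout=0.1)
print(acquired)  # False: the only position is still held
# free the position again
semaphore.release()
```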

Once acquired, the semaphore can be released again by calling the release() function.

...
# release the semaphore
semaphore.release()

Finally, the multiprocessing.Semaphore class supports usage via the context manager interface, which will automatically acquire and release the semaphore for you. As such, it is the preferred usage where appropriate for your program.

For example:

...
# acquire the semaphore
with semaphore:
    # ...


How to Use a Semaphore With the Process Pool

We can create a multiprocessing.Semaphore instance and share it with the child worker processes in the process pool.

There are perhaps three ways we can share a multiprocessing.Semaphore instance with worker processes:

  • By passing it as an argument when initializing the worker processes.
  • By passing it as an argument to tasks executed by the pool.
  • By using the ‘fork‘ start method, storing it in a global variable, then having child processes inherit the variable.

The third method, using the ‘fork‘ start method, will work and provides an easy way to share a semaphore with child worker processes.

The problem is that the ‘fork‘ start method is not available on all platforms; for example, it cannot be used on Windows.

Alternatively, if we naively pass a multiprocessing.Semaphore as an argument when initializing the process pool or to a task executed by the process pool, it will fail with an error such as:

Semaphore objects should only be shared between processes through inheritance
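The underlying cause is that arguments to pool tasks must be pickled in order to be sent to the worker processes, and a multiprocessing.Semaphore cannot be pickled this way. We can see the same error by attempting to pickle a semaphore directly; a minimal sketch:

```python
import pickle
from multiprocessing import Semaphore

# create a semaphore
semaphore = Semaphore(2)
try:
    # pool task arguments are pickled to be sent to workers
    pickle.dumps(semaphore)
except RuntimeError as e:
    # report the failure
    print(e)
```

Running this is expected to report the same message about sharing semaphores only through inheritance.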

Instead, we must use a multiprocessing.Manager.

A multiprocessing.Manager creates a server process that is responsible for managing a centralized version of an object. It then provides proxy objects that can be used in other processes and that are kept up to date with the single centralized object.

Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.

MULTIPROCESSING — PROCESS-BASED PARALLELISM

As such, using a multiprocessing.Manager is a useful way to centralize a synchronization primitive like a multiprocessing.Semaphore shared among multiple worker processes.

We can first create a multiprocessing.Manager using the context manager interface.

For example:

...
# create the manager
with Manager() as manager:
    # ...

We can then create a shared multiprocessing.Semaphore object using the manager.

This will return a proxy object for the multiprocessing.Semaphore object in the manager process that we can share among child worker processes directly or indirectly.

For example:

...
# create a shared object via the manager
semaphore = manager.Semaphore(100)

The proxy for the multiprocessing.Semaphore can then be passed to a child worker initialization function or to a task function executed by worker processes.
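For example, the proxy might be passed to a worker initialization function that stores it in a global variable for tasks to use later. The sketch below assumes this structure; the init_worker() helper and the doubling task are illustrative choices, not part of the multiprocessing API:

```python
from multiprocessing import Manager
from multiprocessing.pool import Pool

# hypothetical initializer: store the semaphore proxy in a global
def init_worker(shared_semaphore):
    global semaphore
    semaphore = shared_semaphore

# task that uses the proxy stored by the initializer
def task(identifier):
    with semaphore:
        return identifier * 2

def run():
    # create the manager and a proxy for the centralized semaphore
    with Manager() as manager:
        sem = manager.Semaphore(2)
        # pass the proxy to each worker via the initializer
        with Pool(2, initializer=init_worker, initargs=(sem,)) as pool:
            return pool.map(task, range(5))

# protect the entry point
if __name__ == '__main__':
    print(run())  # [0, 2, 4, 6, 8]
```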

Now that we know how to share a multiprocessing.Semaphore with child worker processes in the process pool, let’s look at some worked examples.

Errors When Sharing a Semaphore With Child Worker Processes

Before we look at an example of how to successfully share a multiprocessing.Semaphore with child worker processes, let’s look at some common failure cases.

Three common errors when attempting to share a multiprocessing.Semaphore with worker processes are:

  1. Asynchronous tasks fail silently.
  2. Asynchronous tasks fail via error callback.
  3. Synchronous tasks fail with an error.

Let’s take a closer look at each failure case in turn.

Using a Semaphore in the Process Pool Fails Silently

The first common failure case involves issuing tasks asynchronously to the process pool and having them fail silently.

In this example, we define a task function that takes an integer identifier and a semaphore as an argument. It acquires the semaphore, generates a random number, blocks for a moment then reports the generated number. A semaphore is created that limits the number of tasks that can run concurrently. A process pool is created and many tasks are issued asynchronously.

The asynchronous tasks issued to the process pool fail silently.

They fail because a multiprocessing.Semaphore is passed to the task, which raises an error. The error is silent because the example does not explicitly get the result from the tasks, as there is no result to get.

Firstly, we can define the target task function executed in the process pool.

The function takes an integer identifier as an argument as well as a shared semaphore. The task acquires the semaphore using the context manager interface. It generates a random number between 0 and 1, blocks for a fraction of a second to simulate computational effort, then reports both its identifier and generated value.

The task() function below implements this.

# task executed in a worker process
def task(identifier, semaphore):
    # acquire the semaphore
    with semaphore:
        # generate a value
        value = random()
        # block for a moment
        sleep(value)
        # report a message
        print(f'Task {identifier} completed with {value}', flush=True)

Next, in the main process, we can define a semaphore with two positions.

...
# create the shared semaphore
semaphore = Semaphore(2)

We then define a process pool with 4 worker processes. In this case we use the context manager interface to ensure the process pool closes automatically once we are finished with it.

...
# create and configure the process pool
with Pool(4) as pool:
    # ...

We then define a list of task arguments, each item in the list representing a tuple of arguments for one call to the task() function, passing an integer and the shared semaphore.

...
# prepare task arguments
items = [(i, semaphore) for i in range(10)]

The items are then issued with calls to the task() function asynchronously via the starmap_async() function, which returns an AsyncResult object immediately.

...
# issue tasks into the process pool
result = pool.starmap_async(task, items)

Finally, the main process blocks until all tasks are completed.

...
# wait for all tasks to finish
result.wait()

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of using a semaphore in the process pool that fails silently
from random import random
from time import sleep
from multiprocessing import Semaphore
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, semaphore):
    # acquire the semaphore
    with semaphore:
        # generate a value
        value = random()
        # block for a moment
        sleep(value)
        # report a message
        print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the shared semaphore
    semaphore = Semaphore(2)
    # create and configure the process pool
    with Pool(4) as pool:
        # prepare task arguments
        items = [(i, semaphore) for i in range(10)]
        # issue tasks into the process pool
        result = pool.starmap_async(task, items)
        # wait for all tasks to finish
        result.wait()

Running the example first creates the shared semaphore with 2 positions.

The process pool is then created with 4 worker processes.

The task arguments are prepared and 10 tasks are issued to the process pool. The main process then blocks until the tasks are complete.

The tasks fail to execute and no messages are reported.

No error messages are reported and the tasks fail silently.

The main process continues on almost immediately.

The tasks failed because a multiprocessing.Semaphore was shared directly with the child worker processes. An error was raised by each call to the task() function. The error was stored in the returned AsyncResult object, but was not re-raised or checked for, making it appear that the tasks failed silently.
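One way to surface the hidden error is to call get() on the AsyncResult, which re-raises any stored exception in the caller. Below is a minimal sketch with a simplified task; the 30-second timeout is an arbitrary safety margin:

```python
from multiprocessing import Semaphore
from multiprocessing.pool import Pool

# simplified task that takes a semaphore as an argument
def task(identifier, semaphore):
    with semaphore:
        return identifier

def run():
    # create a semaphore and naively pass it to a pool task
    semaphore = Semaphore(2)
    with Pool(2) as pool:
        result = pool.starmap_async(task, [(0, semaphore)])
        try:
            # get() re-raises the exception stored in the AsyncResult
            result.get(timeout=30)
            return None
        except RuntimeError as e:
            return str(e)

# protect the entry point
if __name__ == '__main__':
    print(run())
```

Running this is expected to print the inheritance error message, rather than failing silently.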

Next, let’s look at how we might expose the error that is raised by issuing asynchronous tasks.

RuntimeError in Error Callback Using Semaphore in the Process Pool

Sharing a multiprocessing.Semaphore with tasks in the process pool via an argument to the task function will fail with a RuntimeError.

We can explore this error when issuing asynchronous tasks to the pool by using an error callback.

In this example we will update the previous example that fails silently to report the error caused by sharing a semaphore with child processes in the process pool.

This can be achieved by defining a function that takes an error argument and reports the error message directly.

# error callback function
def custom_error_callback(error):
    print(error, flush=True)

Next, we can configure the starmap_async() function to use the error callback function when an exception is raised executing a task in the process pool.

...
# issue tasks into the process pool
result = pool.starmap_async(task, items, error_callback=custom_error_callback)

And that’s it.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of using a semaphore in the process pool that fails with an error
from random import random
from time import sleep
from multiprocessing import Semaphore
from multiprocessing.pool import Pool

# error callback function
def custom_error_callback(error):
    print(error, flush=True)

# task executed in a worker process
def task(identifier, semaphore):
    # acquire the semaphore
    with semaphore:
        # generate a value
        value = random()
        # block for a moment
        sleep(value)
        # report a message
        print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the shared semaphore
    semaphore = Semaphore()
    # create and configure the process pool
    with Pool(4) as pool:
        # prepare task arguments
        items = [(i, semaphore) for i in range(10)]
        # issue tasks into the process pool
        result = pool.starmap_async(task, items, error_callback=custom_error_callback)
        # wait for all tasks to finish
        result.wait()

Running the example first creates the shared semaphore.

The process pool is then created with 4 worker processes.

The task arguments are prepared and 10 tasks are issued to the process pool. The main process blocks until the tasks are complete.

Each issued task fails with a RuntimeError. The first task to raise the error calls the error callback, which reports the error directly.

This highlights how the process pool is configured to not allow semaphores to be passed directly as arguments.

Semaphore objects should only be shared between processes through inheritance

Next, let’s look at how we might expose the error that is raised by issuing synchronous tasks instead of asynchronous tasks.

RuntimeError Using Semaphore in the Process Pool

We can update the previous example so that tasks are issued synchronously instead of asynchronously.

Issuing tasks synchronously means that the main process will block until all tasks are complete, and the results of the tasks will be returned directly. This is important because if at least one task raises an error while executing, it will be automatically re-raised in the calling main process.

This can be achieved by changing the call to starmap_async() which is asynchronous to starmap() which is blocking and synchronous.

...
# issue tasks into the process pool
pool.starmap(task, items)

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of using a semaphore in the process pool that fails with an exception
from random import random
from time import sleep
from multiprocessing import Semaphore
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, semaphore):
    # acquire the semaphore
    with semaphore:
        # generate a value
        value = random()
        # block for a moment
        sleep(value)
        # report a message
        print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the shared semaphore
    semaphore = Semaphore(2)
    # create and configure the process pool
    with Pool(4) as pool:
        # prepare task arguments
        items = [(i, semaphore) for i in range(10)]
        # issue tasks into the process pool
        pool.starmap(task, items)

Running the example first creates the shared semaphore with 2 positions.

The process pool is then created with 4 worker processes.

The task arguments are prepared and 10 tasks are issued to the process pool. The main process blocks until the tasks are complete.

Each issued task fails with a RuntimeError, the first of which is re-raised in the main process.

Traceback (most recent call last):
  ...
RuntimeError: Semaphore objects should only be shared between processes through inheritance

Now that we have confirmed that we cannot pass a normal multiprocessing.Semaphore to a task executed in the process pool, let’s look at how we might use a multiprocessing.Manager to fix the problem.

Example of Using a Semaphore in the Process Pool



We can explore how to use a multiprocessing.Manager to share a multiprocessing.Semaphore among child worker processes in the process pool.

This can be achieved by updating the first asynchronous example of issuing tasks that failed silently to create the multiprocessing.Semaphore using a multiprocessing.Manager.

First, a manager can be created using the context manager interface. This ensures that the manager is closed automatically once we are finished with it.

...
# create the manager
with Manager() as manager:
    # ...

Next, a multiprocessing.Semaphore can be created using the multiprocessing.Manager instance.

This will create and host a semaphore in a new server process and returns a proxy object that can be shared among child worker processes and used to interface with the centralized semaphore instance.

...
# create the shared semaphore
semaphore = manager.Semaphore(2)

And that’s it.

The complete example is listed below.

# SuperFastPython.com
# example of using a semaphore in the process pool that uses a manager
from random import random
from time import sleep
from multiprocessing import Manager
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, semaphore):
    # acquire the semaphore
    with semaphore:
        # generate a value
        value = random()
        # block for a moment
        sleep(value)
        # report a message
        print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the manager
    with Manager() as manager:
        # create the shared semaphore
        semaphore = manager.Semaphore(2)
        # create and configure the process pool
        with Pool(4) as pool:
            # prepare task arguments
            items = [(i, semaphore) for i in range(10)]
            # issue tasks into the process pool
            result = pool.starmap_async(task, items)
            # wait for all tasks to finish
            result.wait()

Running the example first creates the shared semaphore with 2 positions.

The process pool is then created with 4 worker processes.

The task arguments are prepared and 10 tasks are issued to the process pool. The main process then blocks until the tasks are complete.

Each issued task then executes in the process pool. Each attempts to acquire the shared semaphore, but only two tasks are able to acquire a position and execute at a time.

The tasks generate random values between 0 and 1, block for a moment, then report a message with their integer identifier and their generated number.

Note, the random numbers generated will differ each time the code is run.

All tasks complete normally without error.

This highlights how we can share and use a semaphore among child worker processes in the process pool.

Task 2 completed with 0.22633415644992416
Task 1 completed with 0.48353171152414876
Task 0 completed with 0.38649326532159967
Task 4 completed with 0.3246889881862385
Task 3 completed with 0.5682974340155695
Task 5 completed with 0.2462666839753167
Task 6 completed with 0.6346508657452659
Task 7 completed with 0.6696792113625666
Task 8 completed with 0.3913289137773729
Task 9 completed with 0.6752871564752958

Example of Using a Global Variable to Share a Semaphore with the Process Pool

An alternate approach to sharing a semaphore with workers in the process pool is to share the semaphore as a global variable.

This requires the use of the ‘fork‘ start method for creating new processes in Python.

A limitation of this approach is that the ‘fork‘ start method is not supported on all platforms. For example, ‘fork’ is not supported on Windows.
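A program can check at runtime whether the ‘fork‘ start method is supported before relying on it. For example:

```python
import multiprocessing

# list the start methods supported on this platform
methods = multiprocessing.get_all_start_methods()
print(methods)
# 'fork' is typically present on Unix-like systems, absent on Windows
print('fork' in methods)
```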

In this example, we will update the previous example to use the ‘fork‘ start method and to share the multiprocessing.Semaphore with all workers in the process pool via a global variable.

Firstly, we update the task() function so that it does not take the semaphore as an argument, and instead assumes it is available via an inherited global variable.

The updated version of the task() function with this change is listed below.

# task executed in a worker process
def task(identifier):
    # acquire the semaphore
    with semaphore:
        # generate a value
        value = random()
        # block for a moment
        sleep(value)
        # report a message
        print(f'Task {identifier} completed with {value}', flush=True)

Next, in the main process, we can configure the program to use the ‘fork‘ start method when creating new child processes, such as those in the process pool.

...
# set the fork start method
set_start_method('fork')

Next, we can create the shared multiprocessing.Semaphore instance as per normal.

...
# create the shared semaphore instance
semaphore = Semaphore(2)

This will implicitly be a global variable.

A more explicit approach would be to declare “semaphore” as a global variable, then assign it a new multiprocessing.Semaphore instance.

For example:

...
# declare the global variable
global semaphore
# assign the global variable
semaphore = multiprocessing.Semaphore(2)

This might be easier to read for newer Python programmers.

We can then create the process pool as per normal, then issue 10 tasks to the process pool asynchronously via the map_async() function and wait for the tasks to complete.

...
# create and configure the process pool
with Pool(4) as pool:
    # issue tasks into the process pool
    result = pool.map_async(task, range(10))
    # wait for all tasks to finish
    result.wait()

We use the map_async() function instead of the starmap_async() function because our task() function only has one argument.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of sharing a semaphore with worker processes using a global variable
from random import random
from time import sleep
from multiprocessing import set_start_method
from multiprocessing import Semaphore
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # acquire the semaphore
    with semaphore:
        # generate a value
        value = random()
        # block for a moment
        sleep(value)
        # report a message
        print(f'Task {identifier} completed with {value}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # set the fork start method
    set_start_method('fork')
    # create the shared semaphore instance
    semaphore = Semaphore(2)
    # create and configure the process pool
    with Pool(4) as pool:
        # issue tasks into the process pool
        result = pool.map_async(task, range(10))
        # wait for all tasks to finish
        result.wait()

Running the example first creates the shared semaphore with 2 positions.

The process pool is then created with 4 worker processes.

The task arguments are prepared and 10 tasks are issued to the process pool. The main process then blocks until the tasks are complete.

Each issued task then executes in the process pool. Each attempts to acquire the shared semaphore, but only two tasks are able to acquire a position and execute at a time.

The tasks generate random values between 0 and 1, block for a moment, then report a message with their integer identifier and their generated number.

The semaphore instance is accessed via the inherited global variable.

Note, the random numbers generated will differ each time the code is run.

All tasks complete normally without error.

This highlights an alternate way that we can share and use a semaphore among child worker processes in the process pool.

Task 0 completed with 0.3473932278198989
Task 2 completed with 0.056947916632273166
Task 1 completed with 0.5170349183008216
Task 4 completed with 0.1397094529366133
Task 5 completed with 0.17672185473701796
Task 6 completed with 0.1402163071067637
Task 3 completed with 0.6271489804953453
Task 8 completed with 0.5274392172317696
Task 7 completed with 0.6789224867965732
Task 9 completed with 0.39673788336317106

Takeaways

You now know how to use a semaphore in the process pool in Python.

Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
