Multiprocessing Race Conditions in Python - Super Fast Python

Last Updated on December 22, 2022

You can suffer race conditions when using process-based concurrency via the multiprocessing module in Python.

The types of race conditions we can expect may be different than those expected with threads, given that we are working with processes that do not have shared memory. Nevertheless, we must identify and protect critical sections from race conditions when using processes.

In this tutorial, you will discover how to identify and fix race conditions with processes in Python.

This tutorial was inspired by questions and discussions with Andrii K. Thank you deeply! If you have a question about Python concurrency, message me anytime.

Let’s get started.

What is a Race Condition

A race condition is a bug in concurrent programming.

Race condition: A flaw in a concurrent application in which the result is dependent on the timing or sequence of multiple threads’ execution.

— Page 271, The Art of Concurrency, 2009.

A race condition is a failure case where the behavior of the program is dependent upon the order of execution by two or more threads. This means that the behavior of the program will not be predictable, possibly changing each time it is run.

When threads or processes attempt to simultaneously access a shared resource, and the accesses can result in an error, we often say the program has a race condition, because the threads or processes are in a “race” to carry out an operation.

— Page 53, An Introduction to Parallel Programming, 2020.

There are many types of race conditions and they almost always have to do with an unexpected sequence of operations.

Two common classes of race conditions include:

  1. Race conditions caused by accessing shared data or state.
  2. Race conditions caused by timing.

Both of these race conditions can occur when using threads in Python.

Race Conditions with Threads

Race conditions are a real problem in Python when using threads, even in the presence of the global interpreter lock (GIL).

The refrain that there are no race conditions in Python because of the GIL is dangerously wrong.

Let’s look at two examples.

Race Condition With Shared Data

For example, one thread may be adding values to a variable, while another thread is subtracting values from the same variable.

Let’s call them an adder thread and a subtractor thread.

At some point, the operating system may context switch from the adding thread to the subtracting thread in the middle of updating the variable. Perhaps right at the point where it was about to write an updated value with an addition, say from the current value of 100 to the new value of 110.

...
# add to the variable
variable = variable + 10

You may recall that the operating system controls what threads execute and when, and that a context switch refers to pausing the execution of a thread and storing its state while unpausing another thread and restoring its state.

You may also notice that the adding or subtracting from the variable is composed of at least three steps:

  1. Read the current value of the variable.
  2. Calculate a new value for the variable.
  3. Write a new value for the variable.

A context switch between threads may occur at any point in this task.

Back to our threads. The subtracting thread runs and reads the current value as 100 and reduces the value from 100 to 90.

This subtraction is performed as per normal and the variable value is now 90.

The operating system context switches back to the adding thread, and it picks up where it left off, writing the value 110.

This means that the subtraction was lost and the shared variable is left in an inconsistent state. A race condition.

This type of race condition can be avoided by using a mutex lock.

Thread scheduling is inherently nondeterministic. Because of this, failure to use locks in threaded programs can result in randomly corrupted data and bizarre behavior known as a “race condition.” To avoid this, locks should always be used whenever shared mutable state is accessed by multiple threads.

— Page 497, Python Cookbook, 2013.

Race Condition Due to Timing

Python threads may suffer a race condition due to a bug with timing.

For example, consider a threading.Condition used by two threads to coordinate their behavior.

One thread may wait on the threading.Condition to be notified of some state change within the application via the wait function.

...
# wait for a state change
with condition:
    condition.wait()

Recall that when using a threading.Condition, you must acquire the condition before you can call wait() or notify(), then release it once you are done. This is easily achieved using the context manager.

Another thread may perform some change within the application and alert the waiting thread via the condition with the notify() function.

...
# alert the waiting thread
with condition:
    condition.notify()

This is an example of coordinated behavior between two threads where one thread signals another thread.

For the behavior to work as intended, the notification from the second thread must be sent after the first thread has started waiting. If the first thread calls wait() after the second thread calls notify(), then it will not be notified and will wait forever.

This may happen if a context switch by the operating system allows the second thread (which calls notify()) to run before the first thread (which calls wait()).

Next, let’s consider race conditions with processes.

Race Conditions with Processes

Python provides process-based concurrency via the multiprocessing module.

Race conditions are possible in Python when using process-based concurrency.

Let’s consider the two types of race conditions considered above with threads:

  1. Race conditions caused by accessing shared data or state.
  2. Race conditions caused by timing.

Race Condition With Shared Data

Race conditions with shared data are harder to trigger with processes.

The reason is that processes cannot share data directly.

Threads have shared memory (within one process), therefore a global variable can be accessed by different threads at the same time and left in an inconsistent state.

This cannot happen directly with processes.

A child process created using the “fork” start method will get a copy of all global variables from the parent process, but any changes to the global variable in the child will have no effect on the global variable in the parent.

There are two mechanisms for sharing Python objects and data between processes in a Python program:

  1. Host the object on a server process and allow processes to interact with it indirectly via proxy objects. This can be achieved using a multiprocessing.Manager.
  2. Share primitive types (e.g. integers and floats) between processes using shared ctypes via the multiprocessing.Value and multiprocessing.Array classes.

Race conditions are possible when using either of these two mechanisms to share data between processes.

Additionally, it is possible for two processes to interact with other types of shared resources and suffer a race condition as a result.

A common example is a file.

If two or more processes attempt to write to the same file at the same time, the results may be unpredictable.

Processes will race to write to the file, overwriting each other’s content and leaving the file in an unexpected state.

This is most commonly seen when logging to one file from multiple processes.

Race Condition Due to Timing

Processes can suffer race conditions due to timing.

This is just like the race conditions seen with threads.

A synchronization primitive may be shared among processes and one or more processes may wait to be notified after another process has sent the notification message.

This type of race condition can occur with an assortment of primitives that involve waiting, such as events, condition variables, and barriers.

Process Safety

The solution is to ensure that multiprocessing code is process-safe.

Process safety is the concept of thread safety applied to concurrency with processes.

This may involve making correct use of synchronization primitives, such as mutex locks, semaphores, events, condition variables, and barriers, all of which are provided by the multiprocessing module.

It may also involve separating out responsibilities for interacting with a resource into a controlling process and having other processes send messages to the controlling process via a multiprocessing queue.

Now that we know that race conditions are possible with processes in Python, let’s explore some examples.

Starting with a negative example.


Multiprocessing Cannot Race with a Global Variable

We can explore the case where a global variable is shared with child processes and does not lead to a race condition.

This situation is important to demonstrate because the same solution when using thread-based concurrency will lead to a race condition.

In this example, we will define two tasks, “adder” and “subtractor”. The adder task will access a shared global variable named “value” and add one to it, with plenty of room for a context switch to occur during the operation. Similarly, the subtractor task will access the same global variable and subtract one from it, with plenty of room for a context switch during the operation.

Specifically, we will unroll “value += 1” and “value -= 1” into their separate operations of: “copy value into tmp”, “change tmp”, and “copy tmp back to value”, and add sleep operations between each step. This signals to the operating system that the task may context switch at that point.

Each adder and subtractor task will be performed in a loop that iterates 1,000,000 times, offering many opportunities to race.

Each task will be executed in a separate child process, and the child processes will be created using the “fork” start method, which ensures that each child is a copy of the parent process.

Because the “value” global variable is defined in the parent process, it will be copied and made available to the child processes.

Note: The “fork” start method is not available on Windows. As such, this example may not run on the Windows platform. It will work fine on Linux, macOS, and other POSIX platforms.

The complete example is listed below.


# SuperFastPython.com
# example of an attempted race condition with a shared variable
from time import sleep
from multiprocessing import Process
from multiprocessing import set_start_method

# make additions into the global variable
def adder(amount, repeats):
    global value
    for _ in range(repeats):
        # copy the value
        tmp = value
        # suggest a context switch
        sleep(0)
        # change the copy
        tmp = tmp + amount
        # suggest a context switch
        sleep(0)
        # copy the value back
        value = tmp

# make subtractions from the global variable
def subtractor(amount, repeats):
    global value
    for _ in range(repeats):
        # copy the value
        tmp = value
        # suggest a context switch
        sleep(0)
        # change the copy
        tmp = tmp - amount
        # suggest a context switch
        sleep(0)
        # copy the value back
        value = tmp

if __name__ == '__main__':
    # set start method
    set_start_method('fork')
    # define the global variable
    value = 0
    # start a process making additions
    adder_process = Process(target=adder, args=(1, 1000000))
    adder_process.start()
    # start a process making subtractions
    subtractor_process = Process(target=subtractor, args=(1, 1000000))
    subtractor_process.start()
    # wait for both processes to finish
    print('Waiting for processes to finish...')
    adder_process.join()
    subtractor_process.join()
    # report the value
    print(f'Value: {value}')

Running the example first declares and defines the “value” global variable.

The adder and subtractor processes are then created and started.

The main process then waits for both processes to complete.

The adder and subtractor processes run, looping one million times each and modifying the global variable.

The child processes finish and the value of the global variable is reported.

And it has the expected value of zero.

Waiting for processes to finish...
Value: 0

There was no race condition because each process operated on a separate copy of the “value” global variable.

The main process had one copy that was never changed and remained at the value of zero until reported at the end.

Each child process had a copy of the global variable and either added to it or subtracted from it. Their values were changed and never interacted with each other or the copy in the main process.

We cannot get a race condition in this way with processes because we cannot share a global variable directly.

Next, let’s look at an example of sharing data between processes that is subject to a race condition.

Multiprocessing Race Condition with a Hosted Variable

One way to share data between processes is to use a multiprocessing.Manager.

A manager is a server process that hosts Python objects and returns proxy objects. Processes can then use the hosted object via the proxy objects, just like a shared global variable.

All of the inter-process communication required to read and write the shared variable is performed automatically behind the scenes via the proxy objects.

We can create a custom class that suffers race conditions and host it with a manager.

Why not create a custom class with a member variable and share this among child processes directly, without a Manager?

Excellent question! Because each child process would get and work with a copy of the shared object. Any changes to the shared object would be limited to the process that made those changes (e.g. we would be in the same situation as the example in the previous section).

A manager is required to really share an object between child processes and have changes reflected in other processes.

Let’s explore this case and then how to fix it.

Example of Race Condition with a Manager

In this example we can define a custom class that is subject to race conditions, host it on the manager and then interact with it from multiple child processes, triggering a race condition.

Firstly, we can create a custom counter class for incrementing and decrementing a variable.

We can perform the changes to the variable in a way that strongly encourages the operating system to context switch in the middle of the operations, essentially forcing a race condition.


# custom counter class
class UnsafeCounter():
    def __init__(self, count):
        self._value = count

    # retrieve the variable
    def get(self):
        return self._value

    # add one to the variable
    def increment(self):
        # copy value
        tmp = self._value
        # allow a context switch
        sleep(0)
        # increment the value
        tmp = tmp + 1
        # allow a context switch
        sleep(0)
        # copy the updated value
        self._value = tmp

    # subtract one from the variable
    def decrement(self):
        # copy value
        tmp = self._value
        # allow a context switch
        sleep(0)
        # decrement the value
        tmp = tmp - 1
        # allow a context switch
        sleep(0)
        # copy the updated value
        self._value = tmp

We can host this custom class on a Manager.

This requires first defining a custom Manager class.

# custom manager to support custom classes
class CustomManager(BaseManager):
    # nothing
    pass

Then we register the custom class on the custom manager, so the manager knows how to create instances of it.

...
# register the custom class on the custom manager
CustomManager.register('UnsafeCounter', UnsafeCounter)

We can define custom task functions to increment and decrement our shared counter.

Each task will perform 1,000 iterations and call increment() or decrement() methods on the shared counter.

# task executed in a child process
def adder_task(counter):
    for _ in range(1000):
        counter.increment()

# task executed in a child process
def subtractor_task(counter):
    for _ in range(1000):
        counter.decrement()

The main process can first create the custom Manager class and then create the shared counter using the manager.

This will host the counter on the manager server process and return proxy objects that can be shared with the child processes and used to interact with the hosted object.

...
# create manager
with CustomManager() as manager:
    # create the counter
    counter = manager.UnsafeCounter(0)

We can then start two child processes, one for adding the shared counter and one for subtracting, and wait for them to complete.

...
# create child processes
adder_process = Process(target=adder_task, args=(counter,))
subtractor_process = Process(target=subtractor_task, args=(counter,))
# start child processes
adder_process.start()
subtractor_process.start()
# wait for processes to complete
adder_process.join()
subtractor_process.join()

Finally, we can report the value of the shared counter.

...
# report the value
print(f'Value: {counter.get()}')

Tying this together, the complete example is listed below.


# SuperFastPython.com
# example of a race condition between processes with a hosted object
from time import sleep
from multiprocessing import Process
from multiprocessing.managers import BaseManager

# custom counter class
class UnsafeCounter():
    def __init__(self, count):
        self._value = count

    # retrieve the variable
    def get(self):
        return self._value

    # add one to the variable
    def increment(self):
        # copy value
        tmp = self._value
        # allow a context switch
        sleep(0)
        # increment the value
        tmp = tmp + 1
        # allow a context switch
        sleep(0)
        # copy the updated value
        self._value = tmp

    # subtract one from the variable
    def decrement(self):
        # copy value
        tmp = self._value
        # allow a context switch
        sleep(0)
        # decrement the value
        tmp = tmp - 1
        # allow a context switch
        sleep(0)
        # copy the updated value
        self._value = tmp

# custom manager to support custom classes
class CustomManager(BaseManager):
    # nothing
    pass

# task executed in a child process
def adder_task(counter):
    for _ in range(1000):
        counter.increment()

# task executed in a child process
def subtractor_task(counter):
    for _ in range(1000):
        counter.decrement()

# protect the entry point
if __name__ == '__main__':
    # register the custom class on the custom manager
    CustomManager.register('UnsafeCounter', UnsafeCounter)
    # create manager
    with CustomManager() as manager:
        # create the counter
        counter = manager.UnsafeCounter(0)
        # create child processes
        adder_process = Process(target=adder_task, args=(counter,))
        subtractor_process = Process(target=subtractor_task, args=(counter,))
        # start child processes
        adder_process.start()
        subtractor_process.start()
        # wait for processes to complete
        adder_process.join()
        subtractor_process.join()
        # report the value
        print(f'Value: {counter.get()}')

Running the example first registers the custom counter class on the custom manager.

An instance of the custom manager is created which creates a server process, then an instance of the custom counter class is created on the manager process, returning proxy objects.

The two child processes are created and provided access to the shared counter via proxy objects.

The main process waits until the tasks are completed.

Each task loops 1,000 times either incrementing or decrementing the shared counter via the proxy objects.

The tasks are completed and the value of the shared counter is reported.

We expect the value to be zero every time because we issued 1,000 increments and 1,000 decrements.

In this case, we see the value is not zero.

In fact, the program gives a different final value every time it is run.

We have a race condition.

Example of Fixing a Race Condition with a Manager

We can fix a race condition between processes that share an object via a manager.

There are a few ways we could achieve this.

One way might be to share a multiprocessing.Lock object and use it in each task function to protect the usage of the counter.

For example:

# task executed in a child process
def adder_task(counter, lock):
    for _ in range(1000):
        with lock:
            counter.increment()

# task executed in a child process
def subtractor_task(counter, lock):
    for _ in range(1000):
        with lock:
            counter.decrement()

This will work.

The mutex lock can only be acquired and held by one child process at a time, ensuring only one child process can either increment or decrement the shared counter at a time.

Another approach is to move the lock into the custom counter class and make it process-safe.

Each time a method in the class needs to operate on the internal variable, it must acquire the lock first. This ensures that all reads and writes to the shared variable are mutually exclusive, e.g. one at a time.

This might be a better approach as it hides the change, transforming our UnsafeCounter into a SafeCounter.

For example:


# custom counter class
class SafeCounter():
    def __init__(self, count):
        self._lock = Lock()
        self._value = count

    # retrieve the variable
    def get(self):
        with self._lock:
            return self._value

    # add one to the variable
    def increment(self):
        with self._lock:
            # copy value
            tmp = self._value
            # allow a context switch
            sleep(0)
            # increment the value
            tmp = tmp + 1
            # allow a context switch
            sleep(0)
            # copy the updated value
            self._value = tmp

    # subtract one from the variable
    def decrement(self):
        with self._lock:
            # copy value
            tmp = self._value
            # allow a context switch
            sleep(0)
            # decrement the value
            tmp = tmp - 1
            # allow a context switch
            sleep(0)
            # copy the updated value
            self._value = tmp

The updated version with this change is listed below.


# SuperFastPython.com
# example of fixing a race condition between processes with a hosted object
from time import sleep
from multiprocessing import Process
from multiprocessing import Lock
from multiprocessing.managers import BaseManager

# custom counter class
class SafeCounter():
    def __init__(self, count):
        self._lock = Lock()
        self._value = count

    # retrieve the variable
    def get(self):
        with self._lock:
            return self._value

    # add one to the variable
    def increment(self):
        with self._lock:
            # copy value
            tmp = self._value
            # allow a context switch
            sleep(0)
            # increment the value
            tmp = tmp + 1
            # allow a context switch
            sleep(0)
            # copy the updated value
            self._value = tmp

    # subtract one from the variable
    def decrement(self):
        with self._lock:
            # copy value
            tmp = self._value
            # allow a context switch
            sleep(0)
            # decrement the value
            tmp = tmp - 1
            # allow a context switch
            sleep(0)
            # copy the updated value
            self._value = tmp

# custom manager to support custom classes
class CustomManager(BaseManager):
    # nothing
    pass

# task executed in a child process
def adder_task(counter):
    for _ in range(1000):
        counter.increment()

# task executed in a child process
def subtractor_task(counter):
    for _ in range(1000):
        counter.decrement()

# protect the entry point
if __name__ == '__main__':
    # register the custom class on the custom manager
    CustomManager.register('SafeCounter', SafeCounter)
    # create manager
    with CustomManager() as manager:
        # create the counter
        counter = manager.SafeCounter(0)
        # create child processes
        adder_process = Process(target=adder_task, args=(counter,))
        subtractor_process = Process(target=subtractor_task, args=(counter,))
        # start child processes
        adder_process.start()
        subtractor_process.start()
        # wait for processes to complete
        adder_process.join()
        subtractor_process.join()
        # report the value
        print(f'Value: {counter.get()}')

Running the example first registers the custom counter class on the custom manager.

An instance of the custom manager is created which creates a server process, then an instance of the custom counter class is created on the manager process, returning proxy objects.

Importantly, the counter creates its own internal lock when the object is initialized.

The two child processes are created and provided access to the shared counter via proxy objects.

The main process waits until the tasks are completed.

Each task loops 1,000 times either incrementing or decrementing the shared counter via the proxy objects.

The tasks are completed and the value of the shared counter is reported.

We expect the value to be zero every time because we issued 1,000 increments and 1,000 decrements.

In this case, we can see that this expectation is met. In fact, the value of the shared counter is zero every time the program is run.

The race condition has been fixed.

Next, let’s look at a race condition between processes using a shared ctype variable.



Multiprocessing Race Condition with a Shared ctype

Another way we can share a variable between processes is by using shared ctypes.

The ctypes module provides tools for working with C data types.

Python provides the capability to share ctypes between processes on one system.

This is primarily achieved via the following classes:

  • multiprocessing.Value: manage a shared value.
  • multiprocessing.Array: manage an array of shared values.

The multiprocessing.Value class will create a shared ctype with a specified data type and initial value.

The first argument defines the data type for the value. It may be a string type code or a Python ctype class. The second argument may be an initial value.

For example, we can define a signed integer type with the ‘i’ type code and an initial value of zero as follows:

...
# create an integer value
variable = multiprocessing.Value('i', 0)

The data within the multiprocessing.Value object can be accessed via the “value” attribute.

...
# get the data
data = variable.value

We can suffer race conditions when sharing ctypes between processes.

Let’s explore an example and then how to fix it.

Example of Race Condition with Shared ctype

In this example, we can define an integer shared ctype variable and share it among processes.

Each process will change the variable at the same time, triggering a race condition and leaving the variable in an unknown and inconsistent state.

We will define two tasks, an adder and a subtractor, as we did previously.

The adder task will loop 1,000 times and increment the variable each iteration, whereas the subtractor task will decrement the variable each iteration.

The final value of the shared ctype will be reported, which we would expect to be zero, given the balanced number of increments and decrements.

The complete example is listed below.


# SuperFastPython.com
# example of a race condition with a shared ctype
from multiprocessing import Process
from multiprocessing import Value

# make additions into the shared variable
def adder(variable):
    for _ in range(1000):
        # increment the variable
        variable.value += 1

# make subtractions from the shared variable
def subtractor(variable):
    for _ in range(1000):
        # decrement the variable
        variable.value -= 1

if __name__ == '__main__':
    # create a shared ctype integer
    variable = Value('i', 0)
    # start a process making additions
    adder_process = Process(target=adder, args=(variable,))
    adder_process.start()
    # start a process making subtractions
    subtractor_process = Process(target=subtractor, args=(variable,))
    subtractor_process.start()
    # wait for both processes to finish
    print('Waiting for processes to finish...')
    adder_process.join()
    subtractor_process.join()
    # report the value
    print(f'Value: {variable.value}')

Running the example first creates the shared ctype and initializes it to zero.

The adder process is then created, passed the ctype variable, and started. Similarly, the subtractor child process is created, passed the same ctype variable, and started.

The main process then waits for both processes to complete.

Each process loops 1,000 times and modifies the value within the shared ctype.

Because the variable is shared, both processes operate on the same variable.

Both child processes are completed and the final value of the shared ctype is reported.

In this case, the final value is not zero.

Waiting for processes to finish...

Value: 221

In fact, the final value is different each time the program is run, due to the race condition.

Waiting for processes to finish...

Value: -659
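The root cause is that variable.value += 1 is not a single atomic operation. It expands into a separate read, compute, and write, and the operating system can switch between the two processes at any point between those steps, losing updates. The sketch below spells out the three steps:

```python
from multiprocessing import Value

variable = Value('i', 0)

# "variable.value += 1" is really three separate steps; another
# process can be scheduled between the read and the write, which
# is exactly the window in which updates are lost
tmp = variable.value    # 1. read the current value
tmp = tmp + 1           # 2. compute the new value
variable.value = tmp    # 3. write it back

print(variable.value)
```

If the subtractor process runs its own read-modify-write between steps 1 and 3, one of the two updates is silently overwritten.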

Next, let’s look at how we can fix this race condition.

Example of Fixing a Race Condition with Shared ctype

We can fix the race condition by using a shared mutex lock.

One approach would be to create a mutex lock in the main process and share it with each child process.

Each time the adder and subtractor tasks change the shared ctype, they can first acquire the lock. This would ensure that all changes to the shared ctype are serialized, that is only one child process can change the variable at a time.

For example:

# make additions into the shared variable
def adder(variable, lock):
    for _ in range(1000):
        # acquire the lock on the variable
        with lock:
            # increment the variable
            variable.value += 1

# make subtractions from the shared variable
def subtractor(variable, lock):
    for _ in range(1000):
        # acquire the lock on the variable
        with lock:
            # decrement the variable
            variable.value -= 1

In fact, this is not necessary.

By default, the shared ctype Value and Array objects will create and manage their own internal mutex lock.

This lock can be retrieved via the get_lock() method and can be used to protect direct modification of the shared ctype value.

For example:

# make additions into the shared variable
def adder(variable):
    for _ in range(1000):
        # acquire the lock on the variable
        with variable.get_lock():
            # increment the variable
            variable.value += 1

# make subtractions from the shared variable
def subtractor(variable):
    for _ in range(1000):
        # acquire the lock on the variable
        with variable.get_lock():
            # decrement the variable
            variable.value -= 1

This is the correct and preferred way to modify and access the values of a shared ctype to ensure it is process-safe.
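Note that the internal lock is optional. The Value constructor accepts a lock argument; passing lock=False returns a raw ctypes object with no get_lock() method, leaving all synchronization to you. A quick sketch:

```python
from multiprocessing import Value

# default: the ctype is wrapped with a process-safe lock
safe = Value('i', 0)
with safe.get_lock():
    safe.value += 1

# lock=False: a raw ctypes object with no get_lock() method,
# so any protection must be arranged manually
raw = Value('i', 0, lock=False)
raw.value += 1

print(safe.value, raw.value)
```

Only disable the lock when the value is read-only or protected by some other synchronization primitive.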

The updated version of the program with these changes that fix the race condition is listed below.


# SuperFastPython.com
# example of fixing a race condition with a shared ctype
from multiprocessing import Process
from multiprocessing import Value

# make additions into the shared variable
def adder(variable):
    for _ in range(1000):
        # acquire the lock on the variable
        with variable.get_lock():
            # increment the variable
            variable.value += 1

# make subtractions from the shared variable
def subtractor(variable):
    for _ in range(1000):
        # acquire the lock on the variable
        with variable.get_lock():
            # decrement the variable
            variable.value -= 1

# protect the entry point
if __name__ == '__main__':
    # create a shared ctype integer
    variable = Value('i', 0)
    # start a process making additions
    adder_process = Process(target=adder, args=(variable,))
    adder_process.start()
    # start a process making subtractions
    subtractor_process = Process(target=subtractor, args=(variable,))
    subtractor_process.start()
    # wait for both processes to finish
    print('Waiting for processes to finish...')
    adder_process.join()
    subtractor_process.join()
    # acquire the lock on the variable
    with variable.get_lock():
        # report the value
        print(f'Value: {variable.value}')

Running the example first creates the shared ctype and initializes it to zero.

The adder process is then created, passed the ctype variable, and started. Similarly, the subtractor child process is created, passed the same ctype variable, and started.

The main process then waits for both processes to complete.

Each process loops 1,000 times. Each iteration, the tasks first acquire the internal mutex lock for the shared ctype variable. Only once the lock is acquired will the task modify the value within the shared ctype.

Both child processes are completed and the final value of the shared ctype is reported.

In this case, the final value is always zero.

The race condition has been fixed.

Waiting for processes to finish...

Value: 0

Next, let’s look at sharing a file from multiple processes.

Multiprocessing Cannot Race with a Shared File

We can explore the case of multiple child processes writing to the same file at the same time.

When using threads, we can open a handle to the file, share the handle directly with many threads, and have them all write to the file at the same time. This leads to a race condition in which the threads overwrite each other's data, causing corruption and loss.

For an example of this type of race condition with threads, see the tutorial:

This type of race condition is challenging to recreate using process-based concurrency.

Firstly, we cannot open a file handle and share it directly with child processes.

Doing so results in an error because the file handle cannot be pickled (serialized) and therefore cannot be sent to a child process as an argument.
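We can demonstrate this directly: trying to pickle an open file object raises a TypeError, which is the same failure that occurs under the hood when a file handle is passed as an argument to a Process. A minimal sketch:

```python
import pickle
import tempfile

# attempting to pickle an open file handle fails; this is why a
# file object cannot be passed directly to a child process
with tempfile.TemporaryFile(mode='w') as handle:
    try:
        pickle.dumps(handle)
        picklable = True
    except TypeError:
        picklable = False

print(f'File handle picklable: {picklable}')
```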

An alternate approach is to have each child process open the same file and attempt to write many lines to the file at the same time.

The example below explores this case.

We can define a task that takes the filename and loops 10,000 times. Each iteration, the file is opened and one line of text is written.

We can then perform this task in many child processes, 50 in this case. The number is kept intentionally modest, as some operating systems, such as Windows, limit the number of child processes that can be created.

With 50 processes each writing 10K lines, we expect the file to have 500,000 lines of text.

The program waits for all child processes to finish before loading the file and reporting the total number of lines.

If there were a race condition leading to corruption and loss, we would expect fewer than 500,000 lines in the final file.

The complete example is listed below.


# SuperFastPython.com
# example of attempted race condition with shared file
from multiprocessing import Process
import os

# task for writing data to a file
def task(filename, arg):
    # write data to the file
    for i in range(10000):
        # open the file
        with open(filename, 'a') as file:
            # write one line
            print(f'Data line {i} from task {arg}.', file=file)

# protect the entry point
if __name__ == '__main__':
    # shared filename
    filename = 'tmp.txt'
    # delete the file if it exists
    try:
        os.remove(filename)
    except FileNotFoundError:
        pass
    # create processes to write to the same file
    processes = [Process(target=task, args=(filename, i)) for i in range(50)]
    # start the processes
    for process in processes:
        process.start()
    # wait for the processes to complete
    for process in processes:
        process.join()
    # read the file and report results
    with open(filename, 'r') as file:
        # read entire file
        data = file.read()
        # split into lines
        data = data.strip().split('\n')
        # report total lines
        print(f'{filename} has {len(data)} lines')

Running the example first deletes the shared file if it exists (e.g. if the program has been run before).

Next, 50 child processes are created and started and the main process waits for them to complete.

Each process loops 10,000 times, each iteration opening the file and writing one line.

The repeated opening and writing to the file is an attempt to force a race between child processes.

Once all tasks are complete the main process loads the text file and reports the total number of lines.

In this case, we can see that all 500,000 lines were written without incident.

No race condition is evident.

It seems writing to the same file from multiple processes is safe on many modern operating systems, such as Linux and macOS. I have not confirmed the behavior on Windows.

We cannot easily force a race condition in this situation.
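A likely explanation, offered here as my reading of POSIX behavior rather than a guarantee for every platform, is how append mode works at the operating system level. Opening with 'a' sets the O_APPEND flag, and on POSIX systems each write to an O_APPEND descriptor atomically repositions to the end of the file before writing, so short single-line writes from different processes land at the end without clobbering each other. The sketch below shows the roughly equivalent os-level open:

```python
import os

# open(filename, 'a') corresponds roughly to these os-level flags;
# O_APPEND makes each write atomically seek to the end before writing
fd = os.open('tmp_append_demo.txt', os.O_WRONLY | os.O_CREAT | os.O_APPEND)
os.write(fd, b'one appended line\n')
os.close(fd)

# confirm the line landed at the end of the file
with open('tmp_append_demo.txt') as f:
    content = f.read()
os.remove('tmp_append_demo.txt')
print(content, end='')
```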

We can see some discussion of this issue on stack overflow, here:

Nevertheless, I would not rely on this finding in a production system.

I would strongly recommend creating a lock, sharing it among the child processes, and treating operations on the file as a critical section.

For example:

# task for writing data to a file
def task(filename, arg, lock):
    # write data to the file
    for i in range(10000):
        # acquire the lock
        with lock:
            # open the file
            with open(filename, 'a') as file:
                # write one line
                print(f'Data line {i} from task {arg}.', file=file)

This may add too much lock overhead in this specific case, in which case the task() function could be restructured so that the lock is acquired less frequently.
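One way to reduce the lock overhead is to restructure task() so the lock is acquired once per process rather than once per line, building the lines outside the critical section and writing them in a single batch. A sketch, assuming the lock argument is a multiprocessing.Lock created in the main process and passed to each child:

```python
# task for writing data to a file, acquiring the lock once per batch
def task(filename, arg, lock):
    # build all lines first, outside the critical section
    lines = [f'Data line {i} from task {arg}.' for i in range(10000)]
    # hold the lock only for the single batched write
    with lock:
        with open(filename, 'a') as file:
            file.write('\n'.join(lines) + '\n')
```

The trade-off is that each process's lines now appear as one contiguous block in the file rather than interleaved.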

Next, let’s explore a race condition between child processes due to timing.

Multiprocessing Race Condition with Timing

We can explore an example of a race condition between processes due to timing.

We can then explore how to fix the race condition, ensuring it does not happen.

Example of Multiprocessing Race Condition

In this example, we will create a multiprocessing.Condition, then start a new child process that waits on the condition to be notified, while the main process notifies it.

If you are new to condition variables, see the tutorial:

To force the race condition, we will add a delay between the new process starting and waiting on the condition. This will cause the new process to always miss the notification from the main process and wait forever, requiring the program to be killed manually rather than terminating normally.

Firstly, we can define a function named task() to be executed by a new process.

The function will first sleep for a fraction of a second to force the timing race condition, then acquire the condition and wait to be notified.

The complete function is listed below.

# process waiting to be notified
def task(condition):
    # insert a delay
    sleep(1)
    # wait to be notified
    print('Process: Waiting to be notified...', flush=True)
    with condition:
        condition.wait()
    print('Process: Notified', flush=True)

Next, in the main process we can create the shared multiprocessing.Condition object.

...
# create the shared condition variable
condition = Condition()

Next, we can create and start a new multiprocessing.Process configured to execute our task() function and pass in the condition as an argument to the function.

...
# create the new process
process = Process(target=task, args=(condition,))
# start the new process
process.start()

Finally, the main process can acquire the multiprocessing.Condition and notify the child process.

...
# allow the new process to start up, but not start waiting
sleep(0.5)
# notify the new process
print('Main: Notifying the process')
with condition:
    condition.notify()
# wait for the task to complete
process.join()
print('Main: Done')

Tying this together, the complete example of a race condition based on timing is listed below.


# SuperFastPython.com
# example of a race condition with timing between two processes
from time import sleep
from multiprocessing import Process
from multiprocessing import Condition

# process waiting to be notified
def task(condition):
    # insert a delay
    sleep(1)
    # wait to be notified
    print('Process: Waiting to be notified...', flush=True)
    with condition:
        condition.wait()
    print('Process: Notified', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the shared condition variable
    condition = Condition()
    # create the new process
    process = Process(target=task, args=(condition,))
    # start the new process
    process.start()
    # allow the new process to start up, but not start waiting
    sleep(0.5)
    # notify the new process
    print('Main: Notifying the process')
    with condition:
        condition.notify()
    # wait for the task to complete
    process.join()
    print('Main: Done')

Running the example first creates and starts the child process.

The child process starts running and then sleeps for one second.

Meanwhile, the main process sleeps for half a second, then acquires the condition and calls notify().

The child process wakes up, acquires the condition, and waits to be notified. Because the notification has already been sent, it is missed and the child process waits forever.

This demonstrates a race condition based on timing.

We might also say that the program is deadlocked and unable to progress.

Main: Notifying the process

Process: Waiting to be notified...

Next, let’s look at how we might fix this race condition using an event.

Example of Fixing a Multiprocessing Race Condition

A race condition based on timing, as seen in the previous section, can be fixed by making the notifying process wait until the waiting process is ready before acquiring the condition and calling notify().

One way to achieve this is by using a multiprocessing.Event, which is a process-safe boolean flag variable.

If you are new to events, see the tutorial:

The shared multiprocessing.Event can be passed to the task() function as an argument and then set by the child process while holding the multiprocessing.Condition, right before waiting on the condition.

It is important that the event is set while the condition is held: holding the condition blocks the main process from acquiring it and calling notify() until the child process releases the condition by calling wait().

The updated version of the task() function with this change is listed below.

# process waiting to be notified
def task(condition, event):
    # insert a delay
    sleep(1)
    # wait to be notified
    print('Process: Waiting to be notified...', flush=True)
    with condition:
        # signal that we are ready
        print('Process: Ready', flush=True)
        event.set()
        # wait to be notified
        condition.wait()
    print('Process: Notified', flush=True)

The main process can then create the shared multiprocessing.Event object.

...
# create the shared event
event = Event()

It can then be passed to a child process.

...
# create the new process
process = Process(target=task, args=(condition, event))
# start the new process
process.start()

Finally, the main process can wait for the multiprocessing.Event to be set before progressing.

This can be achieved by calling the wait() method on the event.

...
# wait for the task to signal that it's ready
event.wait()

Tying this together, the complete example of fixing a race condition based on timing is listed below.


# SuperFastPython.com
# example of fixing a race condition due to timing between two processes
from time import sleep
from multiprocessing import Process
from multiprocessing import Condition
from multiprocessing import Event

# process waiting to be notified
def task(condition, event):
    # insert a delay
    sleep(1)
    # wait to be notified
    print('Process: Waiting to be notified...', flush=True)
    with condition:
        # signal that we are ready
        print('Process: Ready', flush=True)
        event.set()
        # wait to be notified
        condition.wait()
    print('Process: Notified', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the shared condition variable
    condition = Condition()
    # create the shared event
    event = Event()
    # create the new process
    process = Process(target=task, args=(condition, event))
    # start the new process
    process.start()
    # wait for the task to signal that it's ready
    event.wait()
    # notify the new process
    print('Main: Notifying the process')
    with condition:
        condition.notify()
    # wait for the task to complete
    process.join()
    print('Main: Done')

Running the example first creates the shared condition and event.

The child process is then created and started, and immediately sleeps for a moment.

Meanwhile, the main process waits to be notified that the child process is ready.

The child process wakes up, acquires the condition, sets the event, and then waits on the condition.

The main process notices that the event has been set, then acquires the condition and notifies the child process.

The program works as expected and the race condition no longer exists.

Process: Waiting to be notified...

Process: Ready

Main: Notifying the process

Process: Notified

Main: Done

If you are aware of other race conditions with multiprocessing, please let me know in the comments below.

Further Reading

This section provides additional resources that you may find helpful.

Python Multiprocessing Books

I would also recommend specific chapters in the books:

Guides

APIs

References

Takeaways

You now know how to identify and fix race conditions with processes in Python.

Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.

Photo by Cali Naughton on Unsplash