Multiprocessing Pipe in Python - Super Fast Python

You can use a pipe between processes by multiprocessing.Pipe class.

In this tutorial you will discover how to use a multiprocessing pipe in Python.

Let’s get started.

Need for a Pipe

A process is a running instance of a computer program.

Every Python program is executed in a Process, which is a new instance of the Python interpreter. This process has the name MainProcess and has one thread used to execute the program instructions called the MainThread. Both processes and threads are created and managed by the underlying operating system.

Sometimes we may need to create new child processes in our program in order to execute code concurrently.

Python provides the ability to create and manage new processes via the multiprocessing.Process class.

In multiprocessing programming, we often need to share data between processes.

One approach to sharing data is to use a pipe.

What is the pipe and how can we use it in Python?

What is a Pipe

In multiprocessing, a pipe is a connection between two processes in Python.

It is used to send data from one process which is received by another process.

Under the covers, a pipe is implemented using a pair of connection objects, provided by the multiprocessing.connection.Connection class.

Creating a pipe will create two connection objects, one for sending data and one for receiving data. A pipe can also be configured to be duplex so that each connection object can both send and receive data.

Pipe vs Queue

Both a multiprocessing.Pipe and a multiprocessing.Queue can be used to send and receive objects and data between processes.

A Pipe is simpler than a queue. It is a lower-level mechanism, requiring first the explicit creation of the connections between a pair of processes, then the explicit sending and receiving of data between processes.

A Queue is a high-level constructor that can be treated like a local data structure that just so happens to be shared among processes.

Importantly, a Queue is designed to be used with multiple producers and multiple consumers in mind, whereas a Pipe is intended for a pair of processes only.

The targeted and simpler nature of pipes can make them more efficient and potentially faster in sharing data between two processes.

In summary:

  • Both pipe and queue can be used to shared data between processes
  • Pipe is simple and low-level, queue is more capable and higher-level
  • Pipe is between two processes, and the queue has multiple producers and consumers.

How to Use the Pipe

Python provides a simple queue in the multiprocessing.Pipe class.

Let’s take a closer look at how to use the pipe class.

Create a Pipe

A pipe can be created by calling the constructor of the multiprocessing.Pipe class, which returns two multiprocessing.connection.Connection objects.

For example:

...

# create a pipe

conn1, conn2 = multiprocessing.Pipe()

By default, the first connection (conn1) can only be used to receive data, whereas the second connection (conn2) can only be used to send data.

The connection objects can be made duplex or bidirectional.

This can be achieved by setting the “duplex” argument to the constructor to True.

For example:

...

# create a duplex pipe

conn1, conn2 = multiprocessing.Pipe(duplex=True)

In this case, both connections can be used to send and receive data.

Share Objects With Pipe

Objects can be shared between processes using the Pipe.

The Connection.send() function can be used to send objects from one process to another.

The objects sent must be picklable.

For example:

...

# send an object

conn2.send('Hello world')

The Connection.recv() function can be used to receive objects in one process sent by another.

The objects received will be automatically un-pickled.

For example:

...

# receive an object

object = conn1.recv()

The function call will block until an object is received.

Share Bytes With Pipe

Data can be shared between processes using the Pipe.

This can be achieved by sending and receiving data in the form of packages of bytes.

Bytes can be sent from one process to another via the Connection.send_bytes() function.

For example:

...

# send bytes

conn2.send(b'Hello world')

If byte data is held in a buffer data structure, then an “offset” and “size” arguments can be specified when sending bytes.

For example:

...

# send bytes

conn2.send(buffer, offset=10, size=100)

Bytes can be received via the Connection.recv_bytes() function.

For example:

...

# receive bytes

data = conn1.recv_bytes()

The function call will block until there are bytes to receive.

A single message of bytes will be read.

A maximum length of bytes can be specified via the “maxlength” argument.

For example:

...

# receive bytes

data = conn1.recv_bytes(maxlength=100)

Byte data can also be received into an existing byte buffer, with an offset.

This can be achieved via the Connection.recv_bytes_into() function with an optional “offset” argument.

For example:

...

# receive bytes

data = conn1.recv_bytes_into(buffer, offset=100)

Status of Pipe

The status of the pipe can be checked via the Connection.poll() function.

This will return a boolean as to whether three is data to be received and read from the pipe.

For example:

...

# check if there is data to receive

if conn1.poll():

# ...

A timeout can be set via the “timeout” argument. If specified, the call will block until data is available. If no data is available before the timeout number of seconds has elapsed, then the function will return.

For example:

...

# check if there is data to receive

if conn1.poll(timeout=5):

# ...

Now that we know how to use the multiprocessing.Pipe, let’s look at some worked examples.


Free Python Multiprocessing Course

Download your FREE multiprocessing PDF cheat sheet and get BONUS access to my free 7-day crash course on the multiprocessing API.

Discover how to use the Python multiprocessing module including how to create and start child processes and how to use a mutex locks and semaphores.

Learn more
 


Example of Using a Pipe

We can explore how to use a multiprocessing.Pipe to share data between processes.

In this example we will create a sender process that will generate random numbers and send them to another process via the pipe. We will also create a receiver process that will receive numbers sent from the other process and report them.

We can first define the sender process.

We can define a new function named sender() that takes a connection as an argument on which it will send objects. It then loops ten times and each iteration it will generate a random number between 0 and 1 via random.random(), block for a fraction of a second to simulate work, then send the value to the other process via the pipe.

Once done, the sender will send a special value, called a sentinel value to indicate that no more values will be sent. In this case we will use the value “None“.

The sender() function below implements this.

# generate work

def sender(connection):

    print('Sender: Running', flush=True)

    # generate work

    for i in range(10):

        # generate a value

        value = random()

        # block

        sleep(value)

        # send data

        connection.send(value)

    # all done

    connection.send(None)

    print('Sender: Done', flush=True)

Next, we can define the receiver process.

We can define a new receiver() function that takes a connection on which to receive objects.

The function will loop forever. Each iteration, it will receive an object on the pipe and block until an object is received. It will then report the value. If the received value was the special sentinel value, it will break the loop and the process will terminate.

The receive() function below implements this.

# consume work

def receiver(connection):

    print('Receiver: Running', flush=True)

    # consume work

    while True:

        # get a unit of work

        item = connection.recv()

        # report

        print(f'>receiver got {item}', flush=True)

        # check for stop

        if item is None:

            break

    # all done

    print('Receiver: Done', flush=True)

Finally, the main process will create the process and wait for them to finish.

First, a new pipe is created that will be used to send and receive objects between the processes.

...

# create the pipe

conn1, conn2 = Pipe()

Next, we can create a child process that will execute the sender() function and take the conn2 that can only send data along the pipe. Once created and configured the child process is started.

...

# start the sender

sender_process = Process(target=sender, args=(conn2,))

sender_process.start()

We can then create another child process that will execute the receiver() function and take the conn1 that can only receive data via the pipe. This process can also then be started.

...

# start the receiver

receiver_process = Process(target=receiver, args=(conn1,))

receiver_process.start()

The main process can then block until both child processes have finished.

...

# wait for all processes to finish

sender_process.join()

receiver_process.join()

Tying this together, the complete example is listed below.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

# SuperFastPython.com

# example of using a pipe between processes

from time import sleep

from random import random

from multiprocessing import Process

from multiprocessing import Pipe

# generate work

def sender(connection):

    print('Sender: Running', flush=True)

    # generate work

    for i in range(10):

        # generate a value

        value = random()

        # block

        sleep(value)

        # send data

        connection.send(value)

    # all done

    connection.send(None)

    print('Sender: Done', flush=True)

# consume work

def receiver(connection):

    print('Receiver: Running', flush=True)

    # consume work

    while True:

        # get a unit of work

        item = connection.recv()

        # report

        print(f'>receiver got {item}', flush=True)

        # check for stop

        if item is None:

            break

    # all done

    print('Receiver: Done', flush=True)

# entry point

if __name__ == '__main__':

    # create the pipe

    conn1, conn2 = Pipe()

    # start the sender

    sender_process = Process(target=sender, args=(conn2,))

    sender_process.start()

    # start the receiver

    receiver_process = Process(target=receiver, args=(conn1,))

    receiver_process.start()

    # wait for all processes to finish

    sender_process.join()

    receiver_process.join()

Running the example first creates the pipe, then creates and starts both child processes.

The main process then blocks until the child processes finish.

The sender child process then runs in a loop, generating and sending ten random values along the pipe. Once all values are generated and sent, the sender process terminates.

The child process loops, receiving objects from the pipe each iteration. It blocks until an object appears each iteration. Received values are reported, and the loop is broken once the sentinel value is received.

Note, your specific results will differ given the use of random numbers.

Sender: Running

Receiver: Running

>receiver got 0.7672216614763814

>receiver got 0.6766375162201561

>receiver got 0.3958018282651511

>receiver got 0.0019046615500002417

>receiver got 0.4850692187219524

>receiver got 0.6249563547834307

>receiver got 0.44768176602669507

>receiver got 0.852944978432572

>receiver got 0.26856894937968356

Sender: Done

>receiver got 0.7724406632512203

>receiver got None

Receiver: Done

This highlights how to use a default pipe for sending data from one process to another.

Next, let’s look at a duplex or bidirectional pipe between two processes.

Example of Using a Duplex Pipe

A multiprocessing.Pipe can be used to both send and receive data between two processes.

This is called a duplex or bidirectional pipe and can be achieved by setting the “duplex” argument to True when creating a pipe.

In this example, we will play ping pong between two processes, player1 and player2. Player1 will start the game by generating a random value between 0 and 1 and send it to player2. Player2 will receive the value, add a new random value to the received value and send it back to player1. Player1 will receive the value and perform the same action of adding a random value to the received value and sending it back.

This process is repeated until a value above 10 is received, after which both player processes will terminate.

First, we can define a function that takes a connection and a value as arguments, adds a random value to the value and sends it along the connection. The value passed in as an argument will be the value received along the pipe.

This function can be used by both players in the ping pong game, and can be used by player1 when starting the game.

The generate_send() function listed below implements this.

# generate and send a value

def generate_send(connection, value):

    # generate value

    new_value = random()

    # block

    sleep(new_value)

    # update value

    value = value + new_value

    # report

    print(f'>sending {value}', flush=True)

    # send value

    connection.send(value)

Next, we can define a function that encapsulates the game.

The function takes a connection object that can both send and receive, and a boolean value as to whether it should start the game or not.

# ping pong between processes

def pingpong(connection, send_first):

# ...

The function will then check if it should start the game, and if so, it will call our generate_send() function defined above with a value of zero.

...

# check if this process should seed the process

if send_first:

    generate_send(connection, 0)

The function will loop forever. Each iteration it will receive an object along the pipe and block until an object is received. It will then report the value, send it back with a call to generate_send(), then stop the game if the value is above the threshold, 10 in this case.

...

# run until limit reached

while True:

    # read a value

    value = connection.recv()

    # report

    print(f'>received {value}', flush=True)

    # send the value back

    generate_send(connection, value)

    # check for stop

    if value > 10:

        break

Tying this together, the complete pingpong() function is listed below.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

# ping pong between processes

def pingpong(connection, send_first):

    print('Process Running', flush=True)

    # check if this process should seed the process

    if send_first:

        generate_send(connection, 0)

    # run until limit reached

    while True:

        # read a value

        value = connection.recv()

        # report

        print(f'>received {value}', flush=True)

        # send the value back

        generate_send(connection, value)

        # check for stop

        if value > 10:

            break

    print('Process Done', flush=True)

Finally, the main process will create the pipe, and create and start the two players of the ping pong game.

First, the pipe can be created in duplex mode so that each player can both send and receive along the pipe.

...

# create the pipe

conn1, conn2 = Pipe(duplex=True)

Next, the two players can be created. Each player must use a separate connection, and only one player can start the game.

...

# create players

player1 = Process(target=pingpong, args=(conn1,True))

player2 = Process(target=pingpong, args=(conn2,False))

Both player child processes can then be started and the main process will block until the game is finished.

...

# start players

player1.start()

player2.start()

# wait for players to finish

player1.join()

player2.join()

Tying this together, the complete example is listed below.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

# SuperFastPython.com

# example of using a duplex pipe between processes

from time import sleep

from random import random

from multiprocessing import Process

from multiprocessing import Pipe

# generate and send a value

def generate_send(connection, value):

    # generate value

    new_value = random()

    # block

    sleep(new_value)

    # update value

    value = value + new_value

    # report

    print(f'>sending {value}', flush=True)

    # send value

    connection.send(value)

# ping pong between processes

def pingpong(connection, send_first):

    print('Process Running', flush=True)

    # check if this process should seed the process

    if send_first:

        generate_send(connection, 0)

    # run until limit reached

    while True:

        # read a value

        value = connection.recv()

        # report

        print(f'>received {value}', flush=True)

        # send the value back

        generate_send(connection, value)

        # check for stop

        if value > 10:

            break

    print('Process Done', flush=True)

# entry point

if __name__ == '__main__':

    # create the pipe

    conn1, conn2 = Pipe(duplex=True)

    # create players

    player1 = Process(target=pingpong, args=(conn1,True))

    player2 = Process(target=pingpong, args=(conn2,False))

    # start players

    player1.start()

    player2.start()

    # wait for players to finish

    player1.join()

    player2.join()

Running the example first creates the duplex pipe, then creates and starts the two child processes.

The main process then blocks and waits for both child processes to terminate.

The first player starts the game, generating an initial random value and sending it to player2.

Player2 receives the value, reports it, adds a random value to it and then sends it back to player1.

This process of receiving and sending an accumulating floating point value between the two processes continues until the threshold of 10 is reached.

Both processes detect the value exceeding the threshold and break their loop, ending the game.

This highlights how to use a duplex pipe between two processes.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

Process Running

Process Running

>sending 0.512990902456799

>received 0.512990902456799

>sending 0.6720034113827182

>received 0.6720034113827182

>sending 1.121709843352519

>received 1.121709843352519

>sending 2.05592427147109

>received 2.05592427147109

>sending 2.8145974836316694

>received 2.8145974836316694

>sending 3.5275895108277027

>received 3.5275895108277027

>sending 3.901572782141232

>received 3.901572782141232

>sending 4.165258844191272

>received 4.165258844191272

>sending 4.986646861424761

>received 4.986646861424761

>sending 5.170546681258859

>received 5.170546681258859

>sending 5.591074632717518

>received 5.591074632717518

>sending 6.540571062561306

>received 6.540571062561306

>sending 7.345922557811228

>received 7.345922557811228

>sending 7.79464404807978

>received 7.79464404807978

>sending 7.880037300505228

>received 7.880037300505228

>sending 8.237138395881377

>received 8.237138395881377

>sending 8.971813669795386

>received 8.971813669795386

>sending 9.1417714866508

>received 9.1417714866508

>sending 9.750216361609873

>received 9.750216361609873

>sending 9.759058549107657

>received 9.759058549107657

>sending 10.51892536850362

>received 10.51892536850362

>sending 10.554211869042057

Process Done

>received 10.554211869042057

>sending 11.04291146739027

Process Done


Python Multiprocessing Jump-Start

Loving The Tutorials?

Why not take the next step? Get the book.

Learn more
 


Further Reading

This section provides additional resources that you may find helpful.

Python Multiprocessing Books

I would also recommend specific chapters in the books:

Guides

APIs

References

    Takeaways

    You now know how to use a pipe between processes in Python.

    Do you have any questions?
    Ask your questions in the comments below and I will do my best to answer.

    Photo by Ivan Ragozin on Unsplash