Multiprocessing Pool.apply() in Python

You can call Pool.apply() to issue tasks to the process pool and block the caller until the task is complete.

In this tutorial you will discover how to issue one-off tasks to the process pool in Python.

Let’s get started.

Need to Issue Tasks To The Process Pool

The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.

A process pool can be configured when it is created, which will prepare the child workers.

A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.

multiprocessing — Process-based parallelism

The process pool allows you to issue tasks in the form of target functions to be executed by the worker processes.

How can we issue one-off tasks to the process pool?

How to Use Pool.apply()

We can issue one-off tasks to the process pool using the apply() function.

The apply() function takes the function to be executed by a worker process. Pass the function object itself, not the result of calling it.

For example:

...
# issue a task to the process pool
pool.apply(task)

The call will block until the function is executed by a worker process, after which time it will return.

Call func with arguments args and keyword arguments kwds. It blocks until the result is ready.

multiprocessing — Process-based parallelism

If the function to be executed takes arguments they can be specified as a tuple to the “args” argument or a dictionary via the “kwds” argument.

For example:

...
# issue a task to the process pool with arguments
pool.apply(task, args=(arg1, arg2, arg3))
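
The snippet above shows only positional arguments. As a minimal sketch of combining "args" and "kwds" in one call, the example below uses a hypothetical task() with a keyword parameter named sep and a helper named run_example(); neither name comes from the original tutorial.

```python
# sketch: passing positional and keyword arguments through Pool.apply()
from multiprocessing.pool import Pool

# hypothetical task that takes two positional arguments and one keyword argument
def task(name, count, sep='-'):
    return sep.join([name] * count)

# hypothetical helper that issues the task and returns its result
def run_example():
    pool = Pool(2)
    # positional arguments go in a tuple, keyword arguments in a dict
    result = pool.apply(task, args=('ab', 3), kwds={'sep': '+'})
    pool.close()
    pool.join()
    return result

# protect the entry point
if __name__ == '__main__':
    print(run_example())
```

The worker effectively calls task('ab', 3, sep='+'), so the two containers map onto an ordinary function call.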

Difference Between apply() vs apply_async()

How does the apply() function compare to the apply_async() function for issuing tasks?

Both apply() and apply_async() may be used to issue one-off tasks to the process pool.

The following summarizes the key differences between these two functions:

  • The apply() function blocks, whereas the apply_async() function does not block.
  • The apply() function returns the result of the target function, whereas the apply_async() function returns an AsyncResult.
  • The apply() function does not take callbacks, whereas the apply_async() function does take callbacks.

The apply() function should be used for issuing target task functions to the process pool where the caller can or must block until the task is complete.

The apply_async() function should be used for issuing target task functions to the process pool where the caller cannot or must not block while the task is executing.
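
The differences listed above can be seen side by side in a small sketch. The square() task and the compare() helper are hypothetical names chosen for illustration; the calls themselves (apply(), apply_async() with a callback, and AsyncResult.get()) are the standard Pool API.

```python
# sketch: the same task issued with apply() and with apply_async()
from multiprocessing.pool import Pool

# hypothetical task used only for this comparison
def square(x):
    return x * x

# hypothetical helper that issues the task both ways
def compare():
    seen = []
    pool = Pool(2)
    # apply() blocks and returns the task's return value directly
    direct = pool.apply(square, args=(4,))
    # apply_async() returns an AsyncResult immediately and accepts a callback
    async_result = pool.apply_async(square, args=(4,), callback=seen.append)
    # get() blocks until the result is available
    fetched = async_result.get()
    pool.close()
    pool.join()
    return direct, fetched, seen

# protect the entry point
if __name__ == '__main__':
    print(compare())
```

Both routes deliver the same value; the asynchronous route simply lets the caller decide when to wait for it.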

Now that we know how to issue one-off tasks to the process pool, let’s look at some worked examples.

Example Pool.apply()

The apply() function can be called directly to execute a target function in the process pool.

The call will block until the function is executed by a worker process.

The example below demonstrates this by defining a task that reports a message and blocks for one second.

The task() function implements this.

# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print('Task done', flush=True)

We can then create and configure a process pool with the default configuration.

...
# create and configure the process pool
pool = Pool()

Next, we can issue the task() function to the process pool and block until it is executed.

...
# issue tasks to the process pool
pool.apply(task)

Finally, we can close the process pool, which prevents further tasks from being issued and lets the worker processes exit once their work is done.

...
# close the process pool
pool.close()
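
As an aside, the pool can also be managed with the context manager interface instead of an explicit close(). The sketch below is one possible arrangement, not the approach used in the rest of this tutorial; run_with_context_manager() is a hypothetical helper name. Note that leaving the with block terminates the pool, which is safe here only because apply() has already blocked until the task finished.

```python
# sketch: managing the pool with a context manager
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    sleep(0.1)
    return 'done'

# hypothetical helper wrapping the pool lifecycle
def run_with_context_manager():
    # the pool is terminated automatically when the block exits
    with Pool(2) as pool:
        # apply() blocks, so the task is complete before the block exits
        return pool.apply(task)

# protect the entry point
if __name__ == '__main__':
    print(run_with_context_manager())
```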

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of issuing a task with apply() to the process pool
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print('Task done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    pool.apply(task)
    # close the process pool
    pool.close()

Running the example first creates and configures the process pool.

Next, the task() function is issued to the process pool. The main process blocks until the task is executed.

A worker process executes the task() function, reporting messages and sleeping for a second. The task is finished and returns.

The main process continues on and closes the process pool.
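
Because each call blocks, issuing several tasks with apply() in a loop runs them one after another, even when the pool has idle workers. The sketch below times three calls to make this visible; the helper name time_three_calls() and the 0.5 second sleep are assumptions made for illustration.

```python
# sketch: repeated apply() calls execute sequentially
from time import sleep, perf_counter
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # block for half a second
    sleep(0.5)

# hypothetical helper that times three blocking calls
def time_three_calls():
    pool = Pool(4)
    start = perf_counter()
    for _ in range(3):
        # each call waits for its task before the next one is issued
        pool.apply(task)
    elapsed = perf_counter() - start
    pool.close()
    pool.join()
    return elapsed

# protect the entry point
if __name__ == '__main__':
    print(f'Three tasks took {time_three_calls():.2f} seconds')
```

The total should be roughly the sum of the three sleeps rather than the duration of one, which is the practical cost of blocking on each task.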

Next, let’s look at an example of issuing a task function with arguments.


Example of Pool.apply() With Arguments

We can call apply() to issue a task to the process pool that takes arguments.

This can be achieved by passing a tuple of arguments to the “args” argument or a dictionary of arguments to the “kwds” argument.

In this example, we can update the previous example so that our task() function takes one argument that is then reported in the printed messages.

The updated task() function with this change is listed below.

# task executed in a worker process
def task(data):
    # report a message
    print(f'Task executing: {data}', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print(f'Task done: {data}', flush=True)

We can then update the call to apply() to issue the task() function and specify a tuple containing one argument.

...
# issue tasks to the process pool
pool.apply(task, args=('Hello World',))

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of issuing a task with apply() with arguments to the process pool
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(data):
    # report a message
    print(f'Task executing: {data}', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print(f'Task done: {data}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    pool.apply(task, args=('Hello World',))
    # close the process pool
    pool.close()

Running the example first creates and configures the process pool.

Next, the task() function is issued to the process pool with an argument. The main process blocks until the task is executed.

A worker process executes the task() function, reporting messages with the provided argument and sleeping for a second. The task is finished and returns.

The main process continues on and closes the process pool.

Task executing: Hello World

Task done: Hello World

Next, let’s look at an example of issuing a task function that returns a value.

Example of Pool.apply() With a Return Value

We can call apply() to issue a target task function that returns a value.

The function is passed to apply() as before. Once a worker process has executed it, apply() returns the function's return value directly to the caller.

In this example, we can update the task() function from the previous example to generate a random value between 0 and 1, report this value, then return it to the calling process.

The updated task() function is listed below.

# task executed in a worker process
def task():
    # generate a value
    value = random()
    # report a message
    print(f'Task executing: {value}', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print(f'Task done {value}', flush=True)
    return value

We can then call the apply() function from the main process and assign the return value to a variable, then report the value that was returned.

...
# issue tasks to the process pool
result = pool.apply(task)
# report value
print(f'Main got: {result}')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of issuing a task with apply() to the process pool with a return value
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # generate a value
    value = random()
    # report a message
    print(f'Task executing: {value}', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print(f'Task done {value}', flush=True)
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    result = pool.apply(task)
    # report value
    print(f'Main got: {result}')
    # close the process pool
    pool.close()

Running the example first creates and configures the process pool.

Next, the task() function is issued to the process pool. The main process blocks until the task is executed.

A worker process executes the task() function, generating a random value, reporting messages and sleeping for a second. The task is finished and returns the value that was generated.

The main process continues on, reports the same return value and then closes the process pool.

Task executing: 0.4150468503576358

Task done 0.4150468503576358

Main got: 0.4150468503576358
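
Arguments and return values can be combined, and the returned object does not have to be a single number. Any picklable object can be sent back from the worker process. The sketch below returns a small dictionary; the summarize() task and the run_summary() helper are hypothetical names used only for illustration.

```python
# sketch: a task that takes an argument and returns a structured result
from multiprocessing.pool import Pool

# hypothetical task that summarizes a list of numbers
def summarize(values):
    return {'count': len(values), 'total': sum(values)}

# hypothetical helper that issues the task and returns its result
def run_summary():
    pool = Pool(2)
    # the list is pickled and sent to the worker, the dict is pickled and sent back
    result = pool.apply(summarize, args=([1, 2, 3],))
    pool.close()
    pool.join()
    return result

# protect the entry point
if __name__ == '__main__':
    print(f'Main got: {run_summary()}')
```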

Next, let’s look at an example of issuing a target task function that may raise an exception.

Example of Pool.apply() With an Exception


We can call apply() to issue a task to the process pool that may raise an exception that is not handled.

The process pool will trap the exception for us, then re-raise the exception in the calling process that issued the task.

In this example, we can update the example above so that the task() function will raise an exception that is left unhandled.

The updated task() function with this change is listed below.

# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # fail
    raise Exception('Something bad happened')
    # report a message
    print('Task done', flush=True)

We can then issue the task() function to the process pool, but wrap the call in a try-except block, to handle the possible exception.

...
# issue tasks to the process pool
try:
    pool.apply(task)
except Exception as e:
    print(f'Failed with: {e}')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of issuing a task with apply() to the process pool that raises an exception
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # fail
    raise Exception('Something bad happened')
    # report a message
    print('Task done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    try:
        pool.apply(task)
    except Exception as e:
        print(f'Failed with: {e}')
    # close the process pool
    pool.close()

Running the example first creates and configures the process pool.

Next, the task() function is issued to the process pool. The main process blocks until the task is executed.

A worker process executes the task() function, reporting a message and sleeping for a second. The task then raises an exception.

The process pool traps the exception and re-raises it in the calling process.

The main process continues on and handles the re-raised exception. It then closes the process pool.

Task executing

Failed with: Something bad happened
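
The re-raised exception keeps its original type, so the caller can catch something narrower than Exception. The sketch below assumes a hypothetical task that raises ValueError for negative input, plus two hypothetical helpers, safe_apply() and run_checks(). It also shows that the pool remains usable after a task has failed.

```python
# sketch: catching a specific exception type re-raised by apply()
from multiprocessing.pool import Pool

# hypothetical task that rejects negative input
def task(value):
    if value < 0:
        raise ValueError(f'negative value: {value}')
    return value ** 0.5

# hypothetical helper that converts a failure into a message
def safe_apply(pool, value):
    try:
        return pool.apply(task, args=(value,))
    except ValueError as e:
        return f'Failed with: {e}'

# hypothetical helper that issues a failing task and then a succeeding one
def run_checks():
    pool = Pool(2)
    results = [safe_apply(pool, -1), safe_apply(pool, 9)]
    pool.close()
    pool.join()
    return results

# protect the entry point
if __name__ == '__main__':
    print(run_checks())
```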

Takeaways

You now know how to issue one-off tasks to the process pool using Pool.apply().

