How to Use ThreadPool apply_async() in Python - Super Fast Python

Last Updated on October 29, 2022

You can call the apply_async() method to issue asynchronous tasks to the ThreadPool.

In this tutorial you will discover how to issue one-off asynchronous tasks to the ThreadPool in Python.

Let’s get started.

Need to Issue Tasks To The ThreadPool

The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.

A thread pool object which controls a pool of worker threads to which jobs can be submitted.

multiprocessing — Process-based parallelism

The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.

Although the ThreadPool class is in the multiprocessing module it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading or writing from sockets or files.

A ThreadPool can be configured when it is created, which will prepare the new threads.

We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using functions such as map().

Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().

The ThreadPool allows you to issue tasks in the form of target functions to be executed by the worker threads.

How can we issue one-off asynchronous tasks to the ThreadPool?

We can issue one-off tasks to the ThreadPool using the apply_async() method.

Asynchronous means that the call to the ThreadPool does not block, allowing the caller that issued the task to carry on.

The apply_async() method takes the name of the function to execute in a worker thread and returns immediately with an AsyncResult object for the task.

For example:

...

# issue a task asynchronously to the thread pool

result = pool.apply_async(task)

A variant of the apply() method which returns a AsyncResult object.

multiprocessing — Process-based parallelism

If the target function takes arguments they can be specified as a tuple to the “args” argument or as a dictionary to the “kwds” argument.

For example:

...

# issue a task asynchronously to the thread pool with arguments

result = pool.apply_async(task, args=(arg1, arg2, arg3))
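As a minimal sketch of both argument styles, the following passes one positional argument via "args" and one keyword argument via "kwds". The greet() function, its parameters, and the run_demo() helper are hypothetical names invented for illustration.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task with one positional and one keyword parameter
def greet(name, greeting='Hello'):
    return f'{greeting}, {name}'

# hypothetical helper that issues the task and returns its result
def run_demo():
    with ThreadPool() as pool:
        # positional arguments go in a tuple, keyword arguments in a dict
        result = pool.apply_async(greet, args=('world',), kwds={'greeting': 'Hi'})
        # block until the task completes and fetch the return value
        return result.get()

if __name__ == '__main__':
    print(run_demo())
```

Note the trailing comma in args=('world',), which is what makes a one-element tuple.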

A callback function can be called automatically if the task was successful, i.e. it completed without raising an error or exception.

The callback function must take one argument, the result of the target function.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed …

multiprocessing — Process-based parallelism

The function is specified via the “callback” argument to the apply_async() function.

For example:

# callback function

def custom_callback(result):

    print(f'Got result: {result}')

...

# issue a task asynchronously to the thread pool with a callback

result = pool.apply_async(task, callback=custom_callback)

Similarly, an error callback function can be specified via the “error_callback” argument that is called only when an unexpected error or exception is raised.

If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.

multiprocessing — Process-based parallelism

The error callback function must take one argument, which is the instance of the error or exception that was raised.

For example:

# error callback function

def custom_error_callback(error):

    print(f'Got error: {error}')

...

# issue a task asynchronously to the thread pool with an error callback

result = pool.apply_async(task, error_callback=custom_error_callback)

Difference Between apply_async vs apply()

How does the apply_async() method compare to the apply() method for issuing tasks?

Both apply_async() and apply() may be used to issue one-off tasks to the ThreadPool.

The following summarizes the key differences between these two methods:

  • The apply_async() method does not block, whereas the apply() method does block.
  • The apply_async() method returns an AsyncResult, whereas the apply() method returns the result of the target function.
  • The apply_async() method can execute callback functions when the task is complete, whereas the apply() method cannot execute callback functions.

The apply_async() method should be used for issuing target task functions to the ThreadPool where the caller cannot or must not block while the task is executing.

The apply() method should be used for issuing target task functions to the ThreadPool where the caller can or must block until the task is complete.
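The differences can be seen side by side in a short sketch. The square() task and the compare() helper are hypothetical names used only for this illustration.

```python
from multiprocessing.pool import ThreadPool, AsyncResult

# hypothetical task used for the comparison
def square(x):
    return x * x

# hypothetical helper that issues the same kind of task both ways
def compare():
    with ThreadPool(2) as pool:
        # apply() blocks and hands back the return value directly
        direct = pool.apply(square, args=(3,))
        # apply_async() returns a handle without waiting for the task
        handle = pool.apply_async(square, args=(4,))
        is_handle = isinstance(handle, AsyncResult)
        # the value is only retrieved when we ask for it
        later = handle.get()
    return direct, is_handle, later

if __name__ == '__main__':
    print(compare())
```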

Now that we know how to issue one-off tasks to the ThreadPool, let’s look at some worked examples.


Example of Calling apply_async()

The apply_async() method can be called directly to execute a target function in the ThreadPool.

The call will not block, but will instead immediately return an AsyncResult object that we can ignore if our function does not return a value.

The example below demonstrates this by defining a task that reports a message and blocks for one second.

The task() function implements this.

# task executed in a worker thread

def task():

    # report a message

    print(f'Task executing')

    # block for a moment

    sleep(1)

    # report a message

    print(f'Task done')

We can then create and configure a ThreadPool with the default configuration.

...

# create and configure the thread pool

pool = ThreadPool()

Next, we can issue the task() function to the ThreadPool.

...

# issue tasks to the thread pool

pool.apply_async(task)

Finally, we can close the thread pool so that no further tasks can be issued, then wait until all issued tasks have completed.

...

# close the thread pool

pool.close()

# wait for all tasks to finish

pool.join()

Note that if we did not call join() to wait for the tasks to complete, the main thread could exit while the task is still running. Because the worker threads in the ThreadPool are daemon threads, the program would then terminate without completing the task.

Tying this together, the complete example is listed below.

# SuperFastPython.com

# example of issuing a task with apply_async() to the thread pool

from time import sleep

from multiprocessing.pool import ThreadPool

# task executed in a worker thread

def task():

    # report a message

    print(f'Task executing')

    # block for a moment

    sleep(1)

    # report a message

    print(f'Task done')

# protect the entry point

if __name__ == '__main__':

    # create and configure the thread pool

    pool = ThreadPool()

    # issue tasks to the thread pool

    pool.apply_async(task)

    # close the thread pool

    pool.close()

    # wait for all tasks to finish

    pool.join()

Running the example first creates and configures the ThreadPool.

Next, the task() function is issued to the ThreadPool.

The main thread then closes the ThreadPool and blocks until all tasks complete and all threads in the ThreadPool close.

A worker thread executes the task() function, reporting messages and sleeping for a second. The task is finished and returns.

The main thread continues on and the program exits.

Next, let’s look at an example of issuing a task to the ThreadPool that has arguments.

Example of Calling apply_async() with Arguments

We can call apply_async() to issue a task to the ThreadPool that takes arguments.

This can be achieved by passing a tuple of arguments to the “args” argument or a dictionary of arguments to the “kwds” argument.

In this example, we can update the previous example so that our task() function takes one argument that is then reported in printed messages.

The updated task() function with this change is listed below.

# task executed in a worker thread

def task(message):

    # report a message

    print(f'Task executing: {message}')

    # block for a moment

    sleep(1)

    # report a message

    print(f'Task done: {message}')

We can then update the call to apply_async() to issue the task() function and specify a tuple containing one argument.

...

# issue tasks to the thread pool

pool.apply_async(task, args=('Hello world',))

Tying this together, the complete example is listed below.

# SuperFastPython.com

# example of issuing a task with apply_async() to the thread pool with arguments

from time import sleep

from multiprocessing.pool import ThreadPool

# task executed in a worker thread

def task(message):

    # report a message

    print(f'Task executing: {message}')

    # block for a moment

    sleep(1)

    # report a message

    print(f'Task done: {message}')

# protect the entry point

if __name__ == '__main__':

    # create and configure the thread pool

    pool = ThreadPool()

    # issue tasks to the thread pool

    pool.apply_async(task, args=('Hello world',))

    # close the thread pool

    pool.close()

    # wait for all tasks to finish

    pool.join()

Running the example first creates and configures the ThreadPool.

Next, the task() function is issued to the ThreadPool with an argument.

The main thread then closes the ThreadPool and blocks until all tasks complete and all threads in the ThreadPool close.

A worker thread executes the task() function, reporting messages with the provided argument, then sleeping for a second. The task is finished and returns.

The main thread continues on and the program exits.

Task executing: Hello world

Task done: Hello world
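The same pattern extends to several one-off tasks, each with its own argument. The sketch below is an assumption-laden variation on the example above: the report() task returns its message instead of printing it, and issue_many() is a hypothetical helper that keeps one AsyncResult per task.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical variant of the task that returns its message
def report(message):
    sleep(0.1)
    return f'Task done: {message}'

# hypothetical helper: issue one task per message and gather the results
def issue_many(messages):
    pool = ThreadPool()
    # each call returns immediately with its own AsyncResult handle
    handles = [pool.apply_async(report, args=(m,)) for m in messages]
    # no more tasks will be issued; wait for the issued ones to finish
    pool.close()
    pool.join()
    # all tasks are complete, so get() returns without waiting
    return [h.get() for h in handles]

if __name__ == '__main__':
    for line in issue_many(['a', 'b', 'c']):
        print(line)
```

Results come back in the order the handles were stored, regardless of which task finished first.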

Next, let’s look at an example of issuing a task to the pool and handling the result with a callback.


Example of Calling apply_async() with a Callback Function

We can issue a task to the ThreadPool that returns a value and specify a callback function to handle the returned value.

This can be achieved via the “callback” argument.

In this example, we can update the above example so that the task() function generates a value and returns it. We can then define a function to handle the return value, in this case to simply report the value.

Firstly, we can define the function to call to handle the value returned from the function.

The return_callback() below implements this, taking one argument which is the value returned from the target task function.

# handle the return value callback

def return_callback(result):

    print(f'Callback received: {result}')

Next, we can update the task function to generate a random value between 0 and 1. It then reports the value, sleeps, then returns the value that was generated.

The updated task() function with these changes is listed below.

# task executed in a worker thread

def task():

    # generate a random value

    value = random()

    # report a message

    print(f'Task generated {value}')

    # block for a moment

    sleep(1)

    # report a message

    print(f'Task done with {value}')

    # return the generated value

    return value

Finally, we can update the call to apply_async() to specify the callback function via the “callback” argument, giving the name of our custom function.

...

# issue tasks to the thread pool

pool.apply_async(task, callback=return_callback)

Tying this together, the complete example is listed below.

# SuperFastPython.com

# example of issuing a task with apply_async() to the thread pool with a callback

from random import random

from time import sleep

from multiprocessing.pool import ThreadPool

# handle the return value callback

def return_callback(result):

    print(f'Callback received: {result}')

# task executed in a worker thread

def task():

    # generate a random value

    value = random()

    # report a message

    print(f'Task generated {value}')

    # block for a moment

    sleep(1)

    # report a message

    print(f'Task done with {value}')

    # return the generated value

    return value

# protect the entry point

if __name__ == '__main__':

    # create and configure the thread pool

    pool = ThreadPool()

    # issue tasks to the thread pool

    pool.apply_async(task, callback=return_callback)

    # close the thread pool

    pool.close()

    # wait for all tasks to finish

    pool.join()

Running the example first creates and configures the ThreadPool.

Next, the task() function is issued to the ThreadPool with the custom callback function.

The main thread then closes the ThreadPool and blocks until all tasks complete and all threads in the ThreadPool close.

A worker thread executes the task() function, reporting messages and sleeping for a second. The task is finished and returns.

The custom callback function is then called and passed the value returned from the target task function. The value is reported in a message by the callback function.

Finally, the main thread continues on and the program exits.

Task generated 0.6357624807921874

Task done with 0.6357624807921874

Callback received: 0.6357624807921874
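One detail worth checking is which thread runs the callback. The Python documentation notes that callbacks should complete quickly because they block the thread that handles results. The sketch below records the thread identifier seen by the callback and compares it with the thread that issued the task. The names answer(), record() and callback_thread_differs() are hypothetical.

```python
import threading
from multiprocessing.pool import ThreadPool

# hypothetical task that returns a fixed value
def answer():
    return 42

# hypothetical helper: report the result and whether the callback
# ran in a different thread from the one that issued the task
def callback_thread_differs():
    seen = []

    # callback stores the result together with the calling thread's id
    def record(result):
        seen.append((result, threading.get_ident()))

    pool = ThreadPool(1)
    pool.apply_async(answer, callback=record)
    pool.close()
    pool.join()
    result, callback_ident = seen[0]
    return result, callback_ident != threading.get_ident()

if __name__ == '__main__':
    print(callback_thread_differs())
```

Because the callback runs outside the issuing thread, any shared state it touches should be safe to update from another thread.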

Next, let’s look at an example of issuing a task to the pool with an error callback function.

Example of Calling apply_async() with an Error Callback Function

We can issue a task to the ThreadPool that may raise an unhandled exception and specify an error callback function to handle the exception.

This can be achieved via the “error_callback” argument.

In this example, we can update the above example so that the task() function raises an exception. We can then define a function to handle the raised exception, in this case to simply report the details of the exception.

Firstly, we can define the function to call to handle an exception raised by the function.

The custom_error_callback() below implements this, taking one argument which is the exception raised by the target task function.

# handle any errors in the task function

def custom_error_callback(error):

    print(f'Got an Error: {error}')

Next, we can update the task() function so that it raises an exception that is not handled.

# task executed in a worker thread

def task():

    # report a message

    print(f'Task executing')

    # block for a moment

    sleep(1)

    # raise an exception

    raise Exception('Something bad happened')

    # report a message

    print(f'Task done')

Finally, we can update the call to apply_async() to specify the error callback function via the “error_callback” argument, giving the name of our custom function.

...

# issue tasks to the thread pool

pool.apply_async(task, error_callback=custom_error_callback)

Tying this together, the complete example is listed below.

# SuperFastPython.com

# example of issuing a task with apply_async() to the thread pool with an error callback

from time import sleep

from multiprocessing.pool import ThreadPool

# handle any errors in the task function

def custom_error_callback(error):

    print(f'Got an Error: {error}')

# task executed in a worker thread

def task():

    # report a message

    print(f'Task executing')

    # block for a moment

    sleep(1)

    # raise an exception

    raise Exception('Something bad happened')

    # report a message

    print(f'Task done')

# protect the entry point

if __name__ == '__main__':

    # create and configure the thread pool

    pool = ThreadPool()

    # issue tasks to the thread pool

    pool.apply_async(task, error_callback=custom_error_callback)

    # close the thread pool

    pool.close()

    # wait for all tasks to finish

    pool.join()

Running the example first creates and configures the ThreadPool.

Next, the task() function is issued to the ThreadPool with the custom error callback function.

The main thread then closes the ThreadPool and blocks until all tasks complete and all threads in the ThreadPool close.

A worker thread executes the task() function, reporting messages and sleeping for a second. The task then raises an exception.

The exception is trapped by the ThreadPool. The custom error callback function is then called and passed the raised exception. The details of the raised exception are then reported by the callback function.

Finally, the main thread continues on and the program exits.

Task executing

Got an Error: Something bad happened
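Both callbacks can be given in the same call, in which case only one of them fires for a given task. The following sketch assumes a hypothetical work() task that fails on request and a hypothetical run() helper that records which callback was invoked.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task that either returns a value or raises
def work(fail):
    if fail:
        raise ValueError('Something bad happened')
    return 'ok'

# hypothetical helper: issue the task with both callbacks attached
def run(fail):
    events = []
    pool = ThreadPool(1)
    pool.apply_async(
        work,
        args=(fail,),
        # called with the return value if the task succeeds
        callback=lambda value: events.append(('result', value)),
        # called with the exception instance if the task fails
        error_callback=lambda error: events.append(('error', str(error))),
    )
    pool.close()
    pool.join()
    return events

if __name__ == '__main__':
    print(run(False))
    print(run(True))
```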

Next, let’s look at an example of waiting for an issued task to complete.

Example of Calling apply_async() and Wait For Task to Complete

The apply_async() function returns an AsyncResult object that provides a handle on the asynchronous task.

We can wait for the issued task to complete via the wait() function on the AsyncResult object.

In this example, we can update the above example so that the AsyncResult object is assigned when the task is issued, then the main thread waits for the task to complete explicitly.

...

# issue tasks to the thread pool

result = pool.apply_async(task)

# wait for the task to complete

result.wait()

The wait() function returns once the task is complete.

At this time, there are no further tasks in the ThreadPool. Therefore the main thread can close the pool and exit, without calling the join() function.

...

# close the thread pool

pool.close()

Tying this together, the complete example is listed below.

# SuperFastPython.com

# example of issuing a task with apply_async() to the thread pool and waiting for the task to complete

from time import sleep

from multiprocessing.pool import ThreadPool

# task executed in a worker thread

def task():

    # report a message

    print(f'Task executing')

    # block for a moment

    sleep(1)

    # report a message

    print(f'Task done')

# protect the entry point

if __name__ == '__main__':

    # create and configure the thread pool

    pool = ThreadPool()

    # issue tasks to the thread pool

    result = pool.apply_async(task)

    # wait for the task to complete

    result.wait()

    # close the thread pool

    pool.close()

Running the example first creates and configures the ThreadPool.

Next, the task() function is issued to the ThreadPool and the returned AsyncResult object is assigned.

The main thread then waits on the returned AsyncResult object for the issued task to complete.

A worker thread executes the task() function, reporting messages and sleeping for a second. The task is finished and returns.

The main thread then continues on and closes the ThreadPool.
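The wait() method also accepts an optional timeout in seconds, and the AsyncResult object offers ready() and successful() to check on a task. A minimal sketch, assuming a hypothetical slow_task() that sleeps for half a second and a hypothetical wait_in_steps() helper:

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task that takes about half a second
def slow_task():
    sleep(0.5)

# hypothetical helper: wait briefly, check status, then wait fully
def wait_in_steps():
    pool = ThreadPool(1)
    result = pool.apply_async(slow_task)
    # wait at most 0.1 seconds; returns even if the task is still running
    result.wait(timeout=0.1)
    ready_early = result.ready()
    # wait with no timeout; returns once the task is complete
    result.wait()
    ready_late = result.ready()
    # successful() reports whether the task finished without raising
    succeeded = result.successful()
    pool.close()
    pool.join()
    return ready_early, ready_late, succeeded

if __name__ == '__main__':
    print(wait_in_steps())
```

A timed wait does not cancel the task; it only limits how long the caller blocks.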

Next, let’s look at an example where we wait for the result of an issued task.

Example of Calling apply_async() and Wait For Result

The AsyncResult object returned when issuing a task via apply_async() provides a way to access the value returned from the target task function.

This can be achieved by calling the get() function that returns the value from the function issued to the ThreadPool.

In this example, we can update the target task() function to generate a random value between 0 and 1 and return the value. We can then assign the AsyncResult object returned from apply_async() and call get() to retrieve the result. This will block until the task is completed and the result is returned.

Firstly, we can update the task() function to generate a random value, report the value, then return it.

The updated task() function with these changes is listed below.

# task executed in a worker thread

def task():

    # generate a random value

    value = random()

    # report a message

    print(f'Task generated {value}')

    # block for a moment

    sleep(1)

    # report a message

    print(f'Task done with {value}')

    # return the generated value

    return value

Next, we can issue the task and assign the AsyncResult object returned.

...

# issue tasks to the thread pool

result = pool.apply_async(task)

We can then get the result from the AsyncResult object and report the value.

This will block until the task is complete and the result is returned.

...

# wait for the return value

value = result.get()

# report the return value

print(f'Got: {value}')

Tying this together, the complete example is listed below.

# SuperFastPython.com

# example of issuing a task with apply_async() to the thread pool and wait for the result

from random import random

from time import sleep

from multiprocessing.pool import ThreadPool

# task executed in a worker thread

def task():

    # generate a random value

    value = random()

    # report a message

    print(f'Task generated {value}')

    # block for a moment

    sleep(1)

    # report a message

    print(f'Task done with {value}')

    # return the generated value

    return value

# protect the entry point

if __name__ == '__main__':

    # create and configure the thread pool

    pool = ThreadPool()

    # issue tasks to the thread pool

    result = pool.apply_async(task)

    # wait for the return value

    value = result.get()

    # report the return value

    print(f'Got: {value}')

    # close the thread pool

    pool.close()

Running the example first creates and configures the ThreadPool.

Next, the task() function is issued to the ThreadPool and the returned AsyncResult object is assigned.

The main thread then gets the result from the AsyncResult object, blocking until the issued task is finished.

A worker thread executes the task() function, generating a value, reporting messages and sleeping for a second. The task is finished and returns the generated value.

The main thread then receives the value, reports it then continues on and closes the ThreadPool.

Task generated 0.0378375302457159

Task done with 0.0378375302457159

Got: 0.0378375302457159
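The get() method also accepts an optional timeout in seconds and raises multiprocessing.TimeoutError if the result has not arrived in time. The sketch below assumes a hypothetical slow_task() and a hypothetical get_with_timeout() helper; the task keeps running after the timeout, so a later get() still returns its value.

```python
import multiprocessing
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task that takes about half a second
def slow_task():
    sleep(0.5)
    return 'finished'

# hypothetical helper: try a short timeout first, then wait fully
def get_with_timeout():
    pool = ThreadPool(1)
    result = pool.apply_async(slow_task)
    try:
        # give up waiting after 0.1 seconds
        first = result.get(timeout=0.1)
    except multiprocessing.TimeoutError:
        first = 'timed out'
    # block until the task is complete and return its value
    second = result.get()
    pool.close()
    pool.join()
    return first, second

if __name__ == '__main__':
    print(get_with_timeout())
```

Note that multiprocessing.TimeoutError is a different class from the built-in TimeoutError, so the except clause names it explicitly.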

Next, let’s look at an example of getting a return value from a function that raised an exception.

Example of Calling apply_async() with Exception

We can get a return value from a target task function issued to the ThreadPool via AsyncResult.get().

If the target task function raises an exception that was not handled, then the get() function will re-raise the same exception.

This means, we may need to explicitly handle exceptions that may be raised in the target task function when getting a returned value from an issued task.

In this example, we will update the above example so that the target task() function raises an unhandled exception. We will then get the return value from the target task function via the returned AsyncResult object and handle the possible exception with a try-except pattern.

Firstly, we can update the target task() function that returns a value to raise an exception.

# task executed in a worker thread

def task():

    # report a message

    print(f'Task executing')

    # block for a moment

    sleep(1)

    # raise an exception

    raise Exception('Something bad happened')

    # report a message

    print(f'Task done')

    # return a value

    return "DONE!"

We can then issue the task as before and assign the returned AsyncResult object.

...

# issue tasks to the thread pool

result = pool.apply_async(task)

Finally, we can attempt to get the result from the target task function via the AsyncResult.get() function, and handle the exception with a try-except block.

...

# wait for the return value

try:

    value = result.get()

except Exception as e:

    print(f'Failed with: {e}')

Tying this together, the complete example is listed below.

# SuperFastPython.com

# example of issuing a task with apply_async() to the thread pool and handle exception

from time import sleep

from multiprocessing.pool import ThreadPool

# task executed in a worker thread

def task():

    # report a message

    print(f'Task executing')

    # block for a moment

    sleep(1)

    # raise an exception

    raise Exception('Something bad happened')

    # report a message

    print(f'Task done')

    # return a value

    return "DONE!"

# protect the entry point

if __name__ == '__main__':

    # create and configure the thread pool

    pool = ThreadPool()

    # issue tasks to the thread pool

    result = pool.apply_async(task)

    # wait for the return value

    try:

        value = result.get()

    except Exception as e:

        print(f'Failed with: {e}')

    # close the thread pool

    pool.close()

Running the example first creates and configures the ThreadPool.

Next, the task() function is issued to the ThreadPool and the returned AsyncResult object is assigned.

The main thread then gets the result from the AsyncResult object, blocking until the issued task is finished.

A worker thread executes the task() function, reporting messages and sleeping for a second. The task then raises an exception.

The main thread then catches the raised exception and reports the message. It then continues on and closes the ThreadPool.

Task executing

Failed with: Something bad happened
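Because get() re-raises the exception from the task, the except clause can target a specific exception type rather than the broad Exception class. A minimal sketch, assuming a hypothetical parse_port() task that raises ValueError on bad input and a hypothetical fetch() helper:

```python
from multiprocessing.pool import ThreadPool

# hypothetical task: int() raises ValueError for non-numeric text
def parse_port(text):
    return int(text)

# hypothetical helper: get the result, handling only ValueError
def fetch(text):
    with ThreadPool(1) as pool:
        result = pool.apply_async(parse_port, args=(text,))
        try:
            # re-raises any exception raised inside the task
            return result.get()
        except ValueError:
            return f'invalid: {text!r}'

if __name__ == '__main__':
    print(fetch('8080'))
    print(fetch('http'))
```

Catching a narrow type keeps unrelated failures visible instead of silently reporting them as bad input.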

Further Reading

This section provides additional resources that you may find helpful.

Books

I also recommend specific chapters from the following books:

  • Python Cookbook, David Beazley and Brian Jones, 2013.
    • See: Chapter 12: Concurrency
  • Effective Python, Brett Slatkin, 2019.
    • See: Chapter 7: Concurrency and Parallelism
  • Python in a Nutshell, Alex Martelli, et al., 2017.
    • See: Chapter: 14: Threads and Processes

Takeaways

You now know how to issue one-off tasks to the ThreadPool using the apply_async() method.

Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
