How to Use ThreadPool apply() in Python - Super Fast Python

Last Updated on October 29, 2022

You can call apply() to issue tasks to the thread pool and block the caller until the task is complete.

In this tutorial you will discover how to issue one-off tasks to the ThreadPool in Python.

Let’s get started.

Need to Issue Tasks To The ThreadPool

The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.

A thread pool object which controls a pool of worker threads to which jobs can be submitted.

multiprocessing — Process-based parallelism

The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.

Although the ThreadPool class is in the multiprocessing module it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading or writing from sockets or files.

A ThreadPool can be configured when it is created, which will prepare the new threads.

We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using functions such as map().

Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().

The ThreadPool allows you to issue tasks in the form of target functions to be executed by the worker threads.

How can we issue one-off tasks to the ThreadPool?

How to Issue One-Off Tasks to the ThreadPool

We can issue one-off tasks to the ThreadPool using the apply() method.

The apply() method takes the name of the function to be executed by a worker thread.

For example:

...

# issue a task to the thread pool

pool.apply(task)

The call will block until the function is executed by a worker thread, after which time it will return.

Call func with arguments args and keyword arguments kwds. It blocks until the result is ready.

multiprocessing — Process-based parallelism

If the function to be executed takes arguments they can be specified as a tuple to the “args” argument or a dictionary via the “kwds” argument.

For example:

...

# issue a task to the thread pool with arguments

pool.apply(task, args=(arg1, arg2, arg3))

Difference Between apply() vs apply_async()

How does the apply() methods compare to the apply_async() for issuing tasks?

Both the apply() and apply_async() may be used to issue one-off tasks to the thread pool.

The following summarizes the key differences between these two methods:

  • The apply() method blocks, whereas the apply_async() method does not block.
  • The apply() method returns the result of the target function, whereas the apply_async() method returns an AsyncResult.
  • The apply() method does not take callbacks, whereas the apply_async() method does take callbacks.

The apply() method should be used for issuing target task functions to the thread pool where the caller can or must block until the task is complete.

The apply_async() method should be used for issuing target task functions to the ThreadPool where the caller cannot or must not block while the task is executing.

Now that we know how to issue one-off tasks to the ThreadPool, let’s look at some worked examples.


Free Python ThreadPool Course

Download your FREE ThreadPool PDF cheat sheet and get BONUS access to my free 7-day crash course on the ThreadPool API.

Discover how to use the ThreadPool including how to configure the number of worker threads and how to execute tasks asynchronously

Learn more
 


Example of Calling apply()

The apply() method can be called directly to execute a target function in the ThreadPool.

The call will block until the function is executed by a worker thread.

The example below demonstrates this by defining a task that reports a message and blocks for one second.

The task() function implements this.

# task executed in a worker thread

def task():

    # report a message

    print(f'Task executing')

    # block for a moment

    sleep(1)

    # report a message

    print(f'Task done')

We can then create and configure a ThreadPool with the default configuration.

...

# create and configure the thread pool

pool = Pool()

Next, we can issue the task() function to the ThreadPool and block until it is executed.

...

# issue tasks to the thread pool

pool.apply(task)

Finally, we can close the ThreadPool and release the resources.

...

# close the thread pool

pool.close()

Tying this together, the complete example is listed below.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

# SuperFastPython.com

# example of issuing a task with apply() to the thread pool

from time import sleep

from multiprocessing.pool import ThreadPool

# task executed in a worker thread

def task():

    # report a message

    print(f'Task executing')

    # block for a moment

    sleep(1)

    # report a message

    print(f'Task done')

# protect the entry point

if __name__ == '__main__':

    # create and configure the thread pool

    pool = ThreadPool()

    # issue tasks to the thread pool

    pool.apply(task)

    # close the thread pool

    pool.close()

Running the example first creates and configures the ThreadPool.

Next, the task() function is issued to the ThreadPool. The main thread blocks until the task is executed.

A worker thread executes the task() function, reporting messages and sleeping for a second. The task is finished and returns.

The main thread continues on and closes the ThreadPool.

Next, let’s look at an example of issuing a task function with arguments

Example of Calling apply() With Arguments

We can call apply() to issue a task to the ThreadPool that takes arguments.

This can be achieved by passing a tuple of arguments to the “args” argument or a dictionary of arguments to the “kwds” argument.

In this example, we can update the previous examples so that our task() function takes one argument that is then reported in printed messages.

The updated task() function with this change is listed below.

# task executed in a worker thread

def task(data):

    # report a message

    print(f'Task executing: {data}')

    # block for a moment

    sleep(1)

    # report a message

    print(f'Task done: {data}')

We can then update the call to apply() to issue the task() function and specify a tuple containing one argument.

...

# issue tasks to the thread pool

pool.apply(task, args=('Hello World',))

Tying this together, the complete example is listed below.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

# SuperFastPython.com

# example of issuing a task with apply() with arguments to the thread pool

from time import sleep

from multiprocessing.pool import ThreadPool

# task executed in a worker thread

def task(data):

    # report a message

    print(f'Task executing: {data}')

    # block for a moment

    sleep(1)

    # report a message

    print(f'Task done: {data}')

# protect the entry point

if __name__ == '__main__':

    # create and configure the thread pool

    pool = ThreadPool()

    # issue tasks to the thread pool

    pool.apply(task, args=('Hello World',))

    # close the thread pool

    pool.close()

Running the example first creates and configures the ThreadPool.

Next, the task() function is issued to the ThreadPool with an argument. The main thread blocks until the task is executed.

A worker thread executes the task() function, reporting messages with the provided argument and sleeping for a second. The task is finished and returns.

The main thread continues on and closes the ThreadPool.

Task executing: Hello World

Task done: Hello World

Next, let’s look at an example of issuing a task function that returns a value.


Python ThreadPool Jump-Start

Loving The Tutorials?

Why not take the next step? Get the book.

Learn more
 


Example of Calling apply() With a Return Value

We can call apply() to issue a target task function that returns a value.

This can be achieved by specifying the function that returns a value as an argument to apply(), and the function will be executed by the ThreadPool, returning the value directly once completed.

In this example, we can update the task() function from the previous example to generate a random value between 0 and 1, report this value, then return it to the calling thread.

The updated task() function is listed below.

# task executed in a worker thread

def task():

    # generate a value

    value = random()

    # report a message

    print(f'Task executing: {value}')

    # block for a moment

    sleep(1)

    # report a message

    print(f'Task done {value}')

    return value

We can then call the apply() method from the main thread and assign the return value to a variable, then report the value that was returned.

...

# issue tasks to the thread pool

result = pool.apply(task)

# report value

print(f'Main got: {result}')

Tying this together, the complete example is listed below.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

# SuperFastPython.com

# example of issuing a task with apply() to the thread pool with a return value

from random import random

from time import sleep

from multiprocessing.pool import ThreadPool

# task executed in a worker thread

def task():

    # generate a value

    value = random()

    # report a message

    print(f'Task executing: {value}')

    # block for a moment

    sleep(1)

    # report a message

    print(f'Task done {value}')

    return value

# protect the entry point

if __name__ == '__main__':

    # create and configure the thread pool

    pool = ThreadPool()

    # issue tasks to the thread pool

    result = pool.apply(task)

    # report value

    print(f'Main got: {result}')

    # close the thread pool

    pool.close()

Running the example first creates and configures the ThreadPool.

Next, the task() function is issued to the ThreadPool. The main thread blocks until the task is executed.

A worker thread executes the task() function, generating a random value, reporting messages and sleeping for a second. The task is finished and returns the value that was generated.

The main thread continues on, reports the same return value and then closes the ThreadPool.

Task executing: 0.9420120071068216

Task done 0.9420120071068216

Main got: 0.9420120071068216

Next, let’s look at an example of issuing a target task function that may raise an exception.

Example of Calling apply() With an Exception

We can call apply() to issue a task to the ThreadPool that may raise an exception that is not handled.

The ThreadPool will trap the exception for us, then re-raise the exception in the calling thread that issued the task.

In this example, we can update the example above so that the task() function will raise an exception that is left unhandled.

The updated task() function with this change is listed below.

# task executed in a worker thread

def task():

    # report a message

    print(f'Task executing')

    # block for a moment

    sleep(1)

    # fail

    raise Exception('Something bad happened')

    # report a message

    print(f'Task done')

We can then issue the task() function to the ThreadPool, but wrap the call in a try-except block, to handle the possible exception.

...

# issue tasks to the thread pool

try:

    pool.apply(task)

except Exception as e:

    print(f'Failed with: {e}')

Tying this together, the complete example is listed below.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

# SuperFastPython.com

# example of issuing a task with apply() to the thread pool that raises an exception

from time import sleep

from multiprocessing.pool import ThreadPool

# task executed in a worker thread

def task():

    # report a message

    print(f'Task executing')

    # block for a moment

    sleep(1)

    # fail

    raise Exception('Something bad happened')

    # report a message

    print(f'Task done')

# protect the entry point

if __name__ == '__main__':

    # create and configure the thread pool

    pool = ThreadPool()

    # issue tasks to the thread pool

    try:

        pool.apply(task)

    except Exception as e:

        print(f'Failed with: {e}')

    # close the thread pool

    pool.close()

Running the example first creates and configures the ThreadPool.

Next, the task() function is issued to the ThreadPool. The main thread blocks until the task is executed.

A worker thread executes the task() function, reporting messages and sleeping for a second. The task then raises an exception.

The ThreadPool traps the exception and re-raises it in the calling thread.

The main thread continues on and handles the re-raised exception. It then closes the ThreadPool.

Task executing

Failed with: Something bad happened

Further Reading

This section provides additional resources that you may find helpful.

Books

I also recommend specific chapters from the following books:

  • Python Cookbook, David Beazley and Brian Jones, 2013.
    • See: Chapter 12: Concurrency
  • Effective Python, Brett Slatkin, 2019.
    • See: Chapter 7: Concurrency and Parallelism
  • Python in a Nutshell, Alex Martelli, et al., 2017.
    • See: Chapter: 14: Threads and Processes

Guides

APIs

References

    Takeaways

    You now know how to issue one-off tasks to the ThreadPool using the apply() method.

    Do you have any questions?
    Ask your questions in the comments below and I will do my best to answer.

    Photo by James Coleman on Unsplash