Interceptors - Python SDK | Temporal Platform Documentation
Interceptors are SDK hooks that let you intercept inbound and outbound Temporal calls. You use them to add common behavior across many calls, such as tracing and context propagation. This is similar to using middleware in web frameworks such as Django, Starlette, and Flask.
There are two types of interceptors: inbound and outbound.
- Outbound interceptors wrap network calls, running before they reach the network and after they return.
- Inbound interceptors run after the network hop, wrapping application code and running before it starts and after it returns.
Concretely, there are five categories of inbound and outbound calls that you can modify in this way:
| | Outbound Client | Inbound Workflow | Outbound Workflow | Inbound Activity | Outbound Activity |
|---|---|---|---|---|---|
| Description | Wraps calls from your application to the Temporal Client to start a Workflow or send Messages to it | Wraps calls arriving into a Workflow Execution, such as executing the Workflow and handling Messages | Wraps calls a Workflow makes to the SDK, such as scheduling Activities, starting Child Workflows, and invoking Nexus Operations | Wraps calls arriving into an Activity Execution | Wraps calls an Activity makes to the SDK, such as sending Heartbeats and reading Activity info |
| Runs on | Client | Worker (Workflow sandbox) | Worker (Workflow sandbox) | Worker (Activity context) | Worker (Activity context) |
| Example methods | start_workflow(), signal_workflow(), list_workflows() | execute_workflow(), handle_query(), handle_signal(), handle_update_handler() | start_activity(), start_child_workflow(), signal_child_workflow(), start_nexus_operation() | execute_activity() | info(), heartbeat() |
These are not exhaustive lists; refer to the linked API docs for each category.
Workflow interceptors and replay
Workflow inbound and outbound interceptor methods also execute during replay. Use replay-safe APIs for logging, randomness, and time in these interceptors. See Develop Workflow logic for details.
If you want to write generic code shared by all inbound Workflow call handlers but want to skip read-only operations, check workflow.unsafe.is_read_only().
Activity and Client interceptors are not affected by replay.
Register an Interceptor
Registering an interceptor means supplying an interceptor instance to the SDK so Temporal can invoke it when matching Client or Worker calls occur. Once registered, the interceptor runs as part of the call path and can observe or modify request and response data.
Register on the Client
Pass interceptors in the interceptors argument of Client.connect(). Client interceptors modify outbound calls such as starting and signaling Workflows.
client = await Client.connect(
    "localhost:7233",
    interceptors=[TracingInterceptor()],
)
The interceptors list can contain multiple interceptors, which form a chain: a method implemented on one interceptor in the list can perform side effects and modify the data before passing it to the corresponding method on the next interceptor in the list.
Register via a Plugin
If you're building a reusable library or want to bundle interceptors with other primitives, you can register them through a Plugin.
Register on the Worker only
If your interceptor doesn't affect the Client, you can pass interceptors in the interceptors argument of Worker().
Worker interceptors modify inbound and outbound Workflow and Activity calls.
worker = Worker(
    client,
    task_queue="my-task-queue",
    interceptors=[SomeWorkerInterceptor()],
    # ...
)
Note: If your interceptor class inherits from both client.Interceptor and worker.Interceptor, pass it to Client.connect() rather than to the Worker() constructor. The Worker automatically uses the interceptors from its underlying Client.
How to implement Interceptors in Python
Interceptors run as a chain. Each interceptor wraps the entire inner call: your code runs before the call, invokes next to execute the rest of the chain, and then runs after the call completes. This means you can inspect or modify both the input and the result, handle errors, and perform side effects at either stage.
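The wrapping order described above can be modeled without the SDK. The following is a minimal sketch in plain Python, not Temporal code: the classes `Terminal`, `Doubler`, and `Tagger` and the `call` method are hypothetical stand-ins for SDK interceptor classes and an intercepted method.

```python
import asyncio

calls = []  # records the order in which each stage runs


class Terminal:
    """Stands in for the "real" SDK method at the end of the chain."""

    async def call(self, value: int) -> int:
        calls.append(f"real({value})")
        return value + 1


class Doubler:
    """Hypothetical interceptor that modifies the input before delegating."""

    def __init__(self, next) -> None:
        self.next = next

    async def call(self, value: int) -> int:
        calls.append("doubler:before")
        result = await self.next.call(value * 2)  # run the rest of the chain
        calls.append("doubler:after")
        return result


class Tagger:
    """Hypothetical interceptor that observes errors and modifies the result."""

    def __init__(self, next) -> None:
        self.next = next

    async def call(self, value: int) -> int:
        calls.append("tagger:before")
        try:
            result = await self.next.call(value)
        except Exception:
            calls.append("tagger:error")  # side effect on failure, then re-raise
            raise
        calls.append("tagger:after")
        return result * 10


# Doubler is outermost: it runs first on the way in and last on the way out.
chain = Doubler(Tagger(Terminal()))
result = asyncio.run(chain.call(3))
```

In this model, `calls` ends up as `doubler:before`, `tagger:before`, `real(6)`, `tagger:after`, `doubler:after`, and `result` is 70: the input was doubled on the way in and the result was multiplied on the way out.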
Implementing Client call Interceptors
To modify outbound Client calls, define a class that inherits from client.Interceptor and implement intercept_client() so that it returns an OutboundInterceptor instance implementing the subset of outbound Client calls that you want to modify.
This example implements an Interceptor on outbound Client calls that adds a key to the headers field of each start_workflow() request.
It propagates a user ID as context: the interceptor reads the current value with user_id.get() and sends it in a header with the outbound request:
from __future__ import annotations

from typing import Any

import temporalio.client
import temporalio.converter
import temporalio.worker

# HEADER_KEY, user_id, and the _InputWithHeaders type are not shown here; they
# are assumed to be defined elsewhere in your application code.


class ContextPropagationInterceptor(
    temporalio.client.Interceptor, temporalio.worker.Interceptor
):
    def __init__(
        self,
        payload_converter: temporalio.converter.PayloadConverter = temporalio.converter.default().payload_converter,
    ) -> None:
        self._payload_converter = payload_converter

    def intercept_client(
        self, next: temporalio.client.OutboundInterceptor
    ) -> temporalio.client.OutboundInterceptor:
        # Wrap the next interceptor in the chain with the outbound implementation.
        return _ContextPropagationClientOutboundInterceptor(
            next, self._payload_converter
        )


def set_header_from_context(
    input: _InputWithHeaders, payload_converter: temporalio.converter.PayloadConverter
) -> None:
    # Copy the current user ID, if any, into a new headers mapping.
    user_id_val = user_id.get()
    if user_id_val:
        input.headers = {
            **input.headers,
            HEADER_KEY: payload_converter.to_payload(user_id_val),
        }


class _ContextPropagationClientOutboundInterceptor(
    temporalio.client.OutboundInterceptor
):
    def __init__(
        self,
        next: temporalio.client.OutboundInterceptor,
        payload_converter: temporalio.converter.PayloadConverter,
    ) -> None:
        super().__init__(next)
        self._payload_converter = payload_converter

    async def start_workflow(
        self, input: temporalio.client.StartWorkflowInput
    ) -> temporalio.client.WorkflowHandle[Any, Any]:
        # Add the header, then delegate to the next interceptor in the chain.
        set_header_from_context(input, self._payload_converter)
        return await super().start_workflow(input)
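The example above relies on names it does not define (`user_id`, `HEADER_KEY`) and shows only the outbound half of the propagation. As a rough, SDK-free sketch of the whole round trip, the model below assumes `user_id` is a `contextvars.ContextVar` and uses JSON bytes as a stand-in for the payload converter. `FakeInput`, `restore_context_from_header`, and the header key value are hypothetical names chosen for illustration.

```python
import json
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Mapping, Optional

HEADER_KEY = "__my_user_id"  # hypothetical header name
user_id: ContextVar[Optional[str]] = ContextVar("user_id", default=None)


@dataclass
class FakeInput:
    """Stand-in for an SDK input object that carries a headers mapping."""

    headers: Mapping[str, bytes] = field(default_factory=dict)


def set_header_from_context(input: FakeInput) -> None:
    # Outbound side: build a new mapping instead of mutating the existing one.
    value = user_id.get()
    if value:
        input.headers = {**input.headers, HEADER_KEY: json.dumps(value).encode()}


def restore_context_from_header(input: FakeInput) -> None:
    # Inbound side: read the header back into the context variable.
    raw = input.headers.get(HEADER_KEY)
    if raw is not None:
        user_id.set(json.loads(raw))


# "Client" side: the caller sets the context and the interceptor adds the header.
user_id.set("user-123")
outbound = FakeInput(headers={"existing": b"1"})
set_header_from_context(outbound)

# "Worker" side: start from an empty context and restore it from the header.
user_id.set(None)
restore_context_from_header(FakeInput(headers=outbound.headers))
restored = user_id.get()
```

Building a new mapping with `{**input.headers, ...}` keeps any headers added by earlier interceptors in the chain, which is why the original example merges rather than assigns.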
Worker and Client interceptors often share code because they implement closely related logic.
In the Python SDK, you will typically create a single interceptor class that inherits from both client.Interceptor and worker.Interceptor, as above. This works because their method sets do not overlap.
You can then register this interceptor in your client/starter code.
Your interceptor classes need not implement every method; the default implementation of each method passes the data on to the same method on the next interceptor in the chain. For example, when the SDK encounters an inbound Activity call, it takes the first interceptor in the chain and calls the corresponding intercepted method. That method does its work and then calls the same method on the next interceptor in the chain. At the end of the chain, the SDK calls the "real" SDK method.
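The pass-through default can be illustrated with a small model. This is a sketch in plain Python rather than SDK code: `OutboundBase`, `RealImpl`, `UppercaseStart`, `PrefixSignal`, and `build_chain` are hypothetical names, and the methods are synchronous only to keep the model short.

```python
class OutboundBase:
    """Model of an interceptor base class: every method forwards to `next`."""

    def __init__(self, next) -> None:
        self.next = next

    def start(self, name: str) -> str:
        return self.next.start(name)

    def signal(self, name: str) -> str:
        return self.next.signal(name)


class RealImpl:
    """Stands in for the SDK's own implementation at the end of the chain."""

    def start(self, name: str) -> str:
        return f"started {name}"

    def signal(self, name: str) -> str:
        return f"signaled {name}"


class UppercaseStart(OutboundBase):
    # Overrides only start(); signal() falls through to the base class.
    def start(self, name: str) -> str:
        return super().start(name.upper())


class PrefixSignal(OutboundBase):
    # Overrides only signal(); start() falls through to the base class.
    def signal(self, name: str) -> str:
        return super().signal(f"sig-{name}")


def build_chain(interceptor_classes, real):
    # Wrap from the end of the list so the first entry becomes the outermost.
    impl = real
    for cls in reversed(interceptor_classes):
        impl = cls(impl)
    return impl


chain = build_chain([UppercaseStart, PrefixSignal], RealImpl())
started = chain.start("order")    # modified by UppercaseStart only
signaled = chain.signal("order")  # modified by PrefixSignal only
```

Each call still travels through every interceptor in the model; an interceptor that does not override a method simply forwards the call unchanged.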
Implementing Worker call Interceptors
To modify inbound and outbound Workflow and Activity calls, define a class inheriting from worker.Interceptor. This is an interface with two methods, intercept_activity and workflow_interceptor_class, which you can use to configure interception of Activity and Workflow calls, respectively. intercept_activity returns an ActivityInboundInterceptor.
This example demonstrates using an interceptor to measure Schedule-To-Start and Schedule-To-Close latency.
Notice how the interceptor wraps the call: it records Schedule-To-Start before execute_activity, then records Schedule-To-Close after it completes:
from datetime import datetime, timezone

from temporalio import activity
from temporalio.client import Client
from temporalio.worker import (
    ActivityInboundInterceptor,
    ExecuteActivityInput,
    Interceptor,
    Worker,
)


class SimpleWorkerInterceptor(Interceptor):
    def intercept_activity(
        self, next: ActivityInboundInterceptor
    ) -> ActivityInboundInterceptor:
        return ActivityMetricsInterceptor(next)


class ActivityMetricsInterceptor(ActivityInboundInterceptor):
    async def execute_activity(self, input: ExecuteActivityInput):
        info = activity.info()
        meter = activity.metric_meter()
        attrs = {"workflow_type": info.workflow_type}

        # Before the activity executes: record Schedule-To-Start
        schedule_to_start = info.started_time - info.current_attempt_scheduled_time
        meter.create_histogram_timedelta(
            "custom_activity_schedule_to_start_latency",
            description="Time between activity scheduling and start",
            unit="duration",
        ).record(schedule_to_start, attrs)

        # Execute the activity
        result = await self.next.execute_activity(input)

        # After the activity completes: record Schedule-To-Close
        elapsed = datetime.now(timezone.utc) - info.current_attempt_scheduled_time
        meter.create_histogram_timedelta(
            "custom_activity_schedule_to_close_latency",
            description="Time between activity scheduling and completion",
            unit="duration",
        ).record(elapsed, attrs)
        return result


client = await Client.connect(
    "localhost:7233",
)
worker = Worker(
    client,
    interceptors=[SimpleWorkerInterceptor()],
    # ...
)
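As written, the example records Schedule-To-Close only when the Activity returns normally. If you also want the measurement when the Activity raises, one option is to record it in a try/finally block. The sketch below models that variation in plain Python. `FakeInfo`, `record`, and `run_with_latency` are hypothetical stand-ins for `activity.info()`, the histogram's `record()`, and `execute_activity()`, and the model assumes the scheduled and started times are timezone-aware, as the example's use of `datetime.now(timezone.utc)` suggests.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

recorded = {}  # metric name -> last recorded timedelta


@dataclass
class FakeInfo:
    """Stand-in for the two activity info fields the example reads."""

    current_attempt_scheduled_time: datetime
    started_time: datetime


def record(name: str, value: timedelta) -> None:
    recorded[name] = value


async def run_with_latency(info: FakeInfo, next_call):
    record("schedule_to_start", info.started_time - info.current_attempt_scheduled_time)
    try:
        return await next_call()
    finally:
        # Runs on success and on failure. Both operands are timezone-aware;
        # subtracting a naive datetime from an aware one raises TypeError.
        record(
            "schedule_to_close",
            datetime.now(timezone.utc) - info.current_attempt_scheduled_time,
        )


async def failing_activity():
    raise RuntimeError("activity failed")


scheduled = datetime.now(timezone.utc) - timedelta(seconds=5)
info = FakeInfo(
    current_attempt_scheduled_time=scheduled,
    started_time=scheduled + timedelta(seconds=2),
)

try:
    asyncio.run(run_with_latency(info, failing_activity))
    raised = False
except RuntimeError:
    raised = True  # the error still propagates to the caller
```

Whether failed attempts belong in the same histogram is a design choice; if they do not, you could record them under a separate metric name or attribute instead.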
The workflow_interceptor_class method returns a WorkflowInboundInterceptor subclass (the class itself rather than an instance). A WorkflowInboundInterceptor works similarly to an ActivityInboundInterceptor.