Asynchronous Tool Execution by LucaButBoring · Pull Request #1398 · modelcontextprotocol/python-sdk

This PR implements the required changes for modelcontextprotocol/modelcontextprotocol#1391, which adds asynchronous tool execution.

This is a large PR, and I expect that if the associated SEP is accepted, we might want to break this down into several smaller PRs for SDK reviewers. I tried to generally have separate commits for each step of the implementation, to try and make this easier to review in its current form.

Motivation and Context

Today, most applications integrate with tools in a straightforward but naive manner, choosing to have agents invoke tools synchronously with the conversation instead of allowing agents to multitask where possible. There are a few reasons why we believe this is the case, including the lack of clarity around tool interfaces (single tool or multiple for job tracking), model failures when manually polling on operations, and not having a way to retrieve results with a well-defined TTL, among other problems (described in more detail in the linked issue). Here, we introduce an alternative API that establishes a clear integration path for async job-style use cases that are typically on the order of minutes to hours.

The ultra high-level overview is as follows:

  • Tools now support synchronous or asynchronous invocation modes
  • A single tool only advertises itself as either sync or async to a given client, controlled by protocol version
  • Sync tools behave just like they always did
  • Async tools are split into start/poll/retrieve stages:
    • Starting a call:
      • tools/call begins an async tool call
      • The result is a CallToolResult containing an operation token, which is used to interact with the async tool call across multiple RPC calls
    • Polling:
      • The operation token is used to call tools/async/status, which returns the current operation status
      • The client should poll this method until the status reaches a terminal value
    • Result retrieval:
      • The operation token is used to call tools/async/result, which has the final tool output

Whether a tool is sync, async, or both (on old/new protocol versions) is defined by tool implementors. This enables remote server operators to control this based on how long each tool is expected to take to execute, rather than potentially serving HTTP requests with widely varying execution times on the same endpoint. This also makes it much more clear to client applications what the "time contract" of a tool is, so that fast tools can still be executed synchronously while allowing long-running tools to be immediately backgrounded.

Usage

Defining an async-compatible tool is just a matter of adjusting the @mcp.tool() decorator to include an invocation_modes parameter, which is a list of "sync" and "async":

@mcp.tool(invocation_modes=["async", "sync"])
async def data_processing_tool(dataset: str, operations: list[str], ctx: Context) -> dict[str, str]:
    await ctx.info(f"Starting data processing pipeline for {dataset}")

    results: dict[str, str] = {}
    total_ops = len(operations)

    for i, operation in enumerate(operations):
        await ctx.debug(f"Executing operation: {operation}")
        await asyncio.sleep(0.5 + (i * 0.2))  # Simulate processing time
        progress = (i + 1) / total_ops  # Report progress
        await ctx.report_progress(progress, 1.0, f"Completed {operation}")
        results[operation] = f"Result of {operation} on {dataset}"  # Store result

    await ctx.info("Data processing pipeline complete!")
    return results

If invocation_modes contains "async", the tool is async-compatible and will only be called in async mode by clients on new versions, while if it contains "sync", the tool is sync-compatible and will be called in sync mode if async mode is not supported (a client will never have the option to choose one or the other itself).

Behind the scenes, the SDK handles branching the behavior to either run synchronously (like today) or asynchronously (immediate return with job tracking) depending on if the client version supports async tools yet or not.

To control how long the results are kept for to retrieve with tools/async/result, we can use the keep_alive parameter:

@mcp.tool(invocation_modes=["async", "sync"], keep_alive=30)  # retain result for 30s following completion

We can also customize the content returned in the immediate CallToolResult with the immediate_result parameter:

async def immediate_feedback(operation: str) -> list[types.ContentBlock]:
    return [types.TextContent(type="text", text=f"Starting {operation}... This may take a moment.")]

@mcp.tool(invocation_modes=["async", "sync"], immediate_result=immediate_feedback)

On the client side, we just add the polling and result retrieval like so:

async def demonstrate_data_processing(session: ClientSession):
    """Demonstrate data processing pipeline."""
    print("\n=== Data Processing Pipeline Demo ===")

    # Just like before
    operations = ["validate", "clean", "transform", "analyze", "export"]
    result = await session.call_tool(
        "data_processing_tool", arguments={"dataset": "customer_data.csv", "operations": operations}
    )

    # We could choose to sent the immediate result content to an agent from here before continuing

    # New parts
    if result.operation:
        token = result.operation.token
        print(f"Data processing started with token: {token}")

        # Poll for completion
        while True:
            status = await session.get_operation_status(token)
            print(f"Status: {status.status}")

            if status.status == "completed":
                final_result = await session.get_operation_result(token)

                # Show structured result if available
                if final_result.result.structuredContent:
                    print("Processing results:")
                    for op, result_text in final_result.result.structuredContent.items():
                        print(f"  {op}: {result_text}")
                break
            elif status.status == "failed":
                print(f"Processing failed: {status.error}")
                break
            elif status.status in ("canceled", "unknown"):
                print(f"Processing ended with status: {status.status}")
                break

            await asyncio.sleep(0.8)

How Has This Been Tested?

Unit tests, integration tests, and new example snippets.

Breaking Changes

Existing users will not need to update their applications to continue using synchronous tool calls. Asynchronous tool calls will require minor code changes that will be documented.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation update

Checklist

  • I have read the MCP Documentation
  • My code follows the repository's style guidelines
  • New and existing tests pass locally
  • I have added appropriate error handling
  • I have added or updated documentation as needed

Additional context

There were a bunch of decisions in the implementation we may want to discuss further, some of which were due to ambiguity in the proposal (which will be revised again) and some of which were due to working things into the SDK implementation.

  • I added a faux-version called next to deal with the requirement that sessions on the current-latest version always advertise as sync-only. The tests and examples explicitly set the advertised client protocol version to next when calling async-only tools.
  • Reusing the existing tool/call method creates some ambiguities in how outputSchema should be handled, as the immediate tool call result (communicating an accepted state) would no longer have meaningful structuredContent. The output that should be validated is actually the result of GetOperationPayloadResult, so for now I'm skipping validation of the immediate CallToolResult (only in async execution) and only validating GetOperationPayloadResult (sync tool executions are always validated, just like before).
  • keepAlive should have a sentinel value representing "no expiration," and I'm leaning towards None. However, in SDK implementations, that becomes somewhat ambiguous with sync tool calls, which also implicitly have a keepAlive of None already. For now, I default it to 1 hour if not specified/None, but this should probably be changed before this is merged.
  • In sHTTP, the SDK has behavior to send tool-related server messages on the same SSE stream that the server used as a response to the client's CallToolRequest, by attaching a related_request_id to the stream for fast lookups and session resumption. To support sampling and elicitation, we keep a map of operation tokens to their original request IDs to reuse the same event store entry between related calls.
  • The client session needs to cache a mapping of in-flight operation tokens to tool names for validating structuredContent in async tool calls, as it otherwise has no way to look up the cached outputSchema. We could consider including a toolName in GetOperationPayloadResult to avoid the inconvenience, but in this draft I'm using a cache expiry based on keepAlive to avoid holding that mapping forever.