temporalio.worker.WorkflowRunner

@abstractmethod

def create_instance(self, det: WorkflowInstanceDetails) -> WorkflowInstance:

Create a workflow instance that can handle activations.

Parameters
det: WorkflowInstanceDetails - Details that can be used to create the instance.
Returns
WorkflowInstance - Workflow instance that can handle activations.

@abstractmethod

def prepare_workflow(self, defn: temporalio.workflow._Definition):

Prepare a workflow for future execution.

This is run once for each workflow definition when a worker starts. It allows the runner to do anything necessary to prepare the definition for repeated use in create_instance.

Parameters
defn: temporalio.workflow._Definition - The workflow definition.
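
The relationship between prepare_workflow and create_instance can be sketched as follows. This is a minimal illustration, not the library's implementation: it does not import temporalio, and FakeDefinition, FakeInstanceDetails, FakeInstance, RunnerSketch and CachingRunner are hypothetical stand-ins that only mirror the two abstract methods documented here. The defn field on the details stand-in is an assumption made for the example.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeDefinition:
    """Hypothetical stand-in for temporalio.workflow._Definition."""
    name: str


@dataclass(frozen=True)
class FakeInstanceDetails:
    """Hypothetical stand-in for WorkflowInstanceDetails."""
    defn: FakeDefinition  # assumed field, used to look up prepared state
    run_id: str


class FakeInstance:
    """Hypothetical stand-in for WorkflowInstance."""

    def __init__(self, det: FakeInstanceDetails, prepared: str) -> None:
        self.det = det
        self.prepared = prepared


class RunnerSketch(ABC):
    """Mirrors the two abstract methods of the documented interface."""

    @abstractmethod
    def prepare_workflow(self, defn: FakeDefinition) -> None: ...

    @abstractmethod
    def create_instance(self, det: FakeInstanceDetails) -> FakeInstance: ...


class CachingRunner(RunnerSketch):
    def __init__(self) -> None:
        self._prepared: dict = {}
        self.prepare_calls = 0

    def prepare_workflow(self, defn: FakeDefinition) -> None:
        # One-time work per definition, done at worker start.
        self.prepare_calls += 1
        self._prepared[defn.name] = f"prepared:{defn.name}"

    def create_instance(self, det: FakeInstanceDetails) -> FakeInstance:
        # Called once per workflow run; reuses the prepared state.
        return FakeInstance(det, self._prepared[det.defn.name])


runner = CachingRunner()
defn = FakeDefinition("GreetingWorkflow")
runner.prepare_workflow(defn)
instances = [
    runner.create_instance(FakeInstanceDetails(defn, f"run-{i}"))
    for i in range(3)
]
```

In this sketch the expensive step runs once per definition, while each call to create_instance only looks up the state that was already prepared.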

def set_worker_level_failure_exception_types(self, types: Sequence[type[BaseException]]):

Set the worker-level failure exception types that will be used for validation in the sandbox when prepare_workflow is called.

Parameters
types: Sequence[type[BaseException]] - Exception types.
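
One way a runner might hold on to these types until prepare_workflow runs is sketched below. RecordingRunner is a hypothetical stand-in rather than the real class, prepare_workflow takes a plain string here instead of a workflow definition, and is_failure_exception is an illustrative helper that is not part of the documented interface. The call order shown (types set first, then prepare_workflow) is an assumption based on the description above.

```python
from typing import Sequence


class RecordingRunner:
    """Hypothetical stand-in; not the real WorkflowRunner."""

    def __init__(self) -> None:
        self._failure_types: tuple = ()
        self.seen_during_prepare: dict = {}

    def set_worker_level_failure_exception_types(
        self, types: Sequence[type[BaseException]]
    ) -> None:
        # Copy into a tuple so later changes to the caller's list have no effect.
        self._failure_types = tuple(types)

    def prepare_workflow(self, defn_name: str) -> None:
        # Record which types were available at preparation time.
        self.seen_during_prepare[defn_name] = self._failure_types

    def is_failure_exception(self, err: BaseException) -> bool:
        # Illustrative helper: isinstance accepts a tuple of types.
        return isinstance(err, self._failure_types)


runner = RecordingRunner()
runner.set_worker_level_failure_exception_types([ValueError])
runner.prepare_workflow("GreetingWorkflow")
```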