A lightweight, event-driven workflow orchestration library for building AI-powered applications.
## Features
- 🔄 Flexible Workflow Orchestration: Define complex workflows with multiple nodes and routing logic
- 🤖 AI Agent Integration: Built-in support for multiple LLM providers (OpenAI, Anthropic, Gemini, Bedrock, Ollama)
- ⚡ Async-First Design: Full async/await support for high-performance execution
- 🔀 Smart Routing: Route workflow execution based on dynamic conditions
- ⚙️ Concurrent Execution: Run multiple nodes in parallel for I/O-bound operations
- ✅ Type-Safe: Full Pydantic validation throughout
- 📊 DAG Validation: Automatic validation of workflow graphs (no cycles, all nodes reachable)
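To illustrate what the DAG check guards against, here is a conceptual sketch of cycle detection via depth-first search. This is illustrative only, not the library's internal implementation; the graphs and node names are made up.

```python
# Conceptual sketch: detect a cycle in a node graph with DFS coloring.
# Not the library's actual validation code.
def has_cycle(graph: dict[str, list[str]]) -> bool:
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {node: WHITE for node in graph}

    def visit(node: str) -> bool:
        color[node] = GRAY
        for neighbor in graph.get(node, []):
            if color[neighbor] == GRAY:  # back edge -> cycle
                return True
            if color[neighbor] == WHITE and visit(neighbor):
                return True
        color[node] = BLACK
        return False

    return any(visit(n) for n in graph if color[n] == WHITE)

acyclic = {"Start": ["Process"], "Process": ["End"], "End": []}
cyclic = {"A": ["B"], "B": ["A"]}
print(has_cycle(acyclic))  # False
print(has_cycle(cyclic))   # True
```

A workflow schema whose connections form such a back edge would be rejected at validation time rather than looping at runtime.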
## Installation

### Basic Installation

```bash
pip install workflow-engine
```

### With AI Provider Support

```bash
# For OpenAI
pip install workflow-engine[openai]

# For Anthropic
pip install workflow-engine[anthropic]

# For AWS Bedrock
pip install workflow-engine[bedrock]

# For all providers
pip install workflow-engine[all-providers]
```
### From Source

```bash
# Install in editable mode for local development
pip install -e /path/to/workflow-engine
```

## Quick Start
### 1. Define Your Event Schema

```python
from pydantic import BaseModel

class MyEventSchema(BaseModel):
    message: str
    priority: int
```
### 2. Create Workflow Nodes

```python
from workflow_engine import TaskContext
from workflow_engine.nodes import Node

class ProcessNode(Node):
    async def process(self, task_context: TaskContext) -> TaskContext:
        # Access the event data
        event = task_context.event

        # Do some processing
        result = f"Processed: {event.message}"

        # Store results in the task context
        task_context.update_node(self.node_name, result=result)
        return task_context
```
### 3. Define Your Workflow

```python
from workflow_engine import Workflow, WorkflowSchema, NodeConfig

class MyWorkflow(Workflow):
    workflow_schema = WorkflowSchema(
        description="A simple workflow example",
        event_schema=MyEventSchema,
        start=ProcessNode,
        nodes=[
            NodeConfig(
                node=ProcessNode,
                connections=[],
                description="Processes the incoming message"
            )
        ]
    )
```
### 4. Execute the Workflow

```python
# Create workflow instance
workflow = MyWorkflow()

# Run synchronously (creates a new event loop)
event_data = {"message": "Hello World", "priority": 1}
result = workflow.run(event_data)

# Or run asynchronously (in an existing event loop)
result = await workflow.run_async(event_data)

# Access results
print(result.nodes["ProcessNode"]["result"])
```
## Core Concepts

### Workflow
The main orchestrator that manages node execution, routing, and state management.
```python
class MyWorkflow(Workflow):
    workflow_schema = WorkflowSchema(
        event_schema=MyEventSchema,
        start=FirstNode,
        nodes=[
            NodeConfig(node=FirstNode, connections=[SecondNode]),
            NodeConfig(node=SecondNode, connections=[]),
        ]
    )
```
### TaskContext
The context object passed between nodes containing:
- `event`: The original event that triggered the workflow
- `nodes`: Dictionary storing results from each node
- `metadata`: Workflow-level metadata
- `should_stop`: Flag to terminate the workflow early
```python
async def process(self, task_context: TaskContext) -> TaskContext:
    # Access event
    message = task_context.event.message

    # Store results
    task_context.update_node(self.node_name, status="completed")

    # Stop workflow if needed
    if some_condition:
        task_context.stop_workflow()

    return task_context
```
### Node Types

#### Base Node
The foundation for all nodes:
```python
class CustomNode(Node):
    async def process(self, task_context: TaskContext) -> TaskContext:
        # Your processing logic here
        return task_context
```
#### Agent Node
For AI-powered processing:
```python
from workflow_engine.nodes import AgentNode, AgentConfig, ModelProvider
from pydantic_ai.models.openai import OpenAIModelName

class AINode(AgentNode):
    def get_agent_config(self) -> AgentConfig:
        return AgentConfig(
            model_provider=ModelProvider.OPENAI,
            model_name=OpenAIModelName.GPT_4O,
            instructions="Analyze the input and provide insights",
        )

    async def process(self, task_context: TaskContext) -> TaskContext:
        result = await self.agent.run(task_context.event.message)
        task_context.update_node(self.node_name, response=result.data)
        return task_context
```
#### Router Node
For conditional routing:
```python
from workflow_engine.nodes import BaseRouter, RouterNode

class PriorityRouter(RouterNode):
    def determine_next_node(self, task_context: TaskContext):
        if task_context.event.priority > 5:
            return HighPriorityNode()
        return LowPriorityNode()

class MyRouter(BaseRouter):
    def __init__(self):
        self.routes = [PriorityRouter()]
        self.fallback = DefaultNode()
```
#### Concurrent Node
For parallel execution:
```python
from workflow_engine.nodes import ConcurrentNode

class ParallelProcessNode(ConcurrentNode):
    async def process(self, task_context: TaskContext) -> TaskContext:
        # Execute multiple nodes concurrently
        results = await self.execute_nodes_concurrently(task_context)
        task_context.update_node(self.node_name, results=results)
        return task_context
```
## Environment Variables
For AI provider authentication:
```bash
# OpenAI
OPENAI_API_KEY=your_key_here

# Anthropic
ANTHROPIC_API_KEY=your_key_here

# Azure OpenAI
AZURE_OPENAI_API_KEY=your_key_here
AZURE_OPENAI_ENDPOINT=your_endpoint_here

# AWS Bedrock
BEDROCK_AWS_ACCESS_KEY_ID=your_key_here
BEDROCK_AWS_SECRET_ACCESS_KEY=your_secret_here
BEDROCK_AWS_REGION=us-east-1

# Ollama
OLLAMA_BASE_URL=http://localhost:11434

# Gemini
GOOGLE_API_KEY=your_key_here
```
## Advanced Usage

### Complex Routing
```python
workflow_schema = WorkflowSchema(
    event_schema=MyEventSchema,
    start=AnalyzeNode,
    nodes=[
        NodeConfig(node=AnalyzeNode, connections=[RouterNode]),
        NodeConfig(
            node=RouterNode,
            connections=[HighPriorityNode, LowPriorityNode, DefaultNode],
            is_router=True
        ),
        NodeConfig(node=HighPriorityNode, connections=[]),
        NodeConfig(node=LowPriorityNode, connections=[]),
        NodeConfig(node=DefaultNode, connections=[]),
    ]
)
```
### Concurrent Node Execution
```python
NodeConfig(
    node=GuardrailNode,
    connections=[NextNode],
    concurrent_nodes=[
        SQLInjectionCheckNode,
        ContentFilterNode,
        ToxicityCheckNode
    ]
)
```
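Conceptually, running such checks in parallel boils down to `asyncio.gather` over the concurrent nodes. Here is a minimal standalone sketch of that pattern; the check functions and their return values are hypothetical stand-ins, not the library's implementation.

```python
import asyncio

# Hypothetical stand-ins for concurrent guardrail checks.
async def sql_injection_check(text: str) -> str:
    await asyncio.sleep(0)  # simulate an I/O-bound check
    return "sql:ok"

async def content_filter(text: str) -> str:
    await asyncio.sleep(0)
    return "content:ok"

async def run_checks(text: str) -> list[str]:
    # Both checks are scheduled immediately and awaited together
    return await asyncio.gather(sql_injection_check(text), content_filter(text))

print(asyncio.run(run_checks("hello")))  # ['sql:ok', 'content:ok']
```

Because the checks are I/O-bound, their wall-clock time overlaps rather than adding up, which is the point of the `concurrent_nodes` option above.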
## Integration with GenAI Launchpad
This library was extracted from GenAI Launchpad and can be seamlessly integrated back:
```python
# In your launchpad application
from workflow_engine import Workflow, WorkflowSchema, NodeConfig
from workflow_engine.nodes import AgentNode

# Define your workflow using the library
class CustomerSupportWorkflow(Workflow):
    workflow_schema = WorkflowSchema(
        event_schema=SupportEventSchema,
        start=AnalyzeRequestNode,
        nodes=[...]
    )
```
## Development

### Setup
```bash
git clone https://github.com/lestarr/workflow-engine.git
cd workflow-engine
pip install -e ".[dev]"
```
### Running Tests
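The test command isn't documented in this snapshot; assuming a standard pytest setup installed via the `dev` extras above, the suite would typically be run with:

```shell
pytest
```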
## License
MIT
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.