GitHub - lestarr/workflow-engine

A lightweight, event-driven workflow orchestration library for building AI-powered applications.

Features

  • 🔄 Flexible Workflow Orchestration: Define complex workflows with multiple nodes and routing logic
  • 🤖 AI Agent Integration: Built-in support for multiple LLM providers (OpenAI, Anthropic, Gemini, Bedrock, Ollama)
  • ⚡ Async-First Design: Full async/await support for high-performance execution
  • 🔀 Smart Routing: Route workflow execution based on dynamic conditions
  • ⚙️ Concurrent Execution: Run multiple nodes in parallel for I/O-bound operations
  • ✅ Type-Safe: Full Pydantic validation throughout
  • 📊 DAG Validation: Automatic validation of workflow graphs (no cycles, all nodes reachable)
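
The two checks named in the DAG Validation bullet (no cycles, all nodes reachable) can be illustrated with a small standalone sketch. This is not the library's implementation: `validate_graph` and the dict-of-lists graph format are hypothetical names chosen for illustration, and the library may perform these checks differently.

```python
from collections import deque


def validate_graph(start, connections):
    """Raise ValueError if the graph has unreachable nodes or a cycle.

    `connections` maps each node name to the node names it points to
    (a hypothetical format, not the library's WorkflowSchema).
    """
    if start not in connections:
        raise ValueError(f"start node {start} is not declared")

    # Reachability: breadth-first search from the start node.
    seen = {start}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for target in connections[node]:
            if target not in connections:
                raise ValueError(f"{node} points to undeclared node {target}")
            if target not in seen:
                seen.add(target)
                queue.append(target)
    unreachable = set(connections) - seen
    if unreachable:
        raise ValueError(f"unreachable nodes: {sorted(unreachable)}")

    # Cycle detection: Kahn's algorithm. If a topological order cannot
    # consume every node, the remaining nodes sit on a cycle.
    indegree = {name: 0 for name in connections}
    for targets in connections.values():
        for target in targets:
            indegree[target] += 1
    ready = deque(name for name, degree in indegree.items() if degree == 0)
    visited = 0
    while ready:
        node = ready.popleft()
        visited += 1
        for target in connections[node]:
            indegree[target] -= 1
            if indegree[target] == 0:
                ready.append(target)
    if visited != len(connections):
        raise ValueError("workflow graph contains a cycle")


# A valid two-node graph passes silently.
validate_graph("FirstNode", {"FirstNode": ["SecondNode"], "SecondNode": []})
```
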

Installation

Basic Installation

pip install workflow-engine

With AI Provider Support

# Quote the extras so shells such as zsh do not expand the brackets

# For OpenAI
pip install "workflow-engine[openai]"

# For Anthropic
pip install "workflow-engine[anthropic]"

# For AWS Bedrock
pip install "workflow-engine[bedrock]"

# For all providers
pip install "workflow-engine[all-providers]"

From Source

# Install in editable mode for local development
pip install -e /path/to/workflow-engine

Quick Start

1. Define Your Event Schema

from pydantic import BaseModel

class MyEventSchema(BaseModel):
    message: str
    priority: int

2. Create Workflow Nodes

from workflow_engine import TaskContext
from workflow_engine.nodes import Node

class ProcessNode(Node):
    async def process(self, task_context: TaskContext) -> TaskContext:
        # Access the event data
        event = task_context.event

        # Do some processing
        result = f"Processed: {event.message}"

        # Store results in the task context
        task_context.update_node(self.node_name, result=result)

        return task_context

3. Define Your Workflow

from workflow_engine import Workflow, WorkflowSchema, NodeConfig

class MyWorkflow(Workflow):
    workflow_schema = WorkflowSchema(
        description="A simple workflow example",
        event_schema=MyEventSchema,
        start=ProcessNode,
        nodes=[
            NodeConfig(
                node=ProcessNode,
                connections=[],
                description="Processes the incoming message"
            )
        ]
    )

4. Execute the Workflow

# Create workflow instance
workflow = MyWorkflow()

# Run synchronously (creates new event loop)
event_data = {"message": "Hello World", "priority": 1}
result = workflow.run(event_data)

# Or run asynchronously (inside an async function, on an existing event loop)
result = await workflow.run_async(event_data)

# Access results
print(result.nodes["ProcessNode"]["result"])
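
The difference between the two entry points can be sketched with a standalone stand-in. `MiniWorkflow` below is a hypothetical class, not the library's `Workflow`; it only assumes that the synchronous method wraps the asynchronous one with `asyncio.run`, which matches the "creates new event loop" comment above but may not match the library's internals.

```python
import asyncio


class MiniWorkflow:
    """Hypothetical stand-in used only to show the sync/async split."""

    async def run_async(self, event_data: dict) -> dict:
        # Yield to the event loop once, as a real node doing I/O would.
        await asyncio.sleep(0)
        return {"ProcessNode": {"result": f"Processed: {event_data['message']}"}}

    def run(self, event_data: dict) -> dict:
        # asyncio.run starts a fresh event loop; it raises RuntimeError
        # when called from code that is already running inside a loop.
        return asyncio.run(self.run_async(event_data))


async def main() -> dict:
    # Inside a coroutine, await the async entry point directly.
    return await MiniWorkflow().run_async({"message": "Hello World", "priority": 1})


sync_result = MiniWorkflow().run({"message": "Hello World", "priority": 1})
async_result = asyncio.run(main())
print(sync_result["ProcessNode"]["result"])
```

Under this assumption, calling the synchronous method from a notebook or web handler that already runs an event loop would fail, which is the usual reason to prefer the async entry point there.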

Core Concepts

Workflow

The main orchestrator: it executes nodes, routes between them, and manages shared state.

class MyWorkflow(Workflow):
    workflow_schema = WorkflowSchema(
        event_schema=MyEventSchema,
        start=FirstNode,
        nodes=[
            NodeConfig(node=FirstNode, connections=[SecondNode]),
            NodeConfig(node=SecondNode, connections=[]),
        ]
    )

TaskContext

The context object passed between nodes containing:

  • event: The original event that triggered the workflow
  • nodes: Dictionary storing results from each node
  • metadata: Workflow-level metadata
  • should_stop: Flag to terminate workflow early

async def process(self, task_context: TaskContext) -> TaskContext:
    # Access event
    message = task_context.event.message

    # Store results
    task_context.update_node(self.node_name, status="completed")

    # Stop workflow if needed
    if some_condition:
        task_context.stop_workflow()

    return task_context
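
To make the four fields concrete, here is a minimal standalone sketch of a context object with the same shape. `MiniTaskContext` is a hypothetical dataclass, not the library's `TaskContext` (which the feature list suggests is Pydantic-based); in particular, merging repeated `update_node` calls into one dictionary per node is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MiniTaskContext:
    """Hypothetical stand-in mirroring the fields listed above."""

    event: Any
    nodes: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)
    should_stop: bool = False

    def update_node(self, node_name: str, **kwargs: Any) -> None:
        # Assumption: later calls merge into the node's existing entry.
        self.nodes.setdefault(node_name, {}).update(kwargs)

    def stop_workflow(self) -> None:
        # Signal the orchestrator to skip any remaining nodes.
        self.should_stop = True


ctx = MiniTaskContext(event={"message": "hi", "priority": 1})
ctx.update_node("ProcessNode", result="Processed: hi")
ctx.update_node("ProcessNode", status="completed")
ctx.stop_workflow()
```
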

Node Types

Base Node

The foundation for all nodes:

class CustomNode(Node):
    async def process(self, task_context: TaskContext) -> TaskContext:
        # Your processing logic here
        return task_context

Agent Node

For AI-powered processing:

from workflow_engine.nodes import AgentNode, AgentConfig, ModelProvider
from pydantic_ai.models.openai import OpenAIModelName

class AINode(AgentNode):
    def get_agent_config(self) -> AgentConfig:
        return AgentConfig(
            model_provider=ModelProvider.OPENAI,
            model_name=OpenAIModelName.GPT_4O,
            instructions="Analyze the input and provide insights",
        )

    async def process(self, task_context: TaskContext) -> TaskContext:
        result = await self.agent.run(task_context.event.message)
        task_context.update_node(self.node_name, response=result.data)
        return task_context

Router Node

For conditional routing:

from workflow_engine.nodes import BaseRouter, RouterNode

class PriorityRouter(RouterNode):
    def determine_next_node(self, task_context: TaskContext):
        if task_context.event.priority > 5:
            return HighPriorityNode()
        return LowPriorityNode()

class MyRouter(BaseRouter):
    def __init__(self):
        self.routes = [PriorityRouter()]
        self.fallback = DefaultNode()
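
One plausible way a router could combine `routes` and `fallback` is "first matching route wins, otherwise fall back". The sketch below is standalone and illustrative only: `PriorityRule`, `resolve_next_node`, and the convention that a route returns `None` when it does not match are assumptions, not the library's documented behavior.

```python
class HighPriorityNode:
    """Placeholder target node."""


class DefaultNode:
    """Placeholder fallback node."""


class PriorityRule:
    """Hypothetical route: matches only high-priority events."""

    def determine_next_node(self, event: dict):
        # Return a node when the rule matches, None otherwise (assumed convention).
        return HighPriorityNode() if event["priority"] > 5 else None


def resolve_next_node(routes, fallback, event):
    """Return the node from the first matching route, else the fallback."""
    for rule in routes:
        candidate = rule.determine_next_node(event)
        if candidate is not None:
            return candidate
    return fallback


urgent = resolve_next_node([PriorityRule()], DefaultNode(), {"priority": 9})
routine = resolve_next_node([PriorityRule()], DefaultNode(), {"priority": 2})
```
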

Concurrent Node

For parallel execution:

from workflow_engine.nodes import ConcurrentNode

class ParallelProcessNode(ConcurrentNode):
    async def process(self, task_context: TaskContext) -> TaskContext:
        # Execute multiple nodes concurrently
        results = await self.execute_nodes_concurrently(task_context)

        task_context.update_node(self.node_name, results=results)
        return task_context

Environment Variables

For AI provider authentication:

# OpenAI
OPENAI_API_KEY=your_key_here

# Anthropic
ANTHROPIC_API_KEY=your_key_here

# Azure OpenAI
AZURE_OPENAI_API_KEY=your_key_here
AZURE_OPENAI_ENDPOINT=your_endpoint_here

# AWS Bedrock
BEDROCK_AWS_ACCESS_KEY_ID=your_key_here
BEDROCK_AWS_SECRET_ACCESS_KEY=your_secret_here
BEDROCK_AWS_REGION=us-east-1

# Ollama
OLLAMA_BASE_URL=http://localhost:11434

# Gemini
GOOGLE_API_KEY=your_key_here
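
A startup check can report missing variables before the first provider call fails. The sketch below uses only the variable names listed above; the `REQUIRED_VARS` grouping and the `missing_vars` helper are hypothetical, and treating every listed variable as mandatory is an assumption (some, such as the Ollama base URL, may have defaults).

```python
import os

# Variable names come from the list above; the grouping is illustrative.
REQUIRED_VARS = {
    "openai": ["OPENAI_API_KEY"],
    "anthropic": ["ANTHROPIC_API_KEY"],
    "azure": ["AZURE_OPENAI_API_KEY", "AZURE_OPENAI_ENDPOINT"],
    "bedrock": [
        "BEDROCK_AWS_ACCESS_KEY_ID",
        "BEDROCK_AWS_SECRET_ACCESS_KEY",
        "BEDROCK_AWS_REGION",
    ],
    "ollama": ["OLLAMA_BASE_URL"],
    "gemini": ["GOOGLE_API_KEY"],
}


def missing_vars(provider, env=None):
    """Return the variables for `provider` that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS[provider] if not env.get(name)]


# Passing a dict instead of os.environ keeps the check easy to test.
print(missing_vars("bedrock", {"BEDROCK_AWS_REGION": "us-east-1"}))
```
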

Advanced Usage

Complex Routing

workflow_schema = WorkflowSchema(
    event_schema=MyEventSchema,
    start=AnalyzeNode,
    nodes=[
        NodeConfig(node=AnalyzeNode, connections=[RouterNode]),
        NodeConfig(
            node=RouterNode,
            connections=[HighPriorityNode, LowPriorityNode, DefaultNode],
            is_router=True
        ),
        # Every node a router can select needs its own NodeConfig
        NodeConfig(node=HighPriorityNode, connections=[]),
        NodeConfig(node=LowPriorityNode, connections=[]),
        NodeConfig(node=DefaultNode, connections=[]),
    ]
)

Concurrent Node Execution

NodeConfig(
    node=GuardrailNode,
    connections=[NextNode],
    concurrent_nodes=[
        SQLInjectionCheckNode,
        ContentFilterNode,
        ToxicityCheckNode
    ]
)
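
The benefit of running guardrail checks side by side can be sketched with plain `asyncio.gather`. The three check functions below are hypothetical stand-ins that only borrow the node names from the configuration above, and their pass/fail rules are invented for the example; how the library schedules `concurrent_nodes` internally is not shown here.

```python
import asyncio


async def sql_injection_check(text: str) -> bool:
    await asyncio.sleep(0.01)  # simulate an I/O-bound call
    return "drop table" not in text.lower()


async def content_filter_check(text: str) -> bool:
    await asyncio.sleep(0.01)
    return "forbidden" not in text.lower()


async def toxicity_check(text: str) -> bool:
    await asyncio.sleep(0.01)
    return "idiot" not in text.lower()


async def run_checks(text: str) -> dict:
    checks = [sql_injection_check, content_filter_check, toxicity_check]
    # gather runs all coroutines concurrently and preserves input order.
    results = await asyncio.gather(*(check(text) for check in checks))
    return {check.__name__: ok for check, ok in zip(checks, results)}


outcome = asyncio.run(run_checks("SELECT 1; DROP TABLE users"))
print(outcome)
```

Because the checks overlap while waiting, total latency stays close to that of the slowest check instead of the sum of all three.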

Integration with GenAI Launchpad

This library was extracted from GenAI Launchpad and can be used inside a Launchpad application again:

# In your launchpad application
from workflow_engine import Workflow, WorkflowSchema, NodeConfig
from workflow_engine.nodes import AgentNode

# Define your workflow using the library
class CustomerSupportWorkflow(Workflow):
    workflow_schema = WorkflowSchema(
        event_schema=SupportEventSchema,
        start=AnalyzeRequestNode,
        nodes=[...]
    )

Development

Setup

git clone https://github.com/lestarr/workflow-engine.git
cd workflow-engine
pip install -e ".[dev]"

Running Tests

License

MIT

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.