Agents Workflow with MongoDB Persistence
This project implements a CrewAI-based workflow system with comprehensive MongoDB persistence for tracking every step of the workflow execution.
Features
- MongoDB Persistence: Every step of the workflow is persisted to MongoDB for audit trails and debugging
- Workflow Tracking: Complete visibility into the execution flow with timestamps and step data
- Error Handling: Comprehensive error tracking and recovery mechanisms
- Chat History: Redis-based chat history management
- RAG Integration: Retrieval-Augmented Generation pipeline for data source queries
Architecture
Components
- MongoPersistence: Handles all MongoDB operations for workflow state tracking
- AgenticHivemindFlow: CrewAI flow that orchestrates the agent interactions
- run_hivemind_agent_activity: Temporal activity that manages the workflow execution
- QueryDataSources: Handles RAG queries with workflow ID tracking
Workflow Steps Tracked
The system tracks the following steps in MongoDB:
- initialization: Initial workflow setup with parameters
- chat_history_retrieval: Redis chat history retrieval (if applicable)
- no_chat_history: When no chat history is available
- flow_initialization: AgenticHivemindFlow setup
- flow_execution_start: Beginning of CrewAI flow execution
- local_model_classification: Local transformer model classification result
- question_classification: Language model question classification with reasoning
- rag_classification: RAG question classification with score and reasoning
- history_query_classification: History vs RAG query classification (if applicable)
- flow_execution_complete: Completion of CrewAI flow
- answer_processing: Processing of the final answer
- error_handling: Any error handling steps
- memory_update: Redis memory updates (if applicable)
- error_occurred: Any errors during execution
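Each of these steps ends up as an entry in the workflow document's `steps` array. A minimal sketch of what that append might look like with pymongo follows; the helper name and the exact update shape are illustrative, not the project's actual `MongoPersistence` API:

```python
from datetime import datetime, timezone

def persist_step(collection, workflow_id, step_name, data):
    """Append a step entry and advance currentStep.

    Illustrative helper only -- the real MongoPersistence class may
    structure this differently.
    """
    now = datetime.now(timezone.utc)
    collection.update_one(
        {"_id": workflow_id},
        {
            # Append the step record to the steps array
            "$push": {"steps": {
                "stepName": step_name,
                "timestamp": now,
                "data": data,
            }},
            # Keep the top-level pointers in sync
            "$set": {
                "currentStep": step_name,
                "updatedAt": now,
            },
        },
    )
```

This pattern keeps the full step history append-only while `currentStep` always reflects the latest state.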
Environment Variables
Use `.env.example` as a template to prepare your `.env` file.
Classification Data Persistence
The system persists detailed classification reasoning and results to improve audit trails and debugging:
Local Model Classification
- Step Name: `local_model_classification`
- Data: Result from the local transformer model
- Model: `local_transformer`
Question Classification
- Step Name: `question_classification`
- Data:
  - `result`: Boolean indicating if the message is a question
  - `reasoning`: Detailed explanation for the classification
  - `model`: `language_model`
  - `query`: Original user query
RAG Classification
- Step Name: `rag_classification`
- Data:
  - `result`: Boolean indicating if RAG is needed
  - `score`: Sensitivity score (0-1)
  - `reasoning`: Detailed explanation for the score
  - `model`: `language_model`
  - `query`: Original user query
History Query Classification
- Step Name: `history_query_classification`
- Data:
  - `result`: Boolean indicating if it's a history query
  - `model`: `openai_gpt4`
  - `query`: Original user query
  - `hasChatHistory`: Boolean indicating if chat history was available
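These payloads are plain dictionaries. As an illustration, the `rag_classification` data could be assembled like this; the helper function itself is an assumption, only the field names come from the lists above:

```python
def rag_classification_data(result: bool, score: float,
                            reasoning: str, query: str) -> dict:
    """Build the data payload for the rag_classification step.

    Illustrative helper -- field names match the README's schema,
    but the project may construct this dict inline instead.
    """
    return {
        "result": result,          # whether RAG is needed
        "score": score,            # sensitivity score in [0, 1]
        "reasoning": reasoning,    # model's explanation for the score
        "model": "language_model",
        "query": query,            # original user query
    }
```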
MongoDB Schema
The workflow states are stored in the internal_messages collection with the following structure:
```json
{
  "_id": "ObjectId",
  "communityId": "string",
  "route": {
    "source": "string",
    "destination": {
      "queue": "string",
      "event": "string"
    }
  },
  "question": {
    "message": "string",
    "filters": "object (optional)"
  },
  "response": {
    "message": "string"
  },
  "metadata": "object",
  "createdAt": "datetime",
  "updatedAt": "datetime",
  "steps": [
    {
      "stepName": "string",
      "timestamp": "datetime",
      "data": "object"
    }
  ],
  "currentStep": "string",
  "status": "string",
  "chatId": "string (optional)",
  "enableAnswerSkipping": "boolean"
}
```
Usage
Running the Worker
Querying Workflow States
You can query the MongoDB collection to inspect workflow execution:
```python
from tasks.mongo_persistence import MongoPersistence

persistence = MongoPersistence()
workflow_state = persistence.get_workflow_state("workflow_id_here")
print(workflow_state)
```
Testing
Run the unit tests:
```shell
python -m pytest tests/unit/test_mongo_persistence.py
```
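To write similar tests without a live MongoDB instance, one common pattern is to stand in a `MagicMock` for the collection. A sketch of that pattern follows; the test body is illustrative and does not reflect the project's actual test suite:

```python
from unittest.mock import MagicMock

def test_workflow_state_lookup_queries_by_id():
    """Illustrative pattern: mock the collection, assert on the calls."""
    collection = MagicMock()
    collection.find_one.return_value = {"_id": "wf-1", "status": "completed"}

    # The real test would inject this mock into MongoPersistence;
    # here we exercise the lookup pattern directly.
    state = collection.find_one({"_id": "wf-1"})

    assert state["status"] == "completed"
    collection.find_one.assert_called_once_with({"_id": "wf-1"})
```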
Dependencies
- `pymongo==4.8.0`: MongoDB driver
- `redis==5.2.0`: Redis client
- `crewai==0.105.0`: AI agent framework
- `temporalio`: Temporal workflow engine
- `openai==1.66.3`: OpenAI API client
Workflow ID Tracking
The workflow ID is passed through the entire execution chain:
- Created in `run_hivemind_agent_activity`
- Passed to `AgenticHivemindFlow`
- Passed to `RAGPipelineTool`
- Passed to `QueryDataSources`
- Included in `HivemindQueryPayload` for the `HivemindWorkflow`
This ensures complete traceability from the initial query to the final response.
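As a rough illustration of that chain, a single ID generated at the start can be threaded through every downstream call; the function and field names below are placeholders, not the project's real signatures:

```python
import uuid

def run_flow(query: str, workflow_id: str) -> dict:
    """Placeholder for the AgenticHivemindFlow -> RAG tool hand-off.

    In the real chain, the flow passes workflow_id to RAGPipelineTool,
    QueryDataSources includes it in the HivemindQueryPayload, and every
    persisted step shares the same ID.
    """
    return {"query": query, "workflow_id": workflow_id}

# ID created once, at the activity boundary (run_hivemind_agent_activity)
workflow_id = str(uuid.uuid4())
result = run_flow("what changed last week?", workflow_id)
assert result["workflow_id"] == workflow_id
```

Because the same ID appears in the MongoDB document, the Temporal workflow, and the RAG payload, any step can be correlated back to its originating query.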