Code Assistant with Message Queue Architecture
基于消息队列架构的代码助手系统,使用LangGraph和本地LLM服务实现。
🏗️ 系统架构
┌─────────────────────────────────────────────────────────────────────────────┐
│ User Interface Layer │
│ (Web API / CLI / Direct Call) │
└─────────────────────┬───────────────────────────────┬───────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ LangGraph Application Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌──────────────┐ │
│ │ START │───▶│ CODER │───▶│ CHECKER │───▶│ DEBUG │ │
│ │ Node │ │ Agent │ │ Agent │ │ Agent │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └──────────────┘ │
└──────────┬──────────┬──────────┬──────────┬──────────┬──────────┬──────────┘
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Message Queue Layer (Redis/RabbitMQ) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌──────────────┐ │
│ │ Producer │ │ Consumer │ │ Producer │ │ Consumer │ │
│ │ (Coder) │ │ (Checker) │ │ (Checker) │ │ (Debugger) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └──────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ Message Queue Topics/Channels │ │
│ │ • coder.requests • checker.requests • debugger.requests │ │
│ │ • coder.responses • checker.responses • debugger.responses │ │
│ │ • system.status • error.logs • metrics │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
└─────────────────────┬───────────────────────────────┬───────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Local LLM Service Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌──────────────┐ │
│ │Coder Service│ │Checker Serv.│ │Debugger Serv│ │ Health Mon │ │
│ │(Port 8001) │ │(Port 8002) │ │(Port 8003) │ │ (Port 8004) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
🚀 快速开始
前提条件
- Docker 和 Docker Compose
- Python 3.9+
- Redis 或 RabbitMQ
1. 使用Docker Compose启动(推荐)
# 启动所有服务 docker-compose up -d # 查看服务状态 docker-compose ps # 查看日志 docker-compose logs -f
2. 手动启动
启动消息队列
# Redis docker run -d --name redis -p 6379:6379 redis:7-alpine # 或 RabbitMQ docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management-alpine
启动本地LLM服务器
python simple_llm_server.py
启动LLM服务
# 启动Coder服务 python local_llm_service_restful.py --service coder --port 8001 # 启动Checker服务 python local_llm_service_restful.py --service checker --port 8002 # 启动Debugger服务 python local_llm_service_restful.py --service debugger --port 8003
启动LangGraph应用
python main_langgraph.py --queue-type redis --redis-host localhost --redis-port 6379
📋 API 使用
生成代码
curl -X POST http://localhost:8004/generate \ -H "Content-Type: application/json" \ -d '{ "question": "How do I build a RAG chain in LCEL?", "context": "LCEL documentation", "max_iterations": 3 }'
响应示例:
{
"success": true,
"session_id": "session_1234567890",
"code_solution": {
"prefix": "A RAG chain implementation using LCEL",
"imports": "from langchain_core.prompts import ChatPromptTemplate\nfrom langchain_openai import ChatOpenAI",
"code": "# Build RAG chain\nrag_chain = prompt | model | parser"
},
"iterations": 2,
"execution_time": 5.23,
"error": null
}健康检查
curl http://localhost:8004/health
监控仪表板
# 系统状态 curl http://localhost:8004/status # 监控指标 curl http://localhost:8004/monitoring/metrics # 告警信息 curl http://localhost:8004/monitoring/alerts
🧪 测试系统
本项目包含完整的测试套件,用于验证系统功能和性能。
运行所有测试
# 运行所有测试 python test/run_tests.py --all # 运行特定类型的测试 python test/run_tests.py --unit # 单元测试 python test/run_tests.py --integration # 集成测试 python test/run_tests.py --performance # 性能测试
测试结果
| 测试类型 | 成功率 | 关键指标 |
|---|---|---|
| Redis基础功能 | 100% | 4/4测试通过 |
| 消息队列功能 | 100% | 9条消息处理 |
| 简化系统测试 | 100% | 3/3测试通过 |
| 平均响应时间 | - | 0.20秒 |
测试覆盖功能
- ✅ Redis连接和消息队列操作
- ✅ 发布订阅通信机制
- ✅ 多代理架构通信
- ✅ 代码生成和验证流程
- ✅ Mock LLM服务集成
- ✅ 错误处理和重试机制
- ✅ 系统监控和健康检查
详细测试报告请参考: test/TEST_SUMMARY.md
🔧 配置
环境变量
LLM服务
SERVICE_NAME: 服务名称 (coder/checker/debugger)LLM_SERVER_URL: 本地LLM服务器URLQUEUE_TYPE: 消息队列类型 (redis/rabbitmq)REDIS_HOST: Redis主机REDIS_PORT: Redis端口SERVICE_PORT: 服务端口号
LangGraph应用
QUEUE_TYPE: 消息队列类型REDIS_HOST: Redis主机REDIS_PORT: Redis端口MONITORING_PORT: 监控端口
🧪 测试
运行完整的测试套件:
# 安装测试依赖 pip install pytest pytest-asyncio # 运行所有测试 python -m pytest test_system.py -v # 运行特定测试类别 python -m pytest test_system.py::TestIntegration -v # 集成测试 python -m pytest test_system.py::TestPerformance -v # 性能测试
📊 监控和可观测性
Prometheus指标
- 请求计数和延迟
- 错误率和类型
- 队列深度
- 系统资源使用率
Grafana仪表板
预配置的仪表板包括:
- 系统概览
- 服务性能
- 错误分析
- 队列监控
访问:http://localhost:3000 (admin/admin)
日志
所有服务都配置结构化日志,存储在:/app/logs/
🔒 错误处理和容错
断路器模式
- 自动检测服务故障
- 防止级联故障
- 自动恢复机制
重试机制
- 指数退避重试
- 可配置重试策略
- 死信队列处理
超时处理
- 请求超时保护
- 优雅降级
- 错误恢复
🚀 扩展性
水平扩展
- 每个服务可以独立扩展
- 消息队列负载均衡
- 无状态服务设计
垂直扩展
- 资源配置优化
- 批处理支持
- 缓存策略
🔧 开发
项目结构
.
├── message_queue_client.py # 消息队列客户端
├── local_llm_service_restful.py # LLM服务层
├── langgraph_message_queue_nodes.py # LangGraph节点
├── error_handling.py # 错误处理
├── monitoring.py # 监控和可观测性
├── main_langgraph.py # 主应用入口
├── simple_llm_server.py # 本地LLM服务器
├── test_system.py # 系统测试
├── docker-compose.yml # Docker编排
└── requirements-*.txt # 依赖文件
添加新服务
- 创建服务类继承
LocalLLMRestfulService - 实现
process_request方法 - 创建FastAPI应用
- 更新Docker配置
自定义消息队列
实现 MessageQueueClient 抽象类:
class CustomMessageQueue(MessageQueueClient): def publish(self, topic: str, message: Message) -> None: # 实现发布逻辑 pass def subscribe(self, topic: str, callback: Callable[[Message], None]) -> None: # 实现订阅逻辑 pass def close(self) -> None: # 实现关闭逻辑 pass
📚 相关文档
🤝 贡献
- Fork项目
- 创建特性分支
- 提交更改
- 推送到分支
- 创建Pull Request
📄 许可证
MIT License
🆘 故障排除
常见问题
服务无法连接
- 检查网络配置
- 验证端口是否开放
- 查看服务日志
消息队列连接失败
- 检查Redis/RabbitMQ状态
- 验证连接参数
- 检查防火墙设置
代码生成失败
- 检查LLM服务器状态
- 验证API调用格式
- 查看错误日志
调试工具
# 检查服务状态 curl http://localhost:8004/health # 查看Prometheus指标 curl http://localhost:9090/metrics # 检查队列状态 redis-cli ping redis-cli pubsub channels
📞 支持
如有问题,请:
- 查看日志文件
- 检查监控仪表板
- 提交Issue到项目仓库