MX-RMQ 是一个高性能、可靠的基于Redis的分布式消息队列系统,支持普通消息、延时消息、优先级消息,具备完善的监控和重试机制。
目前已基本生产可用。
目录
特性概览
- 🚀 高性能: 基于Redis的内存存储,支持10,000+消息/秒的吞吐量
- 🔄 可靠性: 原子性Lua脚本操作,保证消息不丢失
- ⏰ 延时消息: 支持任意时间延迟的消息调度
- 🏷️ 优先级: 支持高、中、低优先级消息处理
- 🔁 自动重试: 可配置的重试机制和指数退避
- 💀 死信队列: 失败消息自动进入死信队列,支持人工干预
- 📊 监控指标: 实时监控队列状态、处理时间、吞吐率等
- 🛑 优雅停机: 支持优雅停机,确保消息处理完成
- 🔧 易于使用: 简洁的API设计,开箱即用
设计限制
- ❌ 不支持消费者组: 每个topic只能被单一消费者组负载均衡消费,如需多组消费同一消息,请创建多个topic并投递多次消息
快速开始
启动 redis
docker run -d --name redis8 -p 6379:6379 redis:8 redis-server
30秒快速体验
import asyncio from mx_rmq import MQConfig, RedisMessageQueue async def handle_order(payload: dict) -> None: """处理订单消息""" print(f"处理订单: {payload['order_id']}") # 你的业务逻辑 await asyncio.sleep(1) async def main(): # 创建消息队列 mq = RedisMessageQueue() # 注册消息处理器 mq.register_handler("order_created", handle_order) # 生产消息 await mq.produce("order_created", { "order_id": "ORD_123", "user_id": 456, "amount": 99.99 }) # 非阻塞启动(推荐) await mq.start_background() # 等待消息处理 await asyncio.sleep(2) # 优雅停止 await mq.stop() if __name__ == "__main__": asyncio.run(main())
安装
使用 uv (推荐)
# 添加到现有项目 uv add mx-rmq # 或者从源码安装 git clone https://github.com/CodingOX/mx-rmq.git cd mx-rmq uv sync
使用 pip
pip install mx-rmq
# 或从源码安装
pip install git+https://github.com/CodingOX/mx-rmq.git系统要求
- Python 3.12+
- Redis 5.0+ 【推荐 Redis 7.4+】
基本使用
1. 创建消息队列
from mx_rmq import MQConfig, RedisMessageQueue # 使用默认配置 mq = RedisMessageQueue() # 或自定义配置 config = MQConfig( redis_host="redis://localhost:6379", max_workers=10, task_queue_size=20 ) mq = RedisMessageQueue(config)
2. 注册消息处理器
# 方式1: 使用装饰器 @mq.register_handler("user_registration") async def handle_user_registration(payload: dict) -> None: user_id = payload['user_id'] email = payload['email'] print(f"欢迎新用户: {user_id} ({email})") # 方式2: 直接注册 async def handle_payment(payload: dict) -> None: print(f"处理支付: {payload}") mq.register_handler("payment_completed", handle_payment)
3. 生产消息
# 生产普通消息 message_id = await mq.produce("user_registration", { "user_id": 12345, "email": "user@example.com", "timestamp": "2024-01-01T00:00:00Z" }) print(f"消息已发送: {message_id}")
4. 启动消费者
MX-RMQ 提供多种方式启动消费者,以适应不同使用场景:
方式1: 非阻塞启动(推荐)
# 非阻塞启动,立即返回控制权 background_task = await mq.start_background() # 可以继续执行其他操作 await mq.produce("test_topic", {"message": "Hello World"}) # 等待一段时间或执行其他任务 await asyncio.sleep(10) # 优雅停止 await mq.stop()
方式2: 阻塞式启动(传统方式)
# 启动消费者(会阻塞,直到收到停机信号) await mq.start()
方式3: 异步上下文管理器
# 使用异步上下文管理器自动管理资源
async with RedisMessageQueue() as mq:
mq.register_handler("test_topic", handle_message)
await mq.start_background()
await mq.produce("test_topic", {"message": "Hello World"})
await asyncio.sleep(5)
# 自动停止和清理资源
方式4: 同步运行(简单场景)
# 同步运行指定时长
mq = RedisMessageQueue()
mq.register_handler("test_topic", handle_message)
mq.run(duration=10.0) # 运行10秒后自动停止
高级功能
延时消息
# 5分钟后发送提醒 await mq.produce( topic="send_reminder", payload={"user_id": 123, "type": "payment_due"}, delay=300 # 300秒后执行 ) # 1小时后发送邮件 await mq.produce( topic="send_email", payload={ "to": "user@example.com", "subject": "订单确认", "body": "感谢您的订单..." }, delay=3600 # 1小时后执行 )
优先级消息
from mx_rmq import MessagePriority # 高优先级消息(优先处理) await mq.produce( topic="system_alert", payload={"level": "critical", "message": "系统告警"}, priority=MessagePriority.HIGH ) # 普通优先级(默认) await mq.produce( topic="user_activity", payload={"user_id": 123, "action": "login"}, priority=MessagePriority.NORMAL ) # 低优先级消息(最后处理) await mq.produce( topic="analytics_data", payload={"event": "page_view", "page": "/home"}, priority=MessagePriority.LOW )
自定义重试配置
config = MQConfig( redis_url="redis://localhost:6379", max_retries=5, # 最大重试5次 retry_delays=[30, 60, 300, 900, 1800], # 重试间隔:30s, 1m, 5m, 15m, 30m processing_timeout=300, # 5分钟处理超时 ) mq = RedisMessageQueue(config)
消息生存时间(TTL)
# 设置消息1小时后过期 await mq.produce( topic="temp_notification", payload={"message": "临时通知"}, ttl=3600 # 1小时后过期 )
批量生产消息
# 批量发送多个消息
messages = [
{"topic": "order_created", "payload": {"order_id": f"ORD_{i}"}}
for i in range(100)
]
for msg in messages:
await mq.produce(msg["topic"], msg["payload"])
配置参考
MQConfig 完整参数
from mx_rmq import MQConfig
config = MQConfig(
# Redis 连接配置
redis_host="redis://localhost:6379", # Redis连接URL
redis_db=0, # Redis数据库编号 (0-15)
redis_password=None, # Redis密码
queue_prefix="", # 队列前缀,用于多环境隔离
connection_pool_size=20, # 连接池大小
# 消费者配置
max_workers=5, # 最大工作协程数
task_queue_size=8, # 本地任务队列大小
# 消息生命周期配置
message_ttl=86400, # 消息TTL(秒),默认24小时
processing_timeout=180, # 消息处理超时(秒),默认3分钟
# 重试配置
max_retries=3, # 最大重试次数
retry_delays=[60, 300, 1800], # 重试延迟间隔(秒)
# 死信队列配置
enable_dead_letter=True, # 是否启用死信队列
# 监控配置
monitor_interval=30, # 监控检查间隔(秒)
expired_check_interval=10, # 过期消息检查间隔(秒)
processing_monitor_interval=30, # Processing队列监控间隔(秒)
batch_size=100, # 批处理大小
)
环境变量配置
支持通过环境变量配置:
export REDIS_URL="redis://localhost:6379" export REDIS_PASSWORD="your_password" export MQ_MAX_WORKERS=10 export MQ_TASK_QUEUE_SIZE=20 export MQ_MESSAGE_TTL=86400
import os from mx_rmq import MQConfig config = MQConfig( redis_host=os.getenv("REDIS_URL", "localhost"), redis_port=os.getenv("REDIS_PORT","6379"), redis_password=os.getenv("REDIS_PASSWORD"), max_workers=int(os.getenv("MQ_MAX_WORKERS", "5")), task_queue_size=int(os.getenv("MQ_TASK_QUEUE_SIZE", "8")), message_ttl=int(os.getenv("MQ_MESSAGE_TTL", "86400")), )
API 参考
RedisMessageQueue 类
初始化
def __init__(self, config: MQConfig | None = None) -> None: """ 初始化消息队列 Args: config: 消息队列配置,如为None则使用默认配置 """
核心方法
async def produce( self, topic: str, payload: dict[str, Any], delay: int = 0, priority: MessagePriority = MessagePriority.NORMAL, ttl: int | None = None, message_id: str | None = None, ) -> str: """ 生产消息 Args: topic: 主题名称 payload: 消息负载(必须是可JSON序列化的字典) delay: 延迟执行时间(秒),0表示立即执行 priority: 消息优先级 ttl: 消息生存时间(秒),None使用配置默认值 message_id: 消息ID,None则自动生成UUID Returns: 消息ID(字符串) Raises: ValueError: 参数验证失败 RedisError: Redis操作失败 """ def register_handler(self, topic: str, handler: Callable) -> None: """ 注册消息处理器 Args: topic: 主题名称 handler: 处理函数,必须是async函数,接受一个dict参数 Raises: ValueError: 处理器不是可调用对象 """ async def start(self) -> None: """ 启动消息分发和消费(阻塞式) 此方法会阻塞,直到收到停机信号(SIGINT/SIGTERM) Raises: RuntimeError: 系统未正确初始化 RedisError: Redis连接错误 """ async def start_background(self) -> Task: """ 启动消息分发和消费(非阻塞式) 此方法不会阻塞,立即返回一个Task对象 Returns: Task: 后台任务对象 Raises: RuntimeError: 系统未正确初始化 RedisError: Redis连接错误 """ async def stop(self) -> None: """ 停止消息队列处理 优雅地停止所有后台任务并清理资源 """ async def initialize(self) -> None: """ 手动初始化消息队列 通常在start/start_background之前自动调用 """ async def cleanup(self) -> None: """ 清理资源,关闭Redis连接池 """ def run(self, duration: float | None = None) -> None: """ 同步运行消息队列 Args: duration: 运行时长(秒),None表示无限运行直到收到信号 """ async def health_check(self) -> dict[str, Any]: """ 执行健康检查 Returns: dict: 包含健康状态信息的字典 """
属性
@property def status(self) -> dict[str, Any]: """ 获取队列状态信息 Returns: dict: 包含运行状态、初始化状态、活跃任务数等信息 """ @property def is_running(self) -> bool: """ 检查队列是否正在运行 Returns: bool: 队列运行状态 """
Message 类
@dataclass class Message: """消息数据类""" id: str # 消息唯一ID version: str # 消息格式版本 topic: str # 主题名称 payload: dict[str, Any] # 消息负载 priority: MessagePriority # 消息优先级 created_at: int # 创建时间戳(毫秒) meta: MessageMeta # 消息元数据 @dataclass class MessageMeta: """消息元数据""" status: MessageStatus # 消息状态 retry_count: int # 重试次数 max_retries: int # 最大重试次数 retry_delays: list[int] # 重试延迟配置 last_error: str | None # 最后一次错误信息 expire_at: int # 过期时间戳 # ... 其他元数据字段
枚举类型
class MessagePriority(str, Enum): """消息优先级""" HIGH = "high" # 高优先级 NORMAL = "normal" # 普通优先级 LOW = "low" # 低优先级 class MessageStatus(str, Enum): """消息状态""" PENDING = "pending" # 待处理 PROCESSING = "processing" # 处理中 COMPLETED = "completed" # 已完成 RETRYING = "retrying" # 重试中 DEAD_LETTER = "dead_letter" # 死信
监控和管理
指标收集
from mx_rmq import MetricsCollector # 创建指标收集器 collector = MetricsCollector(redis=mq.redis, queue_prefix=config.queue_prefix) # 收集所有指标 metrics = await collector.collect_all_metrics(["order_created", "user_registration"]) # 打印关键指标 print(f"待处理消息: {metrics['queue.order_created.pending']}") print(f"处理中消息: {metrics['queue.order_created.processing']}") print(f"总吞吐量: {metrics['throughput.messages_per_minute']}") print(f"死信队列: {metrics['queue.dlq.count']}")
队列监控
# 监控单个队列 queue_metrics = await collector.collect_queue_metrics(["order_created"]) print(f"订单队列状态: {queue_metrics}") # 监控处理性能 processing_metrics = await collector.collect_processing_metrics(["order_created"]) print(f"平均处理时间: {processing_metrics['order_created.avg_processing_time']}ms")
死信队列管理
# 查看死信队列 dlq_count = await mq.redis.llen("dlq:queue") print(f"死信队列消息数: {dlq_count}") # 获取死信消息列表 dlq_messages = await mq.redis.lrange("dlq:queue", 0, 9) # 获取前10条 for msg_id in dlq_messages: payload = await mq.redis.hget("dlq:payload:map", msg_id) print(f"死信消息: {msg_id} - {payload}") # 手动重试死信消息(需要自定义实现) async def retry_dead_message(message_id: str): # 从死信队列获取消息 payload_json = await mq.redis.hget("dlq:payload:map", message_id) if payload_json: # 解析消息并重新生产 message = json.loads(payload_json) await mq.produce(message["topic"], message["payload"]) # 从死信队列移除 await mq.redis.lrem("dlq:queue", 1, message_id) await mq.redis.hdel("dlq:payload:map", message_id)
实时监控脚本
import asyncio import time async def monitor_loop(): """实时监控循环""" collector = MetricsCollector(redis=mq.redis) while True: try: # 收集指标 metrics = await collector.collect_all_metrics(["order_created"]) # 输出关键指标 print(f"[{time.strftime('%H:%M:%S')}] 队列状态:") print(f" 待处理: {metrics.get('queue.order_created.pending', 0)}") print(f" 处理中: {metrics.get('queue.order_created.processing', 0)}") print(f" 死信队列: {metrics.get('queue.dlq.count', 0)}") # 检查告警条件 pending = metrics.get('queue.order_created.pending', 0) if pending > 100: print(f"⚠️ 告警: 待处理消息积压 ({pending})") dlq_count = metrics.get('queue.dlq.count', 0) if dlq_count > 10: print(f"🚨 告警: 死信队列消息过多 ({dlq_count})") except Exception as e: print(f"监控错误: {e}") await asyncio.sleep(10) # 每10秒检查一次 # 启动监控 asyncio.create_task(monitor_loop())
部署指南
高可用部署
Redis Sentinel 配置:
import redis.sentinel # 配置Sentinel sentinels = [ ('sentinel1', 26379), ('sentinel2', 26379), ('sentinel3', 26379), ] sentinel = redis.sentinel.Sentinel(sentinels, socket_timeout=0.1) # 发现主节点 redis_master = sentinel.master_for('mymaster', socket_timeout=0.1) # 自定义Redis连接 config = MQConfig(redis_url="") # 留空,使用自定义连接 mq = RedisMessageQueue(config) mq.redis = redis_master # 使用Sentinel管理的连接
监控和告警
Prometheus 指标暴露:
from prometheus_client import start_http_server, Gauge, Counter # 定义指标 queue_size = Gauge('mq_queue_size', 'Queue size', ['topic', 'status']) messages_processed = Counter('mq_messages_processed_total', 'Messages processed', ['topic', 'status']) async def export_metrics(): """导出Prometheus指标""" collector = MetricsCollector(redis=mq.redis) while True: metrics = await collector.collect_all_metrics(['order_created']) # 更新Prometheus指标 queue_size.labels(topic='order_created', status='pending').set( metrics.get('queue.order_created.pending', 0) ) queue_size.labels(topic='order_created', status='processing').set( metrics.get('queue.order_created.processing', 0) ) await asyncio.sleep(30) # 启动Prometheus HTTP服务器 start_http_server(8000) asyncio.create_task(export_metrics())
最佳实践
1. 消息设计
✅ 推荐做法:
# 消息结构清晰,包含必要的上下文信息 await mq.produce("order_created", { "order_id": "ORD_123456", "user_id": 789, "total_amount": 99.99, "currency": "USD", "timestamp": "2024-01-01T12:00:00Z", "metadata": { "source": "web", "version": "v1.0" } })
❌ 避免做法:
# 消息过于简单,缺少上下文 await mq.produce("process", {"id": 123}) # 消息过于复杂,包含大量数据 await mq.produce("user_update", { "user": {...}, # 包含用户的所有信息 "history": [...], # 包含完整历史记录 "related_data": {...} # 包含大量关联数据 })
2. 错误处理
✅ 推荐做法:
async def handle_payment(payload: dict) -> None: try: order_id = payload["order_id"] amount = payload["amount"] # 参数验证 if not order_id or amount <= 0: raise ValueError(f"无效的订单参数: {payload}") # 业务逻辑 result = await process_payment(order_id, amount) # 记录成功日志 logger.info("支付处理成功", order_id=order_id, amount=amount) except ValueError as e: # 参数错误,不重试 logger.error("支付参数错误", error=str(e), payload=payload) raise # 重新抛出,进入死信队列 except PaymentGatewayError as e: # 外部服务错误,可重试 logger.warning("支付网关错误", error=str(e), order_id=order_id) raise # 重新抛出,触发重试 except Exception as e: # 未知错误 logger.error("支付处理失败", error=str(e), order_id=order_id) raise
3. 幂等性处理
async def handle_order_created(payload: dict) -> None: order_id = payload["order_id"] # 检查是否已处理(幂等性保护) if await is_order_processed(order_id): logger.info("订单已处理,跳过", order_id=order_id) return try: # 处理订单 await process_order(order_id) # 标记为已处理 await mark_order_processed(order_id) except Exception as e: logger.error("订单处理失败", order_id=order_id, error=str(e)) raise
4. 性能优化
工作协程数调优:
import os import multiprocessing # 根据CPU核心数和IO特性调整工作协程数 cpu_count = multiprocessing.cpu_count() config = MQConfig( # CPU密集型任务:工作协程数 = CPU核心数 max_workers=cpu_count if is_cpu_intensive else cpu_count * 2, # IO密集型任务:工作协程数 = CPU核心数 * 2-4 # max_workers=cpu_count * 3, # 任务队列大小应该大于工作协程数 task_queue_size=max_workers * 2, )
批量处理优化:
async def handle_batch_emails(payload: dict) -> None: """批量处理邮件发送""" email_list = payload["emails"] # 分批处理,避免内存占用过大 batch_size = 10 for i in range(0, len(email_list), batch_size): batch = email_list[i:i + batch_size] # 并发发送邮件 tasks = [send_email(email) for email in batch] await asyncio.gather(*tasks, return_exceptions=True) # 避免过快的请求 await asyncio.sleep(0.1)
5. API使用建议
推荐使用非阻塞API
# 推荐:使用非阻塞启动 async def recommended_usage(): mq = RedisMessageQueue() mq.register_handler("topic", message_handler) # 非阻塞启动 task = await mq.start_background() # 可以继续执行其他操作 await do_other_work() # 优雅停止 await mq.stop() # 不推荐:使用阻塞式启动(除非有特殊需求) async def legacy_usage(): mq = RedisMessageQueue() mq.register_handler("topic", message_handler) # 会阻塞直到收到信号 await mq.start()
使用异步上下文管理器自动管理资源
# 推荐:使用异步上下文管理器
async def context_manager_usage():
async with RedisMessageQueue() as mq:
mq.register_handler("topic", message_handler)
await mq.start_background()
await mq.produce("topic", {"data": "example"})
await asyncio.sleep(5)
# 自动清理资源
合理使用同步API
# 适用于简单场景的同步API
def simple_usage():
mq = RedisMessageQueue()
mq.register_handler("topic", message_handler)
# 运行10秒后自动停止
mq.run(duration=10.0)
6. 多组消费的实现方案
由于系统不支持消费者组功能,如需实现多组消费同一消息,建议采用以下方案:
✅ 推荐做法:
# 方案1:创建多个topic,发送多次消息 async def send_order_created(order_data: dict): """发送订单创建消息到多个处理组""" # 发送到不同的处理组 await mq.produce("order_created_payment", order_data) # 支付处理组 await mq.produce("order_created_inventory", order_data) # 库存处理组 await mq.produce("order_created_analytics", order_data) # 分析处理组 await mq.produce("order_created_notification", order_data) # 通知处理组 # 注册不同的处理器 @mq.register_handler("order_created_payment") async def handle_payment_processing(payload: dict): """处理支付相关逻辑""" await process_payment(payload) @mq.register_handler("order_created_inventory") async def handle_inventory_processing(payload: dict): """处理库存相关逻辑""" await update_inventory(payload) @mq.register_handler("order_created_analytics") async def handle_analytics_processing(payload: dict): """处理分析相关逻辑""" await update_analytics(payload) @mq.register_handler("order_created_notification") async def handle_notification_processing(payload: dict): """处理通知相关逻辑""" await send_notifications(payload)
方案2:使用统一的分发器
# 创建一个分发器topic @mq.register_handler("order_created") async def order_dispatcher(payload: dict): """订单消息分发器""" order_id = payload["order_id"] # 并发分发到各个处理组 tasks = [ mq.produce("order_payment", payload), mq.produce("order_inventory", payload), mq.produce("order_analytics", payload), mq.produce("order_notification", payload), ] try: await asyncio.gather(*tasks) logger.info("订单消息分发成功", order_id=order_id) except Exception as e: logger.error("订单消息分发失败", order_id=order_id, error=str(e)) raise
方案3:使用topic命名规范
# 使用统一的命名规范 TOPIC_PATTERNS = { "order_created": [ "order_created.payment", "order_created.inventory", "order_created.analytics", "order_created.notification" ] } async def broadcast_message(base_topic: str, payload: dict): """广播消息到多个相关topic""" topics = TOPIC_PATTERNS.get(base_topic, [base_topic]) tasks = [mq.produce(topic, payload) for topic in topics] await asyncio.gather(*tasks) logger.info("消息广播完成", base_topic=base_topic, target_topics=topics) # 使用示例 await broadcast_message("order_created", order_data)
6. 监控和告警
async def setup_monitoring(): """设置监控和告警""" collector = MetricsCollector(redis=mq.redis) while True: try: metrics = await collector.collect_all_metrics(["order_created"]) # 队列积压告警 pending = metrics.get('queue.order_created.pending', 0) if pending > 1000: await send_alert(f"队列积压严重: {pending} 条消息待处理") # 死信队列告警 dlq_count = metrics.get('queue.dlq.count', 0) if dlq_count > 50: await send_alert(f"死信队列消息过多: {dlq_count} 条") # 处理时间告警 avg_time = metrics.get('processing.order_created.avg_time', 0) if avg_time > 30000: # 30秒 await send_alert(f"消息处理时间过长: {avg_time}ms") except Exception as e: logger.error("监控检查失败", error=str(e)) await asyncio.sleep(60) # 每分钟检查一次
故障排除
常见问题
Q1: 消息丢失怎么办?
症状: 发送的消息没有被处理
可能原因:
- Redis 连接中断
- 消费者没有正确启动
- 消息处理器抛出异常但没有正确处理
解决方案:
# 1. 检查Redis连接 try: await mq.redis.ping() print("Redis连接正常") except Exception as e: print(f"Redis连接失败: {e}") # 2. 检查消息是否在队列中 pending_count = await mq.redis.llen("order_created:pending") processing_count = await mq.redis.llen("order_created:processing") print(f"待处理: {pending_count}, 处理中: {processing_count}") # 3. 检查死信队列 dlq_count = await mq.redis.llen("dlq:queue") print(f"死信队列: {dlq_count}")
Q2: 消息处理过慢
症状: 队列积压,消息处理不及时
可能原因:
- 工作协程数不足
- 处理函数执行时间过长
- Redis性能瓶颈
解决方案:
# 1. 增加工作协程数 config = MQConfig(max_workers=20) # 增加到20个 # 2. 优化处理函数 async def optimized_handler(payload: dict) -> None: # 使用异步IO async with aiohttp.ClientSession() as session: response = await session.post(url, json=payload) # 避免阻塞操作 await asyncio.to_thread(blocking_operation, payload) # 3. 监控处理时间 import time async def timed_handler(payload: dict) -> None: start_time = time.time() try: await actual_handler(payload) finally: processing_time = time.time() - start_time if processing_time > 5: # 处理时间超过5秒 logger.warning("处理时间过长", time=processing_time, payload=payload)
Q3: 内存使用过高
症状: 应用内存持续增长
可能原因:
- 本地队列积压
- 消息对象没有正确释放
- Redis连接池过大
解决方案:
# 1. 调整队列大小 config = MQConfig( task_queue_size=10, # 减少本地队列大小 connection_pool_size=10, # 减少连接池大小 ) # 2. 监控内存使用 import psutil import gc async def memory_monitor(): while True: process = psutil.Process() memory_mb = process.memory_info().rss / 1024 / 1024 if memory_mb > 500: # 内存超过500MB logger.warning("内存使用过高", memory_mb=memory_mb) gc.collect() # 强制垃圾回收 await asyncio.sleep(60)
性能诊断
延迟分析
import time from collections import defaultdict class PerformanceAnalyzer: def __init__(self): self.metrics = defaultdict(list) async def analyze_handler(self, handler_name: str, handler_func): """分析处理器性能""" async def wrapped_handler(payload: dict): start_time = time.time() try: result = await handler_func(payload) return result finally: end_time = time.time() processing_time = (end_time - start_time) * 1000 # 毫秒 self.metrics[handler_name].append(processing_time) # 定期输出统计信息 if len(self.metrics[handler_name]) % 100 == 0: times = self.metrics[handler_name] avg_time = sum(times) / len(times) max_time = max(times) min_time = min(times) print(f"{handler_name} 性能统计 (最近100次):") print(f" 平均时间: {avg_time:.2f}ms") print(f" 最大时间: {max_time:.2f}ms") print(f" 最小时间: {min_time:.2f}ms") return wrapped_handler # 使用示例 analyzer = PerformanceAnalyzer() @mq.register_handler("order_created") async def handle_order(payload: dict): # 处理逻辑 await process_order(payload) # 包装处理器进行性能分析 mq.handlers["order_created"] = await analyzer.analyze_handler( "order_created", handle_order )