GitHub - cyberFlowTech/zapry-agents-sdk-python: Lightweight Python SDK for building bots on Zapry platform with automatic Telegram API compatibility

轻量级 Python SDK,用于在 Zapry 平台构建 Bot。

基于 python-telegram-bot,自动处理 Zapry 与 Telegram API 的兼容性差异, 让开发者专注于业务逻辑。

特性

  • Zapry 兼容层 — 自动修复 User/Chat/Update 数据格式差异
  • 双平台支持 — Telegram 和 Zapry 平台一键切换
  • Handler 装饰器 — 简洁的命令、回调、消息注册方式
  • 模块化注册HandlerRegistry 支持分模块管理 Handler
  • 灵活配置 — 从 .env 或代码直接构造配置
  • Webhook + Polling — 两种运行模式开箱即用
  • 主动触发调度器ProactiveScheduler 定时触发主动消息,支持自定义触发器
  • 反馈检测框架FeedbackDetector 自动检测用户反馈信号,调整回复风格
  • 偏好注入工具build_preference_prompt() 将偏好转为 AI system prompt
  • Middleware 管道 — 洋葱模型中间件,支持 before/after、拦截、上下文传递
  • Tool Calling 框架@tool 装饰器自动生成 JSON schema,ToolRegistry 统一管理
  • OpenAI 适配器OpenAIToolAdapter 一键对接 OpenAI function calling
  • Memory 持久化 — 三层记忆模型(工作/短期/长期),可插拔存储,自动提取,Zapry 云端预留
  • Agent Loop — ReAct 自动推理循环,LLM 自主调用工具直到产出最终回答
  • Guardrails 安全护栏 — Input/Output 护栏 + Tripwire 机制,防 prompt injection/内容泄露
  • Tracing 结构化追踪 — agent/llm/tool/guardrail Span 层级追踪,可导出到 OpenTelemetry
  • MCP Client — 连接任意 MCP 服务器(Stdio/HTTP),自动发现工具并注入 ToolRegistry,与 AgentLoop 无缝集成

快速开始

安装

pip install -e /path/to/zapry-agents-sdk-python

最小示例

from zapry_agents_sdk import ZapryAgent, AgentConfig

config = AgentConfig.from_env()
bot = ZapryAgent(config)

@bot.command("start")
async def start(update, context):
    await update.message.reply_text("Hello from Zapry Bot!")

@bot.command("help")
async def help_cmd(update, context):
    await update.message.reply_text("Available commands: /start, /help")

bot.run()

使用 HandlerRegistry(分模块)

# handlers/tarot.py
from zapry_agents_sdk.helpers import HandlerRegistry

tarot = HandlerRegistry()

@tarot.command("tarot")
async def tarot_command(update, context):
    await update.message.reply_text("🎴 抽牌中...")

@tarot.callback("^reveal_card_")
async def reveal_card(update, context):
    ...

# main.py
from zapry_agents_sdk import ZapryAgent, AgentConfig
from handlers.tarot import tarot

bot = ZapryAgent(AgentConfig.from_env())
bot.register(tarot)
bot.run()

环境变量

变量 说明 默认值
TG_PLATFORM 平台 (telegram / zapry) telegram
TELEGRAM_BOT_TOKEN Telegram Bot Token
ZAPRY_BOT_TOKEN Zapry Bot Token
ZAPRY_API_BASE_URL Zapry API 地址 https://openapi.mimo.immo/bot
RUNTIME_MODE 运行模式 (webhook / polling) webhook
TELEGRAM_WEBHOOK_URL Telegram Webhook URL
ZAPRY_WEBHOOK_URL Zapry Webhook URL
WEBHOOK_PATH Webhook 路径
WEBAPP_HOST 监听地址 0.0.0.0
WEBAPP_PORT 监听端口 8443
DEBUG 调试模式 false

主动触发 & 自我反思

ProactiveScheduler — 主动消息调度器

让 Bot 主动关心用户,定时检查触发条件并发送消息。

from zapry_agents_sdk import ProactiveScheduler

# 创建调度器(60 秒轮询一次)
scheduler = ProactiveScheduler(
    interval=60,
    send_fn=my_send_message,  # async def send(user_id, text)
)

# 方式 1:装饰器注册触发器
@scheduler.trigger("daily_greeting")
async def check_greeting(ctx):
    if ctx.now.hour == 12 and ctx.now.minute <= 30:
        return ["user_001", "user_002"]  # 需要发送的用户
    return []

@check_greeting.message
async def greeting_msg(ctx, user_id):
    return f"中午好~ 今天状态怎么样?"

# 方式 2:编程式注册
scheduler.add_trigger("birthday", check_fn, message_fn)

# 生命周期
await scheduler.start()   # 启动后台轮询
await scheduler.stop()    # 停止

# 用户级开关
await scheduler.enable_user("user_001")
await scheduler.disable_user("user_001")

FeedbackDetector — 反馈检测 & 偏好调整

从用户消息中检测反馈信号(如"太长了"→简洁风格),自动调整偏好。

from zapry_agents_sdk import FeedbackDetector, build_preference_prompt

detector = FeedbackDetector()

# 检测反馈信号
result = detector.detect("太长了,说重点")
# result.matched => True
# result.changes => {"style": "concise"}

# 一步完成检测 + 更新偏好
prefs = {"style": "balanced"}
await detector.detect_and_adapt("user_001", "太长了", prefs)
# prefs => {"style": "concise", "updated_at": "..."}

# 自定义关键词(默认中文,可覆盖)
detector.add_pattern("language", "english", ["speak english", "in english"])

# 偏好注入 AI prompt
prompt = build_preference_prompt({"style": "concise", "tone": "casual"})
# => "回复风格偏好:\n这位用户偏好简洁的回复..."

与 ZapryAgent 集成

bot = ZapryAgent(config)
scheduler = ProactiveScheduler(interval=60)
detector = FeedbackDetector()

@bot.on_post_init
async def post_init(app):
    scheduler.send_fn = lambda uid, text: app.bot.send_message(int(uid), text)
    await scheduler.start()

@bot.on_post_shutdown
async def shutdown(app):
    await scheduler.stop()

@bot.message()
async def on_message(update, context):
    user_id = str(update.effective_user.id)
    # 在回复后异步检测反馈
    result = await detector.detect_and_adapt(user_id, update.message.text, user_prefs)

Middleware 管道

洋葱模型中间件,每个 middleware 包裹下一层,可在 handler 前后执行逻辑。

from zapry_agents_sdk import ZapryAgent, AgentConfig

bot = ZapryAgent(AgentConfig.from_env())

# 注册中间件(按顺序包裹)
async def timer_middleware(ctx, next_fn):
    import time
    start = time.time()
    await next_fn()  # 调用下一层
    print(f"耗时: {time.time() - start:.3f}s")

async def auth_middleware(ctx, next_fn):
    if not is_authorized(ctx.update):
        return  # 不调用 next_fn → 拦截
    ctx.extra["role"] = "admin"
    await next_fn()

bot.use(timer_middleware)
bot.use(auth_middleware)

执行顺序: timer before → auth before → handler → auth after → timer after

Tool Calling 框架

LLM-agnostic 的工具注册、schema 管理与调用分发。

from zapry_agents_sdk.tools import tool, ToolRegistry

# @tool 装饰器自动从 type hints + docstring 生成 JSON schema
@tool
async def get_weather(city: str, unit: str = "celsius") -> str:
    """获取指定城市的当前天气。

    Args:
        city: 城市名称
        unit: 温度单位
    """
    return f"{city}: 25°C, 晴"

registry = ToolRegistry()
registry.register(get_weather)

# 导出 schema
schema = registry.to_json_schema()
openai_tools = registry.to_openai_schema()

# 执行工具
result = await registry.execute("get_weather", {"city": "上海"})

OpenAI Function Calling 适配器

from zapry_agents_sdk.tools.openai_adapter import OpenAIToolAdapter

adapter = OpenAIToolAdapter(registry)

# 1. 获取 tools 参数
tools_param = adapter.to_openai_tools()

# 2. 调用 OpenAI
response = await client.chat.completions.create(
    model="gpt-4o", messages=messages, tools=tools_param,
)

# 3. 处理 tool_calls
if response.choices[0].message.tool_calls:
    results = await adapter.handle_tool_calls(
        response.choices[0].message.tool_calls
    )
    messages.extend(adapter.results_to_messages(results))

Memory 持久化框架

三层记忆模型,按 {agent_id}:{user_id} 隔离,可插拔存储后端。

from zapry_agents_sdk.memory import MemorySession, InMemoryStore, SQLiteMemoryStore

# 创建 session(agent+user 隔离)
session = MemorySession(
    agent_id="my_agent",
    user_id="user_123",
    store=SQLiteMemoryStore("memory.db"),  # 或 InMemoryStore()
)

# 加载所有记忆
ctx = await session.load()
# ctx.short_term  → 对话历史
# ctx.long_term   → 用户档案
# ctx.working     → 会话临时数据

# 添加消息(自动持久化 + 缓冲区管理)
await session.add_message("user", "我今年25岁,在上海做程序员")
await session.add_message("assistant", "了解了~")

# 获取可注入 LLM 的 prompt
prompt = session.format_for_prompt()

# 自动记忆提取(需设置 extractor)
from zapry_agents_sdk.memory import LLMMemoryExtractor
session.extractor = LLMMemoryExtractor(llm_fn=my_llm_call)
await session.extract_if_needed()

# 手动更新长期记忆
await session.update_long_term({"basic_info": {"age": 25}})

# 清空
await session.clear_history()  # 只清对话
await session.clear_all()      # 清除所有

存储后端

后端 用途 持久化
InMemoryStore 开发/测试
SQLiteMemoryStore 本地生产
ZapryCloudStore Zapry 云端托管(预留)

与 Middleware 集成

async def memory_middleware(ctx, next_fn):
    session = MemorySession("bot", get_user_id(ctx.update), store)
    await session.load()
    ctx.extra["session"] = session
    await next_fn()
    await session.extract_if_needed()

bot.use(memory_middleware)

Agent Loop(自动推理循环)

ReAct 模式:LLM 自主决策调用工具、获取结果、再决策,直到产出最终回答。

from zapry_agents_sdk.agent import AgentLoop
from zapry_agents_sdk.tools import ToolRegistry, tool

@tool
async def get_weather(city: str) -> str:
    """获取天气。"""
    return f"{city}: 25°C"

registry = ToolRegistry()
registry.register(get_weather)

async def my_llm(messages, tools=None):
    response = await openai_client.chat.completions.create(
        model="gpt-4o", messages=messages, tools=tools,
    )
    return response.choices[0].message

loop = AgentLoop(
    llm_fn=my_llm,
    tool_registry=registry,
    system_prompt="You are a helpful assistant.",
    max_turns=10,  # 防止无限循环
)

result = await loop.run("上海天气怎么样?")
print(result.final_output)       # "上海现在 25°C,晴天。"
print(result.tool_calls_count)   # 1
print(result.total_turns)        # 2 (1次工具调用 + 1次最终回答)
print(result.stopped_reason)     # "completed"

事件钩子(可观测性)

from zapry_agents_sdk.agent import AgentHooks

hooks = AgentHooks(
    on_llm_start=lambda turn, msgs: print(f"Turn {turn}: calling LLM..."),
    on_tool_start=lambda name, args: print(f"Calling tool: {name}"),
    on_tool_end=lambda name, result, err: print(f"Tool result: {result}"),
    on_error=lambda e: print(f"Error: {e}"),
)
loop = AgentLoop(llm_fn=my_llm, tool_registry=registry, hooks=hooks)

与 Memory 集成

session = MemorySession("my_agent", "user_123", store)
ctx = await session.load()

result = await loop.run(
    "记住我的生日是10月15日",
    conversation_history=await session.short_term.get_history_dicts(),
    extra_context=session.format_for_prompt(),
)

MCP Client — 连接任意 MCP 服务器

SDK 内置 MCP(Model Context Protocol)客户端,让你的 Agent 连接任意 MCP 服务器,自动发现工具并通过标准 ToolRegistry 使用 —— AgentLoop 完全透明。

支持的传输方式:

  • HTTP — 远程/云端 MCP 服务器
  • Stdio — 本地 MCP 服务器(如 npx @modelcontextprotocol/server-filesystem

快速开始

from zapry_agents_sdk import MCPManager, MCPServerConfig, ToolRegistry, AgentLoop

mcp = MCPManager()

# 连接 MCP 服务器
await mcp.add_server(MCPServerConfig(
    name="filesystem",
    transport="stdio",
    command="npx",
    args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
))

await mcp.add_server(MCPServerConfig(
    name="search",
    transport="http",
    url="https://mcp.example.com/search",
    headers={"Authorization": "Bearer xxx"},
))

# 注入到 ToolRegistry(与本地工具共存)
registry = ToolRegistry()
registry.register(my_local_tool)   # 你的本地工具
mcp.inject_tools(registry)         # MCP 工具自动添加

# AgentLoop 透明使用 MCP 工具
loop = AgentLoop(llm_fn=my_llm, tool_registry=registry)
result = await loop.run("读取 /tmp/data.txt")

# 清理
await mcp.disconnect_all()

工具过滤

过滤匹配原始 MCP 工具名(不是注入后的 SDK 名),支持 fnmatch 通配符:

await mcp.add_server(MCPServerConfig(
    name="filesystem",
    transport="stdio",
    command="npx",
    args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
    allowed_tools=["read_*", "list_*"],   # 只允许读/列表工具
    blocked_tools=["write_*", "delete_*"], # 阻止危险工具
    max_tools=10,                           # 限制上下文大小
))

工具命名

MCP 工具注入时添加 mcp.{server}.{tool} 前缀:

  • read_file on server filesystemmcp.filesystem.read_file
  • query on server databasemcp.database.query

注入行为

  • 幂等:多次调用 inject_tools() 安全(先移除旧的再注入)
  • 精确移除remove_tools() 只移除 MCP 注入的工具,不影响本地工具
  • 调用方控制:注入由你决定,不在 AgentLoop.run() 中 —— 无并发问题

Schema 保真

MCP 工具的 inputSchema 原样保留在 ToolDef.raw_json_schema 中。发送给 LLM 时使用原始 JSON Schema(包括嵌套对象、oneOfenum 等),不做有损转换。


项目结构

zapry-agents-sdk/
├── pyproject.toml
├── README.md
├── zapry_agents_sdk/
│   ├── __init__.py          # 包入口
│   ├── core/
│   │   ├── bot.py           # ZapryAgent 主类(含 middleware 集成)
│   │   ├── config.py        # AgentConfig 配置
│   │   └── middleware.py    # Middleware 洋葱管道
│   ├── helpers/
│   │   └── handler_registry.py  # Handler 注册装饰器 & Registry
│   ├── proactive/
│   │   ├── scheduler.py     # ProactiveScheduler 主动消息调度器
│   │   └── feedback.py      # FeedbackDetector 反馈检测 & 偏好注入
│   ├── tools/
│   │   ├── registry.py      # @tool 装饰器 + ToolRegistry + schema 生成
│   │   └── openai_adapter.py # OpenAI function calling 适配器
│   ├── agent/
│   │   └── loop.py          # AgentLoop ReAct 推理循环(含 Guardrails + Tracing)
│   ├── guardrails/
│   │   └── engine.py        # Guardrails 安全护栏 + Tripwire 机制
│   ├── tracing/
│   │   └── engine.py        # 结构化 Span 追踪系统
│   ├── mcp/
│   │   ├── __init__.py      # MCP 模块入口
│   │   ├── config.py        # MCPServerConfig, 工具过滤
│   │   ├── transport.py     # HTTPTransport, StdioTransport, InProcessTransport
│   │   ├── protocol.py      # JSON-RPC 2.0, MCPClient, MCPError
│   │   ├── converter.py     # MCP tool → SDK ToolDef 转换
│   │   └── manager.py       # MCPManager 统一管理
│   ├── memory/
│   │   ├── session.py       # MemorySession 便捷 API
│   │   ├── store.py         # MemoryStore Protocol + InMemoryStore
│   │   ├── store_sqlite.py  # SQLiteMemoryStore
│   │   ├── short_term.py    # ShortTermMemory 对话历史
│   │   ├── long_term.py     # LongTermMemory 用户档案
│   │   ├── working.py       # WorkingMemory 会话临时数据
│   │   ├── buffer.py        # ConversationBuffer 对话缓冲
│   │   ├── extractor.py     # MemoryExtractor + LLMMemoryExtractor
│   │   ├── formatter.py     # prompt 注入格式化
│   │   └── types.py         # 数据类型定义
│   └── utils/
│       ├── telegram_compat.py   # Zapry 兼容层
│       └── logger.py            # 日志工具
└── tests/
    ├── test_compat.py       # 兼容层测试
    ├── test_proactive.py    # 主动触发 & 反馈检测测试(44 项)
    ├── test_middleware.py   # Middleware 管道测试(9 项)
    ├── test_tools.py        # Tool Calling + OpenAI 适配器测试(32 项)
    ├── test_memory.py       # Memory 框架全量测试(55 项)
    ├── test_agent_loop.py   # AgentLoop 测试(17 项)
    └── test_guardrails.py   # Guardrails + Tracing 测试(28 项)

Zapry 兼容性

SDK 自动处理以下 Zapry 与 Telegram API 的差异:

问题 描述 状态
#1 User.first_name 缺失 ✅ Zapry 已修复(SDK 保留兜底)
#2 User.is_bot 缺失 ✅ Zapry 已修复(SDK 保留兜底)
#3 ID 字段为字符串 🔧 SDK 自动转换
#4 User.username 缺失 🔧 SDK 兼容处理
#5 私聊 chat.id 错误 ✅ Zapry 已修复
#6 chat.type 缺失 ✅ Zapry 已修复(SDK 保留兜底)
#7 群聊 ID 带 g_ 前缀 🔧 SDK 自动去除
#8 命令 entities 缺失 ✅ Zapry 已修复(SDK 保留兜底)
#9 sendChatAction 不支持 ⚠️ 业务层需跳过
#10 editMessageText 不支持 ⚠️ 业务层需跳过
#11 answerCallbackQuery 需 chat_id 🔧 SDK 自动容错
#14 不支持 Markdown 🔧 ZapryCompat.clean_markdown()

License

MIT