anthunder (a.k.a. sofa-bolt-python)
See English README
anthunder is a Python implementation of the BOLT protocol. It provides BOLT client and server functionality and supports RPC calls using BOLT + Protobuf.
requirements
- python3 >= 3.5 (the aio classes need asyncio support)
- mosn >= 1.3 (for use with anthunder version >= 0.6)
- mosn < 1.3 (for use with anthunder version < 0.6)
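The pairing between anthunder and mosn versions can be checked before startup. The following is a minimal sketch, assuming both versions are available as dotted numeric strings; `parse_version`, `python_is_supported`, and `mosn_is_compatible` are hypothetical helpers for illustration and are not part of anthunder.

```python
import sys


def parse_version(text):
    """Turn a dotted numeric version string such as '0.6.1' into a tuple of ints."""
    return tuple(int(part) for part in text.split("."))


def python_is_supported(version_info=sys.version_info):
    """The requirements list asks for python3 >= 3.5."""
    return tuple(version_info[:2]) >= (3, 5)


def mosn_is_compatible(anthunder_version, mosn_version):
    """Apply the pairing rule from the requirements list:
    anthunder >= 0.6 goes with mosn >= 1.3, anthunder < 0.6 with mosn < 1.3."""
    new_anthunder = parse_version(anthunder_version) >= (0, 6)
    new_mosn = parse_version(mosn_version) >= (1, 3)
    return new_anthunder == new_mosn
```

How the installed mosn version is obtained depends on the deployment and is left out of the sketch.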
roadmap
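- Support calling a server with Bolt+pb (client side)
- Support service discovery and service publishing through the service mesh
- Support providing services with Bolt+pb (server side)
- Support other serialization protocols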
Tutorial
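The following examples use protobuf serialization. For other serialization protocols, see the demo.
As a caller (client)
- Obtain the .proto file provided by the service provider
- Run the command protoc --python_out=. *.proto, which compiles the protobuf files into _pb2.py files
- Import the pb classes and call the interface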
    from SampleServicePbResult_pb2 import SampleServicePbResult
    from SampleServicePbRequest_pb2 import SampleServicePbRequest
    from anthunder import AioClient
    from anthunder.discovery.mosn import MosnClient, ApplicationInfo

    # ctx is transferred from the upstream rpc; it is a mytracer.SpanContext object that stores rpc_trace_context
    spanctx = ctx
    # spanctx = SpanContext()  # or generate a new context

    # define the interface first: it is needed both for subscribing and for invoking
    interface = 'com.alipay.rpc.common.service.facade.pb.SampleServicePb:1.0'

    service_reg = MosnClient()  # using mosn for service discovery, see https://mosn.io for detail
    service_reg.startup(ApplicationInfo(YOUR_APP_NAME))
    # service_reg = LocalRegistry({interface: (inf_ip, inf_port)})  # or a service-address dict as service discovery

    # subscribe to the service before the client sends requests
    service_reg.subscribe(interface)

    client = AioClient(YOUR_APP_NAME, service_register=service_reg)  # will create a thread, and send heartbeat to remote every 30s

    # synchronous call
    content = client.invoke_sync(interface, "hello",
                                 SampleServicePbRequest(name=some_name).SerializeToString(),
                                 timeout_ms=500, spanctx=spanctx)
    result = SampleServicePbResult()
    result.ParseFromString(content)

    # asynchronous call
    def client_callback(resp):
        # callback function, accepts bytes as the only argument,
        # then deserializes and does further processing
        result = SampleServicePbResult()
        result.ParseFromString(resp)
        # do something

    future = client.invoke_async(interface, "hello",
                                 SampleServicePbRequest(name=some_name).SerializeToString(),
                                 spanctx=spanctx, callback=client_callback)
See the unit tests for reference.
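The asynchronous call hands raw response bytes to the callback, so the callback is where deserialization happens. The sketch below illustrates that contract without anthunder or protobuf installed. `fake_invoke_async` is a hypothetical stand-in for the client call, and JSON stands in for the protobuf messages; only the callback shape (a single bytes argument) is taken from the example above.

```python
import json
import queue
import threading


def fake_invoke_async(payload, callback):
    """Hypothetical stand-in for an async RPC call (not anthunder's API):
    echoes the request name back on a worker thread and passes the raw
    response bytes to the callback."""
    def worker():
        request = json.loads(payload.decode("utf-8"))
        response = json.dumps({"result": request["name"]}).encode("utf-8")
        callback(response)

    thread = threading.Thread(target=worker)
    thread.start()
    return thread


# Thread-safe hand-off from the callback to the calling thread.
results = queue.Queue()


def client_callback(resp):
    """Accepts bytes as the only argument, deserializes, then hands off."""
    results.put(json.loads(resp.decode("utf-8")))


thread = fake_invoke_async(json.dumps({"name": "world"}).encode("utf-8"),
                           client_callback)
thread.join()
reply = results.get(timeout=1)
```

A queue is used here because the callback may run on a different thread from the caller; with real protobuf messages, the JSON calls would be replaced by ParseFromString on the generated class.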
As a service provider (server)
    from anthunder import AioListener
    from anthunder.discovery.mosn import MosnClient, ApplicationInfo
    from SampleServicePbResult_pb2 import SampleServicePbResult
    from SampleServicePbRequest_pb2 import SampleServicePbRequest
    # ProviderMetaInfo must also be imported; its import path is not shown in this README

    class SampleService(object):
        def __init__(self, ctx):
            # service must accept one param as spanctx for rpc tracing support
            self.ctx = ctx

        def hello(self, bs: bytes):
            obj = SampleServicePbRequest()
            obj.ParseFromString(bs)
            print("Processing Request", obj)
            return SampleServicePbResult(result=obj.name).SerializeToString()

    interface = 'com.alipay.rpc.common.service.facade.pb.SampleServicePb:1.0'

    service_reg = MosnClient()  # using mosn for service discovery, see https://mosn.io for detail
    service_reg.startup(ApplicationInfo(YOUR_APP_NAME))
    listener = AioListener(('127.0.0.1', 12199), YOUR_APP_NAME, service_register=service_reg)

    # register interface and its function, plus its protobuf definition class
    listener.register_interface(interface, service_cls=SampleService,
                                provider_meta=ProviderMetaInfo(appName="test_app"))

    # start server in a standalone thread
    listener.run_threading()
    # or start in the current thread instead
    # listener.run_forever()

    # publish interfaces, MUST be called after the listener has started
    listener.publish()

    # shutdown the server
    listener.shutdown()
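The service class above follows a simple shape: the constructor takes one context argument, and each method takes request bytes and returns response bytes. That shape can be exercised locally before registering the class with a listener. The sketch below is a minimal illustration under stated assumptions: JSON stands in for protobuf so it runs without generated _pb2 modules, and `call_locally` is a hypothetical helper, not how anthunder itself dispatches requests.

```python
import json


class EchoService(object):
    def __init__(self, ctx):
        # Same constructor shape as the service class above: one context param.
        self.ctx = ctx

    def hello(self, bs: bytes) -> bytes:
        # Deserialize the request bytes, build a response, serialize it again.
        request = json.loads(bs.decode("utf-8"))
        return json.dumps({"result": request["name"]}).encode("utf-8")


def call_locally(service_cls, method, payload, ctx=None):
    """Hypothetical test helper: build the service with a context and
    invoke one method by name with raw request bytes."""
    service = service_cls(ctx)
    return getattr(service, method)(payload)


reply = call_locally(EchoService, "hello",
                     json.dumps({"name": "bolt"}).encode("utf-8"))
```

Keeping handlers as plain bytes-in, bytes-out methods makes them easy to cover in unit tests without a running listener or service registry.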
License
Copyright (c) 2018-present, Ant Financial Service Group
Apache License 2.0
See LICENSE file.
Third party
Parts of the mysockpool package use code from the urllib3 project under the terms of the MIT License. See origin-license.txt under the mysockpool package.