Initial focus on embedded systems Broqer can be used wherever continuous streams of data have to be processed - and they are everywhere. Watch out!
Synopsis
- Pure python implementation without dependencies
- Under MIT license (2018 Günther Jena)
- Source is hosted on GitHub.com
- Documentation is hosted on ReadTheDocs.com
- Tested on Python 3.7. 3.8, 3.9, 3.10 and 3.11
- Unit tested with pytest, coding style checked with Flake8, static type checked with mypy, static code checked with Pylint, documented with Sphinx
- Operators known from ReactiveX and other streaming frameworks (like Map, CombineLatest, ...)
- Centralised object to keep track of publishers and subscribers
- Starting point to build applications with a microservice architecture
Showcase
In other frameworks a Publisher is sometimes called Oberservable. A Subscriber is able to observe changes the publisher is emitting. With these basics you're able to use the observer pattern - let's see!
Observer pattern
Subscribing to a publisher is done via the .subscribe() method.
A simple subscriber is Sink which is calling a function with optional positional
and keyword arguments.
>>> from broqer import Publisher, Sink >>> a = Publisher(5) # create a publisher with state `5` >>> s = Sink(print, 'Change:') # create a subscriber >>> disposable = a.subscribe(s) # subscribe subscriber to publisher Change: 5 >>> a.notify(3) # change the state Change: 3 >>> disposable.dispose() # unsubscribe
Combine publishers with arithmetic operators
You're able to create publishers on the fly by combining two publishers with
the common operators (like +, >, <<, ...).
>>> a = Publisher(1) >>> b = Publisher(3) >>> c = a * 3 > b # create a new publisher via operator overloading >>> disposable = c.subscribe(Sink(print, 'c:')) c: False >>> a.notify(2) c: True >>> b.notify(10) c: False
Also fancy stuff like getting item by index or key is possible:
>>> i = Publisher('a') >>> d = Publisher({'a':100, 'b':200, 'c':300}) >>> disposable = d[i].subscribe(Sink(print, 'r:')) r: 100 >>> i.notify('c') r: 300 >>> d.notify({'c':123}) r: 123
Some python built in functions can't return Publishers (e.g. len() needs to
return an integer). For these cases special functions are defined in broqer: Str,
Int, Float, Len and In (for x in y). Also other functions
for convenience are available: All, Any, BitwiseAnd and BitwiseOr.
Attribute access on a publisher is building a publisher where the actual attribute
access is done on emitting values. A publisher has to know, which type it should
mimic - this is done via .inherit_type(type).
>>> i = Publisher('Attribute access made REACTIVE') >>> i.inherit_type(str) >>> disposable = i.lower().split(sep=' ').subscribe(Sink(print)) ['attribute', 'access', 'made', 'reactive'] >>> i.notify('Reactive and pythonic') ['reactive', 'and', 'pythonic']
Function decorators
Make your own operators on the fly with function decorators. Decorators are
available for Accumulate, CombineLatest, Filter, Map, MapAsync,
MapThreaded, Reduce and Sink.
>>> from broqer import op >>> @op.build_map ... def count_vowels(s): ... return sum([s.count(v) for v in 'aeiou']) >>> msg = Publisher('Hello World!') >>> disposable = (msg | count_vowels).subscribe(Sink(print, 'Number of vowels:')) Number of vowels: 3 >>> msg.notify('Wahuuu') Number of vowels: 4
You can even make configurable Map s and Filter s:
>>> import re >>> @op.build_filter_factory ... def filter_pattern(pattern, s): ... return re.search(pattern, s) is not None >>> msg = Publisher('Cars passed: 135!') >>> disposable = (msg | filter_pattern('[0-9]+')).subscribe(Sink(print)) Cars passed: 135! >>> msg.notify('No cars have passed') >>> msg.notify('Only 1 car has passed') Only 1 car has passed
Install
Credits
Broqer was inspired by:
- RxPY: Reactive Extension for Python (by Børge Lanes and Dag Brattli)
- aioreactive: Async/Await reactive tools for Python (by Dag Brattli)
- streamz: build pipelines to manage continuous streams of data (by Matthew Rocklin)
- MQTT: M2M connectivity protocol
- Florian Feurstein: spending hours of discussion, coming up with great ideas and help me understand the concepts!
API
Publishers
A Publisher is the source for messages.
Operators
| CombineLatest (*publishers) | Combine the latest emit of multiple publishers and emit the combination |
| Filter (predicate, ...) | Filters values based on a predicate function |
| Map (map_func, *args, **kwargs) | Apply map_func(*args, value, **kwargs) to each emitted value |
| MapAsync (coro, mode, ...) | Apply coro(*args, value, **kwargs) to each emitted value |
| Throttle (duration) | Limit the number of emits per duration |
Subscribers
A Subscriber is the sink for messages.
| Sink (func, *args, **kwargs) | Apply func(*args, value, **kwargs) to each emitted value |
| SinkAsync (coro, ...) | Apply coro(*args, value, **kwargs) to each emitted value |
| OnEmitFuture (timeout=None) | Build a future able to await for |
| Trace (d) | Debug output for publishers |
Values
| Value (*init) | Publisher and Subscriber |