## Connect your apps

RMLL 2018

A presentation by Florent Aide and Christophe de Vienne
## What is it?

### Xbus is a service bus

Any application can:

* send events
* without knowing who will use them
* regardless of the consumers' state

Any application can:

* receive events
* without knowing their actual origin
* regardless of the emitter's state

Xbus:

* routes messages
* based on graphs
* helps define data streams
* provides safety and security
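The decoupling described above can be sketched with a toy in-memory bus. This is only an illustration of the idea, not the Xbus API: `ToyBus`, `emit` and `subscribe` are hypothetical names, and a real bus would persist pending events rather than hold them in memory.

```python
from collections import defaultdict


class ToyBus:
    """In-memory stand-in for a service bus (hypothetical, not the Xbus API)."""

    def __init__(self):
        self._consumers = defaultdict(list)  # event type -> callbacks
        self._pending = defaultdict(list)    # event type -> undelivered payloads

    def emit(self, event_type, payload):
        # The emitter names only an event type, never a consumer.
        consumers = self._consumers[event_type]
        if not consumers:
            # No consumer is registered yet: keep the event until one appears.
            self._pending[event_type].append(payload)
            return
        for callback in consumers:
            callback(payload)

    def subscribe(self, event_type, callback):
        # The consumer names only an event type, never an emitter.
        self._consumers[event_type].append(callback)
        # Flush events emitted while no consumer was registered.
        for payload in self._pending.pop(event_type, []):
            callback(payload)


bus = ToyBus()
received = []
bus.emit("demo.count", 1)                    # emitted before any consumer exists
bus.subscribe("demo.count", received.append)
bus.emit("demo.count", 2)
print(received)  # [1, 2]
```

Neither side holds a reference to the other: the emitter succeeds even though no consumer is up, and the consumer later receives the event without knowing where it came from.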
## The contract

### Safety

* Once ack'ed, your data is safe
* The message will be delivered or an error reported, _no matter what_

### Security

* Actors are strictly identified
* No eavesdropping
## Overview

* Clients in Go, Python, JavaScript, Elm, PHP...
* HTTP(S)/REST gateway
* Server written in Go
* Microservices-based architecture
* Communications via NATS and NRPC
* Modular storage, currently PostgreSQL
## Examples

### Graph definition

```yaml
nodes:
  - id: relay
    type: worker
    actors: [worker-1]
    inputs: [default]
    outputs: [default]
  - id: sink
    type: consumer
    roles: [consume]
    rolebroadcast: true
    inputs: [default]
  - id: source
    type: emitter
    sourcematch:
      eventtypes: [demo.count]
    outputs: [default]
edges:
  - source -> relay
  - relay -> sink
```
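To make the routing idea concrete, here is a minimal sketch of how the edges of such a graph determine the path of a message. It is not how Xbus implements routing: `parse_edges` and `route` are hypothetical helpers, and the graph is written as a plain dict (with only the `id` and `type` fields) to avoid a YAML dependency.

```python
# The graph from the slide, reduced to node ids/types and edges.
graph = {
    "nodes": [
        {"id": "relay", "type": "worker"},
        {"id": "sink", "type": "consumer"},
        {"id": "source", "type": "emitter"},
    ],
    "edges": ["source -> relay", "relay -> sink"],
}


def parse_edges(edges):
    """Turn 'a -> b' strings into a {node: [successors]} mapping."""
    successors = {}
    for edge in edges:
        src, dst = (part.strip() for part in edge.split("->"))
        successors.setdefault(src, []).append(dst)
    return successors


def route(graph, start):
    """Return the nodes a message visits, breadth-first from `start`."""
    known = {node["id"] for node in graph["nodes"]}
    successors = parse_edges(graph["edges"])
    visited, queue = [], [start]
    while queue:
        node = queue.pop(0)
        if node in visited:
            continue
        if node not in known:
            raise ValueError(f"edge references unknown node: {node}")
        visited.append(node)
        queue.extend(successors.get(node, []))
    return visited


print(route(graph, "source"))  # ['source', 'relay', 'sink']
```

An event entering at `source` goes through the `relay` worker before reaching the `sink` consumer, which is the path the two edges describe.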
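## Emitter

```python
import asyncio


async def run(self):
    try:
        while True:
            # Open the 'default' output, declaring the event type to send
            output = self.actor.open_output(
                output='default', eventtypes=["demo.simplemessage"])
            # Send the message as UTF-8 bytes
            await output.add_items(
                "demo.simplemessage", self.message.encode('utf-8'))
            await output.close()
            # Wait before emitting the next message
            await asyncio.sleep(self.interval)
    except Exception as e:
        # Report the failure, then re-raise it
        print("Error in helloworld", e)
        raise
```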
## Consumer

```python
async def receive_envelope(self, envelope_receiver):
    envelope = await envelope_receiver.complete()
    print("print-to-console:", envelope.events[0].items[0])
    await self.actor.processing_success(
        envelope.context, envelope.id)
    print("print-to-console:", "processing_success sent")
```
## Worker

```python
async def receive_envelope(self, envelope_receiver):
    envelope = await envelope_receiver.complete()
    output = self.actor.open_output(
        output='default', eventtypes=["demo.simplemessage"])
    for item in envelope.events[0].items:
        await output.add_items(
            "demo.simplemessage", item)
    await output.close()
```
## Use cases

* Control/monitoring of data streams
* HA front ↔ slow back-office communications
* Mission-critical call center ↔ 40+ ERP instances
* Legacy warehouse software → HTTP gateway → ERP
## Why

* A need
* Closed-source solutions
  * closed source
  * very expensive
* Open-source solutions
  * open-core only and/or not easy to use
  * can be expensive
## Goals

* Low barrier to entry
* Lightweight
* Scalable
* Affordable

=> Accessible to anyone
## Architecture

* Very specialized microservices
* Communicating via NATS + NRPC only
* Using specialized storage
Schema
## NATS

Open Source. Simple. Secure. Scalable.

* PUB/SUB server cluster
* queue subscriptions
* subject-based permissions
* REQ/REP pattern
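Subject-based permissions build on NATS subject matching: subjects are dot-separated tokens, `*` matches exactly one token, and `>` matches one or more trailing tokens. The sketch below reimplements that matching in plain Python to show the idea. The `may_publish` helper, the permission list and the subject names are hypothetical and are not taken from Xbus or from the NATS server configuration.

```python
def subject_matches(pattern, subject):
    """Return True if a NATS-style subject matches a wildcard pattern."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, token in enumerate(p_tokens):
        if token == ">":
            # '>' is only valid as the last token and needs at least one
            # remaining subject token.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if token != "*" and token != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


def may_publish(allowed_patterns, subject):
    """Hypothetical permission check: at least one allowed pattern must match."""
    return any(subject_matches(p, subject) for p in allowed_patterns)


# Hypothetical permissions for one client.
allowed = ["demo.*.events", "logs.>"]

print(may_publish(allowed, "demo.worker-1.events"))  # True
print(may_publish(allowed, "logs.worker-1.errors"))  # True
print(may_publish(allowed, "admin.shutdown"))        # False
```

Because permissions are expressed on subjects rather than on connections, a client can be confined to its own slice of the subject space with a handful of patterns.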
## NRPC

* nRPC is an RPC framework like gRPC, but for NATS
* Describe messages and services with protobuf
* Protobuf and JSON encoded messages
## OSS Contributions

* NATS
  * TLS authentication
* NRPC
  * JSON messages, optimized message format
  * Custom subjects, parametrized subjects
  * No-reply functions, void args and replies, streamed replies
  * New languages: Gogo protobuf, Python (soon PHP, Elm)
## Roadmap

#### ~ Short-term ~

* Replies
* Channels
* Performance
* More distributed
* Monitoring integration
* GUI
* More reusable connectors

#### ~ Long-term ~

* Expectations
* Support for more programming languages
* Handle massive volumes
* Fully distributed
* Multi-site deployment
* Multi-tenant deployment
* SaaS offer
## Demo
### Interested?

Quickstart: https://docs.xbus.io/latest/quickstart.html

Contact:

* Email - contact@xbus.io
* Twitter - @xbus_io
* IRC - #xbus on freenode
* NATS - https://www.nats.io
* NRPC - https://github.com/nats-rpc
### Thank you!

Questions?

* Xbus - https://xbus.io/
* Sources - https://orus.io/xbus/xbus
* Slides - https://xbus.io/slides/2018-07-rmll
* Email - contact@xbus.io
* Twitter - @xbus_io
* IRC - #xbus on freenode