Reactive Python - Asynchronous programming
@xav-b|October 5, 2016 (8y ago)4 views
On the Confluent website one can find this bold title :
Stream data changes everything
From the createors of Kafka, a real-time messaging system, this is not a surprising assertion. Yet, data streaming infrastructures have gained in popularity and many projects require the data to be processed as soon as it shows up. It contributed to the development of famous technologies like Spark Stremaing, Apache Storm and more broadly websockets.
This latest piece of software in particular brought real-time data feeds to web applications, trying to solve low latency connections. Coupled with the asynchronous Node.js, one can build a powerful event-based reactive system. But what about Python ? Given the popularity of the language in data science, would it be possible to bring the benefits of this kind of data ingestion ? As this article will show, it turns out that modern Python (hear Python 3.4 or later) supports asynchronous data streaming apps.
Introducing asyncio
Python 3.4 introduced in the standard library the module asyncio to provision the language with :
Asynchronous I/O, event loop, coroutines and tasks
While Python treats functions as first-class objects (meaning you can assign them to variable, pass them as arguments), most developers follow an imperative programming style. It seams on purpose :
It requires super human discipline to write readable code in callbacks and if you don’t believe me look at any piece of JavaScript code.
- Guido van Rossum
So Asyncio is the pythonic answer to asynchronous programming. This paradigm makes a lot of sense for otherwise costly I/O operations or when we need events to trigger code.
Scenario
For fun and profit, let's build such project. We will simulate a dummy electrical circuit composed of three components :
- A clock regularly ticking
- A board I/O pin randomly choosing to toggle its binary state on clock events
- A buzzer buzzing when the I/O pin flips to one
This set us up with an interesting machine-to-machine communication problem to solve.
Note the code snippets in this post makes use of features like async
and
await
introduced in Python 3.5. While it would be possible to backport to
Python 3.4, I highly recommend the reader to follow along with the same version
or newer. Anaconda or Pyenv can ease the installation
process if necessary.
Asynchronous webscoket Client/Server
The client : Clock
Our first step, the clock, will introduce both asyncio and websocket basics. We need a straightforward method that fires tick signals through a websocket and wait for acknowledgement.
The async
keyword is sugar syntaxing introduced in Python 3.5 to replace the
previous @asyncio.coroutine
. The official pep 492 explains it all
but tldr : API quality.
To simplify websocket connection plumbing we can take advantage of the eponym
package: pip install websockets==3.5.1
. It hides the protocol's
complexity behind an elegant context manager.
The keyworkd await
was introduced with async
and replaces the old yield from
to read values from asynchronous functions. Inside the context manager the
connection stays open and we can stream data to the server we contacted.
The server : IOPin
The core of our application are entities capable of speaking to each others
directly. To make things fun we will expose the same API as Arduino
sketches, i.e. a setup
method ran once at startup and a loop
called when new data is available.
The child objects will be required to implement setup
and loop
while this
class will take care of
- Initializing the sketch
- Registering a websocket server based on a asynchronous callback (
loop
) - Telling the event loop to poll for... events.
The websockets states the server callback is expected to have the
signature on_connection(websocket, path)
. This is too low level for our
purpose. Instead, we can write a decorator to manage asyncio
details, message
passing, error handling, etc... and only call self.loop
with application-level
relevant information : the actual message and the websocket path.
Now we can develop a readable IOPin
object.
We finally need some glue to run both clock and IOPin and test if the later toggles its state when the former fires new ticks. The following snippet uses a convenient library, click 6.6, to parse command line arguments.
Don't forget to chmod +x
the script and start the server in a first terminal
./arduino.py iopin
. When it is listening for connections, start the clock with
./arduino.py clock
and watch them communicate ! Note that we used here common
default host and port so they can find each other.
Peer to peer communication
So far we established a websocket connection to process asynchronously clock events. Now that one pin swing between 1's and 0's, let's wire a buzzer and pretend it buzzes on high state (1) and remains silent on low ones (0). We can rephrase that in Python like so :
So now how do we make them to communicate ? Since they share a common parent
class, we implement a stream
method to send arbitrary data and acknowledge
reception with, also, arbitrary data. To sum up, we want IOPin
to use this
API:
Service Discovery
The first challenge to solve is service discovery. We need to target specific nodes within a fleet of reactive workers.
This topic however goes past the scope of this article. The shortcut below will do the job (i.e. hardcode the nodes we will start) while keeping us focus on reactive messaging.
Streaming Machine-to-Machine chat
Let's provide FactoryLoop
with the knowledge of the grid and implement an
asynchrone communication channel.
We added a bit of debugging lines to better understand how the data flows
through the network. Every implementation of the FactoryLoop
can both react to
events and communicate with other nodes it is aware of.
Wrapping up
Time to update arduino.py
and run our cluster of three reactive
workers in three.
Launch three terminals or use a tool like foreman to spawn multiple processes. Either way, keep in mind you will need to track scripts output.
We just saw one worker reacting to a clock, and an other one reacting to randomly generated events. The websocket protocol allowed us to exchange streaming data and receive arbitrary responses, unlocking sophisticated fleet orchestration. While we limited this example to two nodes, a powerful service discovery mechanism could bring to life a distributed network of microservices.