How to Use Async Generator in Python 3.6 to Process Streaming Data
Like many, my first programming language is Python 2.7. This year, I decided to make the switch alias python=python3.6
. Other than a 10% increase in processing speed, it has some perks not previously possible in 2.7.
In this post, I will publish a minimal, complete, and verifiable example of a script implemented in Python 3.6.
Async Generators: what are those?
Suppose we have a stream that generates a bunch of values required by other parts of the program. With asyncio
, the stream processing loop is run concurrently so it doesn’t block. It is introduced in 3.6.
This example program does the following:
- Receive a batch of data from stream continuously
- Every x seconds, do something with the batch, repeat
Here is the async generator:
import asyncio
from tectonic import TectonicDB
import json
async def subscribe(name):
db = TectonicDB(host="localhost", port=9001)
_success, _text = await db.subscribe(name)
while 1:
_, text = await db.poll()
if b"NONE" == text:
await asyncio.sleep(0.001)
else:
yield json.loads(text)
As we can see, subscribe
connects to a TectonicDB instance and subscribes to a data store then polls forever. This coroutine yields new order book updates as they come in.
Next, we define a structure to store this data.
class TickBatcher(object):
def __init__(self, db_name):
self.one_batch = []
self.db_name = db_name
async def sub(self):
async for item in subscribe(self.db_name):
self.one_batch.append(item)
Now since the generator is async, the iteration must also be async as in only iterate when new data comes in.
We write the main logic in a separate coroutine.
def timer(secs=1):
"""async timer decorator"""
def _timer(f):
async def wrapper(*args, **kwargs):
while 1:
await asyncio.sleep(s)
await f()
return wrapper
return _timer
class TickBatcher(object):
...
@timer(secs=10)
async def run(self):
# do work here
print(len(self.one_batch))
self.one_batch = []
We use a decorator to hide the sleep
ing logic.
Finally, in order to run the program, we need to create the tasks separately.
if __name__ == '__main__':
loop = asyncio.get_event_loop()
proc = TickBatcher("bnc_xrp_btc")
loop.create_task(proc.sub())
loop.create_task(proc.run())
loop.run_forever()
loop.close()
Conclusion
In this short post we used Python3.6 async generator to implement a simple script to monitor market or place simple orders.