How to Use Async Generators in Python 3.6 to Process Streaming Data
Like many, my first programming language was Python 2.7. This year, I decided to make the switch: `alias python=python3.6`. Apart from a 10% increase in processing speed, Python 3.6 offers some features that were not possible in 2.7.
In this post, I present a minimal, complete, and verifiable example of a script implemented in Python 3.6.
Async Generators: what are those?
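Suppose we have a stream that produces values needed by other parts of the program. An async generator is an `async def` function that contains `yield`: it can `await` between values, and callers consume it with `async for`. Running it on an asyncio event loop lets the stream-processing loop run concurrently with other tasks instead of blocking them. Async generators were introduced in Python 3.6 (PEP 525); asyncio itself has been in the standard library since 3.4.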
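To make the definition concrete, here is a minimal async generator that needs no external services. `ticker` and `consume` are illustrative names, not part of the script in this post. The example runs on a fresh event loop, which works on Python 3.6 and later.

```python
import asyncio


async def ticker(n, delay=0.01):
    """Async generator: yield 0..n-1, pausing between values."""
    for i in range(n):
        # Awaiting here suspends only this generator; other tasks keep running.
        await asyncio.sleep(delay)
        yield i


async def consume():
    # An async generator is consumed with `async for`, not a plain `for`.
    values = []
    async for value in ticker(3):
        values.append(value)
    return values


loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(consume())
finally:
    loop.close()

print(result)  # [0, 1, 2]
```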
This example program does the following:
- Continuously receive data from the stream into a batch
- Every x seconds, process the batch, then start a new one
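Before bringing in TectonicDB, the two steps above can be sketched with nothing but asyncio. In this sketch `fake_stream` is a hypothetical stand-in for the real data source, and `rounds` is an extra parameter added only so the demo terminates. The batch sizes depend on timing, so no exact output is shown.

```python
import asyncio


async def fake_stream(delay=0.01):
    """Hypothetical data source: yields 0, 1, 2, ... forever."""
    i = 0
    while True:
        await asyncio.sleep(delay)
        yield i
        i += 1


async def collect(batch):
    # Step 1: continuously append incoming items to the shared batch.
    async for item in fake_stream():
        batch.append(item)


async def process(batch, secs, rounds, sizes):
    # Step 2: every `secs` seconds, handle the batch, then empty it in place.
    for _ in range(rounds):
        await asyncio.sleep(secs)
        sizes.append(len(batch))
        batch.clear()


async def main():
    batch, sizes = [], []
    collector = asyncio.ensure_future(collect(batch))
    await process(batch, secs=0.1, rounds=3, sizes=sizes)
    # The collector never finishes on its own, so cancel it and wait for it.
    collector.cancel()
    try:
        await collector
    except asyncio.CancelledError:
        pass
    return sizes


loop = asyncio.new_event_loop()
try:
    sizes = loop.run_until_complete(main())
finally:
    loop.close()

print(sizes)  # three timing-dependent batch sizes
```

Here the list is passed around by reference, so it is emptied in place with `clear()`. A class that stores the batch on `self` can instead rebind the attribute to a new list, because every coroutine looks it up through `self`.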
Here is the async generator:
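```python
import asyncio
import json

from tectonic import TectonicDB


async def subscribe(name):
    """Async generator: subscribe to `name` and yield each update as a dict."""
    db = TectonicDB(host="localhost", port=9001)
    _success, _text = await db.subscribe(name)
    while True:
        _, text = await db.poll()
        if text == b"NONE":
            # No new update yet: sleep briefly so we don't busy-wait.
            await asyncio.sleep(0.001)
        else:
            yield json.loads(text)
```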
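As we can see, `subscribe` connects to a TectonicDB instance, subscribes to the named data store, and then polls forever. It sleeps for 1 ms whenever the server replies `NONE` (no new data). This async generator yields new order book updates, parsed from JSON, as they come in.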
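Because `subscribe` needs a running TectonicDB server, the poll/sleep/yield loop can be exercised locally against a stub. `FakeDB` and `poll_updates` below are hypothetical. The stub only copies the `(status, text)` return shape and the `b"NONE"` sentinel that `subscribe` relies on. It is not the real client's API. The `limit` parameter is added so the generator stops.

```python
import asyncio
import json


class FakeDB:
    """Hypothetical stub mimicking the poll() shape used by `subscribe`."""

    def __init__(self, messages):
        self.messages = list(messages)

    async def poll(self):
        await asyncio.sleep(0)  # yield control, as a network call would
        if self.messages:
            return True, self.messages.pop(0)
        return True, b"NONE"  # nothing queued: same sentinel the real loop checks


async def poll_updates(db, limit):
    """Same poll/sleep/yield loop as `subscribe`, but stops after `limit` updates."""
    count = 0
    while count < limit:
        _, text = await db.poll()
        if text == b"NONE":
            await asyncio.sleep(0.001)  # back off instead of spinning
        else:
            count += 1
            yield json.loads(text)  # json.loads accepts bytes on Python 3.6+


async def main():
    db = FakeDB([b'{"price": 1.5}', b"NONE", b'{"price": 2.5}'])
    # Async comprehensions (Python 3.6+) also work with async generators.
    return [update async for update in poll_updates(db, limit=2)]


loop = asyncio.new_event_loop()
try:
    updates = loop.run_until_complete(main())
finally:
    loop.close()

print(updates)  # [{'price': 1.5}, {'price': 2.5}]
```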
Next, we define a class that collects this data into a batch.
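```python
class TickBatcher(object):
    def __init__(self, db_name):
        self.one_batch = []  # updates received since the last flush
        self.db_name = db_name

    async def sub(self):
        # Consume the async generator, appending each update to the current batch.
        async for item in subscribe(self.db_name):
            self.one_batch.append(item)
```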
Since `subscribe` is an async generator, it must be consumed with `async for`. Each iteration awaits the next item, so `sub` only resumes when new data arrives, and other tasks can run while it waits.
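To show what async iteration means mechanically, the loop below spells out roughly what `async for` does with an async generator. It gets an iterator with `__aiter__()`, then awaits its `__anext__()` until `StopAsyncIteration` is raised. The `numbers` generator is only an illustration.

```python
import asyncio


async def numbers():
    """Small async generator used only for this illustration."""
    for i in range(3):
        await asyncio.sleep(0)
        yield i


async def manual_iteration():
    """Roughly equivalent to: async for item in numbers(): results.append(item)"""
    results = []
    iterator = numbers().__aiter__()
    while True:
        try:
            # Each step is awaited, so this coroutine suspends until an item is ready.
            item = await iterator.__anext__()
        except StopAsyncIteration:
            break  # the generator finished
        results.append(item)
    return results


loop = asyncio.new_event_loop()
try:
    items = loop.run_until_complete(manual_iteration())
finally:
    loop.close()

print(items)  # [0, 1, 2]
```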
We write the main logic in a separate coroutine, `run`, and use a small decorator to call it at a fixed interval.
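```python
import functools


def timer(secs=1):
    """Async timer decorator: await the wrapped coroutine every `secs` seconds, forever."""
    def _timer(f):
        @functools.wraps(f)
        async def wrapper(*args, **kwargs):
            while True:
                await asyncio.sleep(secs)
                await f(*args, **kwargs)  # forward `self` and any other arguments
        return wrapper
    return _timer
```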
```python
class TickBatcher(object):
    ...

    @timer(secs=10)
    async def run(self):
        # Do work here: this example just reports the batch size, then starts a new batch.
        print(len(self.one_batch))
        self.one_batch = []
```
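The `timer` decorator hides the sleep-and-repeat logic, so `run` contains only the per-batch work. Asyncio switches tasks only at `await` points, and `run` has no `await` between reading the batch and resetting it. As a result, `sub` cannot append an update in between.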
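`wrapper` loops forever, so one way to check this kind of decorator in isolation is to bound it with `asyncio.wait_for`. The snippet below redefines `timer` so it runs on its own. It forwards `*args, **kwargs` so `self` reaches the method, and it uses `functools.wraps` to keep the method's name. `Counter` and the 0.05 s interval are illustrative.

```python
import asyncio
import functools


def timer(secs=1):
    """Async timer decorator: await the wrapped coroutine every `secs` seconds, forever."""
    def _timer(f):
        @functools.wraps(f)
        async def wrapper(*args, **kwargs):
            while True:
                await asyncio.sleep(secs)
                await f(*args, **kwargs)
        return wrapper
    return _timer


class Counter:
    """Illustrative stand-in for TickBatcher that only counts calls."""

    def __init__(self):
        self.calls = 0

    @timer(secs=0.05)
    async def run(self):
        self.calls += 1


async def main():
    counter = Counter()
    try:
        # Let the periodic method run for a short while, then cancel it.
        await asyncio.wait_for(counter.run(), timeout=0.3)
    except asyncio.TimeoutError:
        pass
    return counter


loop = asyncio.new_event_loop()
try:
    counter = loop.run_until_complete(main())
finally:
    loop.close()

print(counter.calls)  # timing-dependent
```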
Finally, to run the program, we create a separate task for each coroutine so that the subscriber and the periodic worker run concurrently on the same event loop.
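```python
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    proc = TickBatcher("bnc_xrp_btc")
    # Schedule the subscriber and the periodic worker as concurrent tasks.
    loop.create_task(proc.sub())
    loop.create_task(proc.run())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass  # Ctrl-C stops the script
    finally:
        loop.close()
```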
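`run_forever()` returns only when `loop.stop()` is called or an exception such as `KeyboardInterrupt` escapes it. Tasks that are still pending at that point are not cancelled for you. Below is a minimal sketch of a tidier shutdown on Python 3.6. `forever` is a hypothetical stand-in for `proc.sub()` and `proc.run()`. The `call_later` timer stands in for whatever would stop the loop in practice, such as a signal handler.

```python
import asyncio


async def forever(ticks):
    """Hypothetical stand-in for proc.sub()/proc.run(): loops until cancelled."""
    while True:
        await asyncio.sleep(0.01)
        ticks.append(1)


loop = asyncio.new_event_loop()
ticks = []
tasks = [loop.create_task(forever(ticks)), loop.create_task(forever(ticks))]
# Stop the loop after 0.1 s; a real script might do this from a signal handler.
loop.call_later(0.1, loop.stop)
try:
    loop.run_forever()
finally:
    # Cancel whatever is still pending and let the cancellations be processed.
    for task in tasks:
        task.cancel()
    loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
    # Finalize any async generators that are still suspended.
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()
```

On Python 3.7 and later, `asyncio.run(main())` performs the task cancellation and the `shutdown_asyncgens()` step automatically when `main` returns.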
Conclusion
In this short post, we used a Python 3.6 async generator to implement a simple script that batches streaming market data. The same structure can serve as a skeleton for scripts that monitor a market or place simple orders.