Like many, my first programming language was Python 2.7. This year, I decided to make the switch: alias python=python3.6. Other than a 10% increase in processing speed, it has some perks that are not available in 2.7.

In this post, I will walk through a minimal, complete, and verifiable example of a script that relies on a Python 3.6 feature: async generators.

Async Generators: what are those?

Suppose we have a stream that produces values needed by other parts of the program. With asyncio, the loop that processes the stream runs concurrently with the rest of the program, so it doesn’t block. Async generators, which let an async function yield values, were introduced in Python 3.6 (PEP 525).
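
To make the idea concrete before bringing in a database, here is a minimal sketch of an async generator that uses only the standard library. The names ticker, collect and run_demo are made up for this illustration and are not part of the example program.

```python
import asyncio


async def ticker(count, delay=0.01):
    """Async generator: yields 0..count-1, pausing between values."""
    for i in range(count):
        # While this sleep is pending, the event loop can run other tasks.
        await asyncio.sleep(delay)
        yield i


async def collect(count):
    """Consume the generator with `async for` and return the values."""
    values = []
    async for value in ticker(count):
        values.append(value)
    return values


def run_demo(count=3):
    # new_event_loop() + run_until_complete() also works on 3.6;
    # asyncio.run() would need 3.7 or later.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(collect(count))
    finally:
        loop.close()


print(run_demo())  # prints [0, 1, 2]
```

The function looks like an ordinary generator, except that it is declared with async def and may await between yields.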

This example program does the following:

  1. Continuously receive data from a stream and collect it into a batch
  2. Every x seconds, do something with the batch, then start a new one

Here is the async generator:

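import asyncio
import json

from tectonic import TectonicDB


async def subscribe(name):
    # Connect to a local TectonicDB instance and subscribe to the named store.
    db = TectonicDB(host="localhost", port=9001)
    _success, _text = await db.subscribe(name)
    while True:
        _, text = await db.poll()
        if text == b"NONE":
            # Nothing new yet: sleep briefly so other tasks can run.
            await asyncio.sleep(0.001)
        else:
            # Hand the decoded update to whoever is iterating.
            yield json.loads(text)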

As we can see, subscribe connects to a TectonicDB instance, subscribes to a data store, and then polls forever. This async generator yields new order book updates as they come in.
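
If you want to try the polling loop without a running TectonicDB server, something like the following sketch may help. FakeDB is a hypothetical stand-in, not the real client: it only mimics the (success, text) pairs that the code above unpacks, and the message contents are invented. The max_idle_polls parameter is also an addition for this sketch, so that the generator finishes instead of polling forever.

```python
import asyncio
import json


class FakeDB:
    """Hypothetical stand-in for TectonicDB: serves canned messages, then b"NONE"."""

    def __init__(self, messages):
        self._pending = list(messages)

    async def subscribe(self, name):
        # Made-up reply; the real server's response text may differ.
        return True, b"SUBSCRIBED"

    async def poll(self):
        await asyncio.sleep(0)  # give control back to the loop, as real I/O would
        if self._pending:
            return True, self._pending.pop(0)
        return True, b"NONE"


async def subscribe(db, name, max_idle_polls=3):
    """Same polling loop as above, but it stops after a few empty polls."""
    await db.subscribe(name)
    idle = 0
    while idle < max_idle_polls:
        _, text = await db.poll()
        if text == b"NONE":
            idle += 1
            await asyncio.sleep(0.001)
        else:
            idle = 0
            yield json.loads(text)


async def drain(db, name):
    # Async comprehension (3.6+): collect every update the generator yields.
    return [update async for update in subscribe(db, name)]


def run_demo():
    db = FakeDB([b'{"price": 1.0}', b'{"price": 1.5}'])
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(drain(db, "bnc_xrp_btc"))
    finally:
        loop.close()


print(run_demo())
```

Passing the database object in as an argument, rather than constructing it inside the generator, is just a convenience here so that the fake can be swapped in.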

Next, we define a structure to store this data.

class TickBatcher(object):
    def __init__(self, db_name):
        self.one_batch = []
        self.db_name = db_name

    async def sub(self):
        async for item in subscribe(self.db_name):
            self.one_batch.append(item)

Since the generator is async, the iteration must be async too: async for suspends sub until the generator yields a new item, so the loop body runs only when new data comes in.
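
For readers curious about what async for does under the hood, the sketch below compares it with a hand-written loop over the async iterator protocol (`__aiter__` and `__anext__`). The generator numbers and both helper coroutines are invented for this illustration.

```python
import asyncio


async def numbers():
    """Tiny async generator used only for this comparison."""
    for i in range(3):
        await asyncio.sleep(0)
        yield i


async def with_async_for():
    out = []
    async for n in numbers():
        out.append(n)
    return out


async def by_hand():
    out = []
    iterator = numbers().__aiter__()
    while True:
        try:
            # The coroutine is suspended here until the next value is ready.
            n = await iterator.__anext__()
        except StopAsyncIteration:
            break
        out.append(n)
    return out


def run_demo():
    loop = asyncio.new_event_loop()
    try:
        return (loop.run_until_complete(with_async_for()),
                loop.run_until_complete(by_hand()))
    finally:
        loop.close()


print(run_demo())  # prints ([0, 1, 2], [0, 1, 2])
```

Each await on `__anext__` is a point where the event loop may switch to another task, which is why a long-running async for does not starve the rest of the program.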

We write the main logic in a separate coroutine.

def timer(secs=1):
    """async timer decorator"""
    def _timer(f):
        async def wrapper(*args, **kwargs):
            while True:
                # sleep first, then call the wrapped coroutine with its arguments
                await asyncio.sleep(secs)
                await f(*args, **kwargs)
        return wrapper
    return _timer

class TickBatcher(object):
    ...
    @timer(secs=10)
    async def run(self):
        # do work here
        print(len(self.one_batch))
        self.one_batch = []

We use a decorator to hide the sleeping logic.
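
As a rough check of how such a decorator behaves, here is a self-contained sketch that applies a timer decorator to a method and stops it after a few ticks. It assumes the wrapper forwards its arguments to the wrapped coroutine (so that self reaches the method). Counter and run_for are hypothetical names for this demo, and functools.wraps is an optional extra that preserves the wrapped function's name.

```python
import asyncio
import functools


def timer(secs=1):
    """Decorator: call the wrapped coroutine function every `secs` seconds."""
    def _timer(f):
        @functools.wraps(f)
        async def wrapper(*args, **kwargs):
            while True:
                await asyncio.sleep(secs)
                await f(*args, **kwargs)  # forwards self for methods
        return wrapper
    return _timer


class Counter:
    def __init__(self):
        self.ticks = 0

    @timer(secs=0.01)
    async def tick(self):
        self.ticks += 1


async def run_for(counter, wanted):
    """Run counter.tick() in the background until it has fired `wanted` times."""
    task = asyncio.ensure_future(counter.tick())
    while counter.ticks < wanted:
        await asyncio.sleep(0.005)
    task.cancel()  # the decorated coroutine never returns on its own
    try:
        await task
    except asyncio.CancelledError:
        pass
    return counter.ticks


def run_demo(wanted=3):
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(run_for(Counter(), wanted))
    finally:
        loop.close()


print(run_demo())
```

Because the wrapper loops forever, the only way to stop it is to cancel the task that runs it, which is what run_for does.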

Finally, to run the program, we schedule the two coroutines as separate tasks on the same event loop.

if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    proc = TickBatcher("bnc_xrp_btc")
    loop.create_task(proc.sub())
    loop.create_task(proc.run())

    loop.run_forever()
    loop.close()
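
The script above runs until it is interrupted. For experimenting, a bounded variant can be handy. The sketch below is one possible shape, under several assumptions: fake_stream replaces the TectonicDB subscription, Batcher is a simplified copy of TickBatcher that takes the flush interval as an argument instead of using a decorator, and main runs for a fixed duration, then cancels both tasks before closing the loop.

```python
import asyncio


async def fake_stream():
    """Hypothetical stand-in for subscribe(): yields an increasing counter."""
    n = 0
    while True:
        await asyncio.sleep(0.001)
        yield n
        n += 1


class Batcher:
    def __init__(self):
        self.one_batch = []
        self.flushed = []  # sizes of the batches processed so far

    async def sub(self):
        async for item in fake_stream():
            self.one_batch.append(item)

    async def run(self, secs):
        while True:
            await asyncio.sleep(secs)
            # No await between reading and resetting the batch,
            # so sub() cannot interleave with these two lines.
            self.flushed.append(len(self.one_batch))
            self.one_batch = []


async def _cancel_all(tasks):
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects the cancellations instead of raising.
    await asyncio.gather(*tasks, return_exceptions=True)


def main(duration=0.2, flush_every=0.05):
    loop = asyncio.new_event_loop()
    proc = Batcher()
    tasks = [loop.create_task(proc.sub()),
             loop.create_task(proc.run(flush_every))]
    try:
        loop.run_until_complete(asyncio.sleep(duration))
    finally:
        loop.run_until_complete(_cancel_all(tasks))
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
    return proc.flushed


if __name__ == "__main__":
    print(main())
```

The exact batch sizes depend on timer resolution and machine load, so they will vary from run to run.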

Conclusion

In this short post, we used a Python 3.6 async generator to implement a simple script for monitoring a market or placing simple orders.
