EVOLUTION OF asynchronous programming

with Python

Multitasking

  • preemptive (threads, greenlets)

  • cooperative (callbacks, corutines)


EPISODE #0:

Event Loop


ONE PROCESS, ONE THREAD, ONE LOOP


while True:
    events = selector.select()

    for event in events
    	heapq.heappush(self._scheduled, prepare_event(event))

    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        self._ready.append(handle)

    for handle in self._ready:
    	handle.run()

EPISODE #1:

CALLBACK

class PikaClient(object):
def connect():
self.conn = PikaConnection(params, callback=self.on_connected)

def on_connected(conn):
self.conn.channel(self.on_channel_open)

def on_channel_open(channel):
channel.exchange_declare(exchange=EXCHANGE_NAME)
channel.queue_declare(callback=self.on_queue_declared)

def on_queue_declared(self, method_frame):
self.queue = method_frame.method.queue
self.channel.basic_consume(self.on_message, self.queue)

Pros

  • Runs everywhere


Cons

  • Unhandled exceptions
  • Increasing complexity

EPISODE #2:

 DEFERRED

 def foo():
def on_success(result):
print 'success'

def on_failure(failure):
print failure.value

d = do_something()
d.addCallbacks(on_success, on_failure)

def do_something():
...
return defer.Deferred()

def on_result(result):
d.callback(result)

EPISODE #2:

DEFERRED CHAINS

def fun():
d = do_something()
d.addCallback(foo)
d.addCallback(bar)
d.addCallback(baz)

def callback(self, result):
    for (cb, eb) in self.callbacks:
        if isinstance(result, Failure):
            cb = eb  # Use errback
        try:
            result = cb(result)
        except:
            result = Failure() 


EPISODE #2:

 DEFERRED CHAINS


PROS

  • Integrated error handling
  • Chaining deferreds
  • Something is returned


Cons

  • Still hard to read
  • Result is available only for one function

EPISODE #3:

GENERATOR

class GenAsyncHandler(RequestHandler):
@asynchronous
@gen.engine
def get(self):
http_client = AsyncHTTPClient()
response = yield gen.Task(http_client.fetch, "http://example.com")
do_something_with_response(response)
self.render("template.html")

EPISODE #3:

GENERATOR RUNNER

class Runner(object):
def run(self):
while True:
try:
next = self.yield_point.get_result()
except Exception:
self.exc_info = sys.exc_info()

try:
if self.exc_info is not None:
yielded = self.gen.throw(*self.exc_info)
else:
yielded = self.gen.send(next)
except StopIteration:
break

PROS

  • Easy to read & understand
  • Backward compatibility


CONS

  • Huge overhead
  • Still have to work with callbacks

EPISODE #4:

Future

class Future:
def cancelled():
def done():
def result():
def exception():

def add_done_callback(fn):
def set_result(result):
def set_exception(exception):
def cancel():

EPISODE #4:

FUTURE

@gen.coroutine
def foo():
a = yield bar()
b = yield baz()

@gen.coroutine
def bar():
raise gen.Return(3)

def baz():
f = Future()
return f

def on_complete():
f.set_result(5)

EPISODE #5:

yield from


@asyncio.coroutine
def foo():
    yield from bar()

@asyncio.coroutine
def bar():
    yield from baz()

@asyncio.coroutine
def baz():
    yield from Future()

EPISODE #5:

ASYNCIO & yield from

@asyncio.coroutine
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    yield from asyncio.sleep(1.0)
    return x + y

@asyncio.coroutine
def print_sum(x, y):
    result = yield from compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close() 


EPISODE #5:

asyncio & task

Links

  1. http://goo.gl/dyA1EF   - Deconstructing Deferred
  2. http://goo.gl/3SXR8y  - The difference between yield and yield-from
  3. http://goo.gl/eYnnmF  - Unyielding
  4. http://goo.gl/sVyHc3  - Guido on Tulip
 get_event_loop().stop()

AsyncIO Evolution

By Alexey Moskalenko

AsyncIO Evolution

  • 3,234