Toad vs Birdo

Async frameworks like Tornado scramble our usual unittest strategies: how can you validate the outcome when you do not know when to expect it? Tornado ships with a tornado.testing module that provides two solutions: the wait / stop pattern, and gen_test.

Wait / Stop

To begin, let us say we are writing an async application with a feature like Gmail's undo send: when I click "send", Gmail delays a few seconds before actually sending the email. It is a funny phenomenon: in the seconds after clicking "send" I experience a special clarity about my email. It was too angry, or I forgot an attachment, or most often both. If I click the "undo" button in time, the email reverts to a draft and I can tone it down, add the attachment, and send it again.

To write an application with this feature, we will need an asynchronous "delay" function, and we must test it. If we were testing a normal blocking delay function we could use unittest.TestCase from the standard library:

import time
import unittest

from my_application import delay


class MyTestCase(unittest.TestCase):
    def test_delay(self):
        start = time.time()
        delay(1)
        duration = time.time() - start
        self.assertAlmostEqual(duration, 1, places=2)

When we run this, it prints:

Ran 1 test in 1.000s
OK

And if we replace delay(1) with delay(2) it fails as expected:

=======================================================
FAIL: test_delay (delay0.MyTestCase)
------------------------------------------------------
Traceback (most recent call last):
File "delay0.py", line 12, in test_delay
    self.assertAlmostEqual(duration, 1, places=2)
AssertionError: 2.000854969024658 != 1 within 2 places

------------------------------------------------------
Ran 1 test in 2.002s
FAILED (failures=1)

Great! What about testing a delay_async(seconds, callback) function?

def test_delay(self):
    start = time.time()
    delay_async(1, callback=...)  # What goes here?
    duration = time.time() - start
    self.assertAlmostEqual(duration, 1, places=2)

An asynchronous "delay" function can't block the caller, so it must take a callback and execute it once the delay is over. (In fact we are just reimplementing Tornado's call_later, but please pretend for pedagogy's sake this is a new function that we must test.) To test our delay_async, we will try a series of testing techniques until we have effectively built Tornado's test framework from scratch—you will see why we need special test tools for async code and how Tornado's tools work.
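For readers who want something concrete to run, here is a minimal sketch of what delay_async might look like. It assumes the function is nothing more than a thin wrapper around the IOLoop's call_later method; the real implementation in my_application is not shown in this article, so treat the body as an illustration only.

```python
from tornado.ioloop import IOLoop


def delay_async(seconds, callback):
    """Ask the current IOLoop to run `callback` after `seconds`.

    Assumed implementation: a thin wrapper around call_later.
    The call returns immediately; the callback can only fire
    while the event loop is running.
    """
    IOLoop.current().call_later(seconds, callback)


# Usage: schedule a callback, but do not start the loop.
calls = []
delay_async(0.01, lambda: calls.append('done'))
# `calls` is still empty here, because no loop is running to fire the timer.
```

The important property is that delay_async hands the timer to the event loop and returns right away, which is what makes it awkward to test.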

So, we define a function done to measure the delay, and pass it as the callback to delay_async:

def test_delay(self):
    start = time.time()

    def done():
        duration = time.time() - start
        self.assertAlmostEqual(duration, 1, places=2)

    delay_async(1, done)

If we run this:

Ran 1 test in 0.001s
OK

Success! ...right? But why does it only take a millisecond? And what happens if we delay by two seconds instead?

def test_delay(self):
    start = time.time()

    def done():
        duration = time.time() - start
        self.assertAlmostEqual(duration, 1, places=2)

    delay_async(2, done)

Run it again:

Ran 1 test in 0.001s
OK
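The same symptom can be reproduced with the standard library alone. In the sketch below, delay_async is replaced by a hypothetical stand-in that records the callback in a list (the name pending is invented for this example) and never invokes it. The callback contains an assertion that would always fail, yet unittest reports success.

```python
import unittest

pending = []  # hypothetical stand-in for an event loop's timer queue


def delay_async(seconds, callback):
    # Record the callback but never run it, like a loop that is never started.
    pending.append((seconds, callback))


class MyTestCase(unittest.TestCase):
    def test_delay(self):
        def done():
            self.fail('never reached: nothing invokes this callback')

        delay_async(2, done)


def run_example():
    """Run MyTestCase programmatically and return the TestResult."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(MyTestCase)
    result = unittest.TestResult()
    suite.run(result)
    return result
```

Calling run_example() returns a successful result while the callback is still sitting in pending, unexecuted.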

Something is very wrong here. The test appears to pass instantly, regardless of the argument to delay_async, because we neither start the event loop nor wait for it to complete. We have to actually pause the test until the callback has executed:

from tornado.ioloop import IOLoop


def test_delay(self):
    start = time.time()
    io_loop = IOLoop.instance()

    def done():
        duration = time.time() - start
        self.assertAlmostEqual(duration, 1, places=2)
        io_loop.stop()

    delay_async(1, done)
    io_loop.start()

Now if we run the test with a delay of one second:

Ran 1 test in 1.002s
OK

That looks better. And if we delay for two seconds?

ERROR:tornado.application:Exception in callback
Traceback (most recent call last):
  File "site-packages/tornado/ioloop.py", line 568, in _run_callback
    ret = callback()
  File "site-packages/tornado/stack_context.py", line 275, in null_wrapper
    return fn(*args, **kwargs)
  File "delay3.py", line 16, in done
    self.assertAlmostEqual(duration, 1, places=2)
  File "unittest/case.py", line 845, in assertAlmostEqual
    raise self.failureException(msg)
AssertionError: 2.001540184020996 != 1 within 2 places

The test appears to fail, as expected, but there are a few problems. First, notice that it is not unittest that prints the traceback: it is Tornado's application logger. We do not get unittest's characteristic output. Second, the process is now hung and remains so until I type Control-C. Why?

The bug is here:

def done():
    duration = time.time() - start
    self.assertAlmostEqual(duration, 1, places=2)
    io_loop.stop()

Since the failed assertion raises an exception, we never reach the call to io_loop.stop(), so the loop continues running and the process does not exit. We need to register an exception handler. Exception handling with callbacks is convoluted; we have to use a stack context to install a handler with Tornado:

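import time
import unittest

from tornado.ioloop import IOLoop
# stack_context was removed in Tornado 6.0; this example needs an earlier release.
from tornado.stack_context import ExceptionStackContext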

class MyTestCase(unittest.TestCase):
    def test_delay(self):
        start = time.time()
        io_loop = IOLoop.instance()

        def done():
            duration = time.time() - start
            self.assertAlmostEqual(duration, 1, places=2)
            io_loop.stop()

        self.failure = None

        def handle_exception(typ, value, tb):
            io_loop.stop()
            self.failure = value
            return True  # Stop propagation.

        with ExceptionStackContext(handle_exception):
            delay_async(2, callback=done)

        io_loop.start()
        if self.failure:
            raise self.failure

The loop can now be stopped two ways: if the test passes, then done stops the loop as before. If it fails, handle_exception stores the error and stops the loop. At the end, if an error was stored we re-raise it to make the test fail:

=======================================================
FAIL: test_delay (delay4.MyTestCase)
------------------------------------------------------
Traceback (most recent call last):
  File "delay4.py", line 31, in test_delay
    raise self.failure
  File "tornado/ioloop.py", line 568, in _run_callback
    ret = callback()
  File "tornado/stack_context.py", line 343, in wrapped
    raise_exc_info(exc)
  File "<string>", line 3, in raise_exc_info
  File "tornado/stack_context.py", line 314, in wrapped
    ret = fn(*args, **kwargs)
  File "delay4.py", line 17, in done
    self.assertAlmostEqual(duration, 1, places=2)
AssertionError: 2.0015950202941895 != 1 within 2 places
------------------------------------------------------
Ran 1 test in 2.004s
FAILED (failures=1)

Now the test ends promptly, whether it succeeds or fails, with unittest's typical output.

This is a lot of tricky code to write just to test a trivial delay function, and it seems hard to get right each time. What does Tornado provide for us? Its AsyncTestCase gives us wait and stop methods to control the event loop. If we then move the duration-testing outside the callback we radically simplify our test:

import time

from tornado import testing

class MyTestCase(testing.AsyncTestCase):
    def test_delay(self):
        start = time.time()
        delay_async(1, callback=self.stop)
        self.wait()
        duration = time.time() - start
        self.assertAlmostEqual(duration, 1, places=2)
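One more detail of the pattern is worth a small sketch: an argument passed to stop becomes the return value of wait, so a callback's result can be carried back into the body of the test. The example below assumes only Tornado's own call_later as the asynchronous operation; the class name, the test name, and the 'sent' value are invented for illustration.

```python
import unittest

from tornado import testing


class StopWaitExample(testing.AsyncTestCase):
    def test_value_round_trip(self):
        # Schedule self.stop('sent') to run on the test's own IOLoop.
        self.io_loop.call_later(0.05, self.stop, 'sent')
        # wait() runs the loop until stop() is called, or fails the
        # test if the timeout expires first.
        result = self.wait(timeout=1)
        self.assertEqual(result, 'sent')


def run_example():
    """Run StopWaitExample programmatically and return the TestResult."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(StopWaitExample)
    result = unittest.TestResult()
    suite.run(result)
    return result
```

Note that AsyncTestCase creates a fresh IOLoop for each test and exposes it as self.io_loop, which is why the callback is scheduled there.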

gen_test

But modern async code is not primarily written with callbacks: these days we use coroutines. Let us begin a new example test, one that uses Motor, my asynchronous MongoDB driver for Tornado. Although Motor supports the old callback style, it encourages you to use coroutines and "yield" statements, so we can write some Motor code to demonstrate Tornado coroutines and unittesting.

To begin, say we want to execute find_one and test its return value:

from motor import MotorClient
from tornado import testing

class MyTestCase(testing.AsyncTestCase):
    def setUp(self):
        super().setUp()
        self.client = MotorClient()

    def test_find_one(self):
        collection = self.client.test.collection
        document = yield collection.find_one({'_id': 1})
        self.assertEqual({'_id': 1, 'key': 'value'}, document)

Notice the "yield" statement: whenever you call a Motor method that does I/O, you must use "yield" to pause the current function and wait for the returned Future object to be resolved to a value. Including a yield statement makes this function a generator. But now there is a problem:

TypeError: Generator test methods should be decorated with tornado.testing.gen_test

Tornado smartly warns us that our test method is merely a generator, so we must decorate it with gen_test. Otherwise, calling the test method just creates a generator object: its body never runs, so it never reaches the assert. It needs a coroutine driver to run it to completion:

from tornado import testing
from tornado.testing import gen_test

class MyTestCase(testing.AsyncTestCase):
    # ... same setup ...
    @gen_test
    def test_find_one(self):
        collection = self.client.test.collection
        document = yield collection.find_one({'_id': 1})
        self.assertEqual({'_id': 1, 'key': 'value'}, document)

But now when I run the test, it fails:

AssertionError: {'key': 'value', '_id': 1} != None

We need to insert some data in setUp so that find_one can find it! Since Motor is asynchronous, we cannot call its insert method directly from setUp; we must run the insertion in a coroutine as well:

from motor import MotorClient
from tornado import gen, testing

class MyTestCase(testing.AsyncTestCase):
    def setUp(self):
        super().setUp()
        self.client = MotorClient()
        self.setup_coro()

    @gen.coroutine
    def setup_coro(self):
        collection = self.client.test.collection

        # Clean up from prior runs:
        yield collection.remove()

        yield collection.insert({'_id': 0})
        yield collection.insert({'_id': 1, 'key': 'value'})
        yield collection.insert({'_id': 2})

Now when I run the test:

AssertionError: {'key': 'value', '_id': 1} != None

It still fails! When I check in the mongo shell whether my data was inserted, only two of the three expected documents are there:

> db.collection.find()
{ "_id" : 0 }
{ "_id" : 1, "key" : "value" }

Why is it incomplete? Furthermore, since the document I actually query is there, why did the test fail?

When I called self.setup_coro() in setUp, I launched it as a concurrent coroutine. It began running, but I did not wait for it to complete before beginning the test, so the test may reach its find_one statement before the second document is inserted. Furthermore, test_find_one can fail quickly enough that setup_coro does not insert its third document before the whole test suite finishes, stopping the event loop and preventing the final document from ever being inserted.
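This behavior can be observed without MongoDB. The sketch below swaps the Motor collection for a hypothetical in-memory list, and its insert coroutine uses gen.sleep to stand in for a network round trip; both are assumptions made for illustration. Calling the coroutine returns a Future at once, before any document has been stored.

```python
from tornado import gen
from tornado.ioloop import IOLoop

documents = []  # hypothetical in-memory stand-in for the collection


@gen.coroutine
def insert(document):
    yield gen.sleep(0.01)  # pretend to wait for the server
    documents.append(document)


@gen.coroutine
def setup_coro():
    yield insert({'_id': 0})
    yield insert({'_id': 1, 'key': 'value'})
    yield insert({'_id': 2})


io_loop = IOLoop.current()  # make sure this thread has an event loop
future = setup_coro()       # starts the coroutine and returns immediately

# At this point the Future is unresolved and nothing has been inserted:
# the coroutine is paused at its first yield until the loop runs.
```

Nothing here drives the event loop, so the coroutine makes no further progress; whoever runs the loop next decides how far it gets.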

Clearly I must wait for the setup coroutine to complete before beginning the test. Tornado's run_sync method is designed for uses like this:

class MyTestCase(testing.AsyncTestCase):
    def setUp(self):
        super().setUp()
        self.client = MotorClient()
        self.io_loop.run_sync(self.setup_coro)

With my setup coroutine correctly executed, now test_find_one passes.
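To try the finished pattern without a MongoDB server, here is a self-contained sketch. FakeCollection is a hypothetical in-memory substitute for a Motor collection, written only for this example, and its gen.sleep calls simulate I/O. The test case itself follows the same shape as above: run_sync in setUp, gen_test on the test method.

```python
import unittest

from tornado import gen, testing


class FakeCollection(object):
    """Hypothetical in-memory stand-in for a Motor collection."""

    def __init__(self):
        self.documents = {}

    @gen.coroutine
    def insert(self, document):
        yield gen.sleep(0.01)  # simulate a network round trip
        self.documents[document['_id']] = document

    @gen.coroutine
    def find_one(self, query):
        yield gen.sleep(0.01)
        return self.documents.get(query['_id'])


class MyTestCase(testing.AsyncTestCase):
    def setUp(self):
        super().setUp()
        self.collection = FakeCollection()
        # Block until every insert has finished before the test runs.
        self.io_loop.run_sync(self.setup_coro)

    @gen.coroutine
    def setup_coro(self):
        yield self.collection.insert({'_id': 0})
        yield self.collection.insert({'_id': 1, 'key': 'value'})
        yield self.collection.insert({'_id': 2})

    @testing.gen_test
    def test_find_one(self):
        document = yield self.collection.find_one({'_id': 1})
        self.assertEqual({'_id': 1, 'key': 'value'}, document)


def run_example():
    """Run MyTestCase programmatically and return the TestResult."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(MyTestCase)
    result = unittest.TestResult()
    suite.run(result)
    return result
```

Replacing the run_sync line with a bare self.setup_coro() call should bring back the original failure, since the test would then query the collection before the inserts have completed.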

Further Study

Now we have seen two techniques that make async testing with Tornado as convenient and reliable as standard unittests. To learn more, see my page of links related to this article.

Plus, stay tuned for the next book in the Architecture of Open Source Applications series. It will be called "500 Lines or Less", and my chapter is devoted to the implementation of coroutines in asyncio and Python 3.