Waiting For Multiple Events With Tornado
Recently I saw a question on Stack Overflow about waiting for multiple events with a Tornado coroutine, until one of the events completes. The inquirer wanted to do something like this:
result = yield Any([future1, future2, future3])
If the middle future has resolved and the other two are still pending, the result should look like:
[None, "<some result>", None]
Tornado doesn't provide a class like Any. How would you implement one?
A Bad Beginning
You could make a class that inherits from Future, and wraps a list of futures. The class waits until one of its futures resolves, then gives you the list of results:
from tornado.concurrent import Future

class Any(Future):
    def __init__(self, futures):
        super(Any, self).__init__()
        self.futures = futures
        for future in futures:
            # done_callback is defined just below.
            future.add_done_callback(self.done_callback)

    def done_callback(self, future):
        """Called when any future resolves."""
        if self.done():
            # An earlier future already resolved this Any.
            return
        try:
            self.set_result(self.make_result())
        except Exception as e:
            self.set_exception(e)

    def make_result(self):
        """A list of results.

        Includes None for each pending future, and a result for each
        resolved future. Raises an exception for the first future
        that has an exception.
        """
        return [f.result() if f.done() else None
                for f in self.futures]

    def clear(self):
        """Break reference cycle with any pending futures."""
        self.futures = None
Here's an example use of Any:
import time

from tornado import gen
from tornado.ioloop import IOLoop

@gen.coroutine
def delayed_msg(seconds, msg):
    yield gen.Task(IOLoop.current().add_timeout,
                   time.time() + seconds)
    raise gen.Return(msg)

@gen.coroutine
def f():
    start = time.time()
    future1 = delayed_msg(2, '2')
    future2 = delayed_msg(3, '3')
    future3 = delayed_msg(1, '1')

    # future3 will resolve first.
    results = yield Any([future1, future2, future3])
    end = time.time()
    print("finished in %.1f sec: %r" % (end - start, results))

    # Wait for any of the remaining futures.
    results = yield Any([future1, future2])
    end = time.time()
    print("finished in %.1f sec: %r" % (end - start, results))

IOLoop.current().run_sync(f)
As expected, this prints:
finished in 1.0 sec: [None, None, '1']
finished in 2.0 sec: ['2', None]
But you can see there are some complications with this approach. For one thing, if you want to wait for the rest of the futures after the first one resolves, it's complicated to construct the list of still-pending futures. I suppose you could do:
futures = [future1, future2, future3]
# Pass a list, not a generator: Any iterates over its futures more than once.
results = yield Any([f for f in futures
                     if not f.done()])
Not pretty. And not correct, either! There's a race condition: if a future is resolved in between consecutive executions of this code, you may never receive its result. On the first call, you get the result of some other future that resolves faster, but by the time you're constructing the list to pass to the second Any, your future is now "done" and you omit it from the list.
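To make the race concrete, here is a minimal sketch that replays it step by step without an event loop. It assumes the standard library's concurrent.futures.Future as a stand-in for Tornado's Future (both offer done() and set_result()). The helper still_pending and the future names are hypothetical, made up for illustration.

```python
from concurrent.futures import Future  # stand-in for Tornado's Future (assumption)


def still_pending(futures):
    """Hypothetical helper: the 'not f.done()' filter from the text."""
    return [f for f in futures if not f.done()]


slow, fast, medium = Future(), Future(), Future()
futures = [slow, fast, medium]

# The first wait returns because 'fast' resolves.
fast.set_result('fast')

# Before we build the list for the second wait, 'medium' resolves too.
medium.set_result('medium')

# The filter drops 'medium' even though we never saw its result.
second_wait = still_pending(futures)
lost = [f for f in futures if f.done() and f is not fast]
```

After this runs, second_wait holds only the slow future, and the medium future sits in lost: it is done, but no wait will ever report it.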
Another complication is the reference cycle: Any refers to each future, which refers to a callback, which refers back to Any. For prompt garbage collection, you should call clear() on Any before it goes out of scope. This is very awkward.
Additionally, you can't distinguish between a pending future and a future that resolved to None. You'd need a special sentinel value distinct from None to represent a pending future.
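A sentinel might look something like the sketch below. It is only an illustration: PENDING is a hypothetical name, make_result is written here as a free function rather than a method, and concurrent.futures.Future from the standard library stands in for Tornado's Future.

```python
from concurrent.futures import Future  # stand-in for Tornado's Future (assumption)

# Hypothetical sentinel: a unique object that no future can resolve to by accident.
PENDING = object()


def make_result(futures):
    """List each future's result, or PENDING if it has not resolved yet."""
    return [f.result() if f.done() else PENDING for f in futures]


resolved_to_none, not_done = Future(), Future()
resolved_to_none.set_result(None)

results = make_result([resolved_to_none, not_done])
# Compare with 'is', since PENDING is identified by identity, not value.
pending_flags = [r is PENDING for r in results]
```

Callers would then have to test every entry against PENDING, which is one more sign that returning a list is the wrong interface.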
The final complication is the worst. If multiple futures are resolved and some of them have exceptions, there's no obvious way for Any to communicate all that information to you. Mixing exceptions and results in a list would be perverse.
A Better Way
Fortunately, there's a better way. We can make Any return just the first future that resolves, instead of a list of results:
class Any(Future):
    def __init__(self, futures):
        super(Any, self).__init__()
        for future in futures:
            future.add_done_callback(self.done_callback)

    def done_callback(self, future):
        # Only the first future to resolve sets the result.
        if not self.done():
            self.set_result(future)
The reference cycle is gone, and the exception-handling question is answered: The Any class returns the whole future to you, instead of its result or exception. You can inspect it as you like.
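As one way of inspecting the returned future, you can ask it for its exception before asking for its result. The sketch below assumes the standard library's concurrent.futures.Future as a stand-in for Tornado's Future, which offers the same exception() and result() methods. The describe helper is hypothetical.

```python
from concurrent.futures import Future  # stand-in for Tornado's Future (assumption)


def describe(future):
    """Hypothetical helper: summarize a resolved future without raising."""
    error = future.exception()
    if error is not None:
        return 'failed: %r' % (error,)
    return 'succeeded: %r' % (future.result(),)


ok, bad = Future(), Future()
ok.set_result('1')
bad.set_exception(ValueError('boom'))

ok_summary = describe(ok)
bad_summary = describe(bad)
```

Only call a helper like this on a future that has already resolved, which is exactly what Any hands back.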
It's also easy to wait for the remaining futures after some are resolved:
@gen.coroutine
def f():
    start = time.time()
    future1 = delayed_msg(2, '2')
    future2 = delayed_msg(3, '3')
    future3 = delayed_msg(1, '1')

    futures = set([future1, future2, future3])
    while futures:
        resolved = yield Any(futures)
        end = time.time()
        print("finished in %.1f sec: %r" % (
            end - start, resolved.result()))
        futures.remove(resolved)

IOLoop.current().run_sync(f)
As desired, this prints:
finished in 1.0 sec: '1'
finished in 2.0 sec: '2'
finished in 3.0 sec: '3'
There's no race condition now. You can't miss a result, because you don't remove a future from the set unless you've received it from Any. A future that resolves between two waits is still in the set, so the next Any picks it up.
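The same scenario that broke the first design can be replayed against this one. The sketch below is a synchronous illustration, not the coroutine version: it assumes concurrent.futures.Future from the standard library as a stand-in for Tornado's Future, and it includes a done() check in done_callback because the stand-in refuses to be resolved twice. The future names a, b and c are made up.

```python
from concurrent.futures import Future  # stand-in for Tornado's Future (assumption)


class Any(Future):
    """Resolves with the first of `futures` to finish."""

    def __init__(self, futures):
        super(Any, self).__init__()
        for future in futures:
            future.add_done_callback(self.done_callback)

    def done_callback(self, future):
        # Only the first finisher sets our result; later ones are ignored.
        if not self.done():
            self.set_result(future)


a, b, c = Future(), Future(), Future()
futures = set([a, b, c])

first_wait = Any(futures)
a.set_result('a')  # resolves the first wait
b.set_result('b')  # resolves before the second wait is built

first = first_wait.result()
futures.remove(first)

# b is already done, so registering the callback fires it right away
# and the second wait resolves with b instead of skipping it.
second_wait = Any(futures)
second = second_wait.result()
```

Here first is a and second is b, while c stays in the set for a later wait. Nothing is lost, because a future leaves the set only after Any has returned it.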
Exceptions
To test the exception-handling behavior, let's make a function that raises an exception after a delay:
@gen.coroutine
def delayed_exception(seconds, msg):
    yield gen.Task(IOLoop.current().add_timeout,
                   time.time() + seconds)
    raise Exception(msg)
Now, instead of returning a result, one of our futures will raise an exception:
@gen.coroutine
def f():
    start = time.time()
    future1 = delayed_msg(2, '2')
    # Exception!
    future2 = delayed_exception(3, '3')
    future3 = delayed_msg(1, '1')

    futures = set([future1, future2, future3])
    while futures:
        resolved = yield Any(futures)
        end = time.time()
        try:
            outcome = resolved.result()
        except Exception as e:
            outcome = e
        print("finished in %.1f sec: %r" % (
            end - start, outcome))
        futures.remove(resolved)

IOLoop.current().run_sync(f)
Now, the script prints:
finished in 1.0 sec: '1'
finished in 2.0 sec: '2'
finished in 3.0 sec: Exception('3',)
Conclusion
It took a bit of thinking, but our final Any class is simple. It lets you launch many concurrent operations and process them in the order they complete. Not bad.