Tuesday, 28 June 2016

Run tornado.testing.AsyncTestCase using asyncio event loop

I have an asyncio-based class that I want to unit test. For the most part, tornado.testing.AsyncTestCase handles this well. However, one method of my class uses asyncio.ensure_future to schedule another method, and that task never finishes inside the AsyncTestCase: the default test runner uses Tornado's KQueueIOLoop, not an asyncio event loop, so nothing ever runs the scheduled task.

import asyncio

from tornado.testing import AsyncTestCase

class TestSubject:
    def foo(self):
        # Schedules bar() on the current asyncio event loop
        asyncio.ensure_future(self.bar())

    async def bar(self):
        pass

class TestSubjectTest(AsyncTestCase):
    def test_foo(self):
        t = TestSubject()
        # here be somewhat involved setup with MagicMock and self.stop
        t.foo()
        self.wait()

$ python -m tornado.testing baz.testsubject_test
...
[E 160627 17:48:22 testing:731] FAIL
[E 160627 17:48:22 base_events:1090] Task was destroyed but it is pending!
    task: <Task pending coro=<TestSubject.bar() running at ...>>
.../asyncio/base_events.py:362: RuntimeWarning: coroutine 'TestSubject.bar' was never awaited
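
The pending task in the log can be illustrated with asyncio alone. The sketch below is not part of the original code: `schedule_then_run` and the `ran` list are hypothetical names, and a private loop stands in for the asyncio loop that the Tornado runner never starts. It shows that `asyncio.ensure_future` only queues the coroutine; nothing executes until the loop the task belongs to is actually run.

```python
import asyncio


def schedule_then_run():
    """Schedule a coroutine on an idle loop, then run that loop."""
    ran = []

    async def bar():
        ran.append("bar")

    # Stand-in for the asyncio loop that nobody is driving in the test.
    loop = asyncio.new_event_loop()
    try:
        task = asyncio.ensure_future(bar(), loop=loop)
        # The task exists, but the loop is idle, so bar() has not started.
        pending_before = (not task.done()) and ran == []
        # Only once the loop runs does the coroutine execute.
        loop.run_until_complete(task)
        finished_after = task.done() and ran == ["bar"]
    finally:
        loop.close()
    return pending_before, finished_after
```

If the loop were closed or discarded without ever running, the task would stay pending, which matches the "Task was destroyed but it is pending!" message above.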

How can I run the tests on a different event loop so that my task is actually executed? Alternatively, how can I make my implementation independent of the event loop, so that it works with both?
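
A minimal sketch of one possible approach, assuming Tornado is installed: override `AsyncTestCase.get_new_ioloop` so that each test runs on `tornado.platform.asyncio.AsyncIOLoop`, and make that loop asyncio's current loop so `asyncio.ensure_future` schedules onto it. The `AsyncioTestCase` base class and the `on_done` hook are hypothetical names introduced here; `on_done` stands in for the MagicMock and `self.stop` setup that the question elides.

```python
import asyncio

from tornado.platform.asyncio import AsyncIOLoop
from tornado.testing import AsyncTestCase


class TestSubject:
    def __init__(self, on_done):
        # Hypothetical hook replacing the elided MagicMock setup.
        self.on_done = on_done

    def foo(self):
        asyncio.ensure_future(self.bar())

    async def bar(self):
        self.on_done()


class AsyncioTestCase(AsyncTestCase):
    """Hypothetical base class: run each test on an asyncio-backed IOLoop."""

    def get_new_ioloop(self):
        return AsyncIOLoop()

    def setUp(self):
        super().setUp()
        # Assumption: older Tornado versions do not do this automatically,
        # so point asyncio's current loop at the loop the test drives.
        asyncio.set_event_loop(self.io_loop.asyncio_loop)


class TestSubjectTest(AsyncioTestCase):
    def test_foo(self):
        t = TestSubject(on_done=self.stop)
        t.foo()
        # Returns once bar() has run and called self.stop; fails on timeout.
        self.wait()
```

Since Tornado 5.0 the IOLoop on Python 3 is built on asyncio by default, so on those versions the override is likely redundant and the original test may work unchanged.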
