I have an asyncio-based class that I want to unit test. With tornado.testing.AsyncTestCase this works well and is easy to set up. However, one method of my class uses asyncio.ensure_future to schedule another method for execution. That scheduled coroutine never finishes inside the AsyncTestCase, because the default test runner uses Tornado's KQueueIOLoop event loop rather than an asyncio event loop.

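import asyncio

from tornado.testing import AsyncTestCase


class TestSubject:
    def foo(self):
        # Schedules bar() on the asyncio event loop and returns immediately.
        asyncio.ensure_future(self.bar())

    async def bar(self):
        pass


class TestSubjectTest(AsyncTestCase):
    def test_foo(self):
        t = TestSubject()
        # here be somewhat involved setup with MagicMock and self.stop
        t.foo()
        self.wait()
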
$ python -m tornado.testing baz.testsubject_test
...
[E 160627 17:48:22 testing:731] FAIL
[E 160627 17:48:22 base_events:1090] Task was destroyed but it is pending!
task: <Task pending coro=<TestSubject.bar() running at ...>>
.../asyncio/base_events.py:362: RuntimeWarning: coroutine 'TestSubject.bar' was never awaited

How can I run the tests on a different event loop so that my task is actually executed? Alternatively, how can I make my implementation independent of the event loop, so that it works under both Tornado's loop and asyncio's?
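
To make the first option concrete, here is a minimal stdlib-only sketch of what "running the test on an asyncio event loop" could look like. It assumes that dropping AsyncTestCase in favour of plain unittest.TestCase is acceptable, which may not hold for the real test suite. The `task` and `bar_ran` attributes are hypothetical additions for illustration and are not part of the original class; they only give the test something to wait on and to assert.

```python
import asyncio
import unittest


class TestSubject:
    def __init__(self):
        self.bar_ran = False  # hypothetical flag, only here so the test can assert
        self.task = None      # hypothetical handle to the scheduled task

    def foo(self):
        # ensure_future() schedules bar() on the current asyncio event loop.
        # Nothing runs until that loop is actually driven.
        self.task = asyncio.ensure_future(self.bar())

    async def bar(self):
        self.bar_ran = True


class TestSubjectTest(unittest.TestCase):
    def setUp(self):
        # Give every test a fresh asyncio loop and make it the current one,
        # so ensure_future() inside foo() picks it up.
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

    def tearDown(self):
        self.loop.close()
        asyncio.set_event_loop(None)

    def test_foo(self):
        t = TestSubject()
        t.foo()
        # Drive the asyncio loop until the scheduled task has finished.
        self.loop.run_until_complete(t.task)
        self.assertTrue(t.bar_ran)
```

Saved as a module, this can be run with `python -m unittest`. The point of the sketch is only that the loop on which ensure_future schedules the task must be the loop the test actually runs; it does not address how to keep the Tornado-specific helpers such as self.stop and self.wait.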