123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- import asyncio
- import inspect
- from .case import TestCase
- class IsolatedAsyncioTestCase(TestCase):
- # Names intentionally have a long prefix
- # to reduce a chance of clashing with user-defined attributes
- # from inherited test case
- #
- # The class doesn't call loop.run_until_complete(self.setUp()) and family
- # but uses a different approach:
- # 1. create a long-running task that reads self.setUp()
- # awaitable from queue along with a future
- # 2. await the awaitable object passing in and set the result
- # into the future object
- # 3. Outer code puts the awaitable and the future object into a queue
- # with waiting for the future
- # The trick is necessary because every run_until_complete() call
- # creates a new task with embedded ContextVar context.
- # To share contextvars between setUp(), test and tearDown() we need to execute
- # them inside the same task.
- # Note: the test case modifies event loop policy if the policy was not instantiated
- # yet.
- # asyncio.get_event_loop_policy() creates a default policy on demand but never
- # returns None
- # I believe this is not an issue in user level tests but python itself for testing
- # should reset a policy in every test module
- # by calling asyncio.set_event_loop_policy(None) in tearDownModule()
- def __init__(self, methodName='runTest'):
- super().__init__(methodName)
- self._asyncioTestLoop = None
- self._asyncioCallsQueue = None
- async def asyncSetUp(self):
- pass
- async def asyncTearDown(self):
- pass
- def addAsyncCleanup(self, func, /, *args, **kwargs):
- # A trivial trampoline to addCleanup()
- # the function exists because it has a different semantics
- # and signature:
- # addCleanup() accepts regular functions
- # but addAsyncCleanup() accepts coroutines
- #
- # We intentionally don't add inspect.iscoroutinefunction() check
- # for func argument because there is no way
- # to check for async function reliably:
- # 1. It can be "async def func()" itself
- # 2. Class can implement "async def __call__()" method
- # 3. Regular "def func()" that returns awaitable object
- self.addCleanup(*(func, *args), **kwargs)
- def _callSetUp(self):
- self.setUp()
- self._callAsync(self.asyncSetUp)
- def _callTestMethod(self, method):
- self._callMaybeAsync(method)
- def _callTearDown(self):
- self._callAsync(self.asyncTearDown)
- self.tearDown()
- def _callCleanup(self, function, *args, **kwargs):
- self._callMaybeAsync(function, *args, **kwargs)
- def _callAsync(self, func, /, *args, **kwargs):
- assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
- ret = func(*args, **kwargs)
- assert inspect.isawaitable(ret), f'{func!r} returned non-awaitable'
- fut = self._asyncioTestLoop.create_future()
- self._asyncioCallsQueue.put_nowait((fut, ret))
- return self._asyncioTestLoop.run_until_complete(fut)
- def _callMaybeAsync(self, func, /, *args, **kwargs):
- assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
- ret = func(*args, **kwargs)
- if inspect.isawaitable(ret):
- fut = self._asyncioTestLoop.create_future()
- self._asyncioCallsQueue.put_nowait((fut, ret))
- return self._asyncioTestLoop.run_until_complete(fut)
- else:
- return ret
- async def _asyncioLoopRunner(self, fut):
- self._asyncioCallsQueue = queue = asyncio.Queue()
- fut.set_result(None)
- while True:
- query = await queue.get()
- queue.task_done()
- if query is None:
- return
- fut, awaitable = query
- try:
- ret = await awaitable
- if not fut.cancelled():
- fut.set_result(ret)
- except (SystemExit, KeyboardInterrupt):
- raise
- except (BaseException, asyncio.CancelledError) as ex:
- if not fut.cancelled():
- fut.set_exception(ex)
- def _setupAsyncioLoop(self):
- assert self._asyncioTestLoop is None, 'asyncio test loop already initialized'
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- loop.set_debug(True)
- self._asyncioTestLoop = loop
- fut = loop.create_future()
- self._asyncioCallsTask = loop.create_task(self._asyncioLoopRunner(fut))
- loop.run_until_complete(fut)
- def _tearDownAsyncioLoop(self):
- assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
- loop = self._asyncioTestLoop
- self._asyncioTestLoop = None
- self._asyncioCallsQueue.put_nowait(None)
- loop.run_until_complete(self._asyncioCallsQueue.join())
- try:
- # cancel all tasks
- to_cancel = asyncio.all_tasks(loop)
- if not to_cancel:
- return
- for task in to_cancel:
- task.cancel()
- loop.run_until_complete(
- asyncio.gather(*to_cancel, return_exceptions=True))
- for task in to_cancel:
- if task.cancelled():
- continue
- if task.exception() is not None:
- loop.call_exception_handler({
- 'message': 'unhandled exception during test shutdown',
- 'exception': task.exception(),
- 'task': task,
- })
- # shutdown asyncgens
- loop.run_until_complete(loop.shutdown_asyncgens())
- finally:
- # Prevent our executor environment from leaking to future tests.
- loop.run_until_complete(loop.shutdown_default_executor())
- asyncio.set_event_loop(None)
- loop.close()
- def run(self, result=None):
- self._setupAsyncioLoop()
- try:
- return super().run(result)
- finally:
- self._tearDownAsyncioLoop()
- def debug(self):
- self._setupAsyncioLoop()
- super().debug()
- self._tearDownAsyncioLoop()
- def __del__(self):
- if self._asyncioTestLoop is not None:
- self._tearDownAsyncioLoop()
|