123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- import asyncio
- import unittest
- from test import support
- class MyException(Exception):
- pass
- def tearDownModule():
- asyncio.set_event_loop_policy(None)
- class TestAsyncCase(unittest.TestCase):
- maxDiff = None
- def tearDown(self):
- # Ensure that IsolatedAsyncioTestCase instances are destroyed before
- # starting a new event loop
- support.gc_collect()
- def test_full_cycle(self):
- class Test(unittest.IsolatedAsyncioTestCase):
- def setUp(self):
- self.assertEqual(events, [])
- events.append('setUp')
- async def asyncSetUp(self):
- self.assertEqual(events, ['setUp'])
- events.append('asyncSetUp')
- self.addAsyncCleanup(self.on_cleanup1)
- async def test_func(self):
- self.assertEqual(events, ['setUp',
- 'asyncSetUp'])
- events.append('test')
- self.addAsyncCleanup(self.on_cleanup2)
- async def asyncTearDown(self):
- self.assertEqual(events, ['setUp',
- 'asyncSetUp',
- 'test'])
- events.append('asyncTearDown')
- def tearDown(self):
- self.assertEqual(events, ['setUp',
- 'asyncSetUp',
- 'test',
- 'asyncTearDown'])
- events.append('tearDown')
- async def on_cleanup1(self):
- self.assertEqual(events, ['setUp',
- 'asyncSetUp',
- 'test',
- 'asyncTearDown',
- 'tearDown',
- 'cleanup2'])
- events.append('cleanup1')
- async def on_cleanup2(self):
- self.assertEqual(events, ['setUp',
- 'asyncSetUp',
- 'test',
- 'asyncTearDown',
- 'tearDown'])
- events.append('cleanup2')
- events = []
- test = Test("test_func")
- result = test.run()
- self.assertEqual(result.errors, [])
- self.assertEqual(result.failures, [])
- expected = ['setUp', 'asyncSetUp', 'test',
- 'asyncTearDown', 'tearDown', 'cleanup2', 'cleanup1']
- self.assertEqual(events, expected)
- events = []
- test = Test("test_func")
- test.debug()
- self.assertEqual(events, expected)
- test.doCleanups()
- self.assertEqual(events, expected)
- def test_exception_in_setup(self):
- class Test(unittest.IsolatedAsyncioTestCase):
- async def asyncSetUp(self):
- events.append('asyncSetUp')
- self.addAsyncCleanup(self.on_cleanup)
- raise MyException()
- async def test_func(self):
- events.append('test')
- async def asyncTearDown(self):
- events.append('asyncTearDown')
- async def on_cleanup(self):
- events.append('cleanup')
- events = []
- test = Test("test_func")
- result = test.run()
- self.assertEqual(events, ['asyncSetUp', 'cleanup'])
- self.assertIs(result.errors[0][0], test)
- self.assertIn('MyException', result.errors[0][1])
- events = []
- test = Test("test_func")
- try:
- test.debug()
- except MyException:
- pass
- else:
- self.fail('Expected a MyException exception')
- self.assertEqual(events, ['asyncSetUp'])
- test.doCleanups()
- self.assertEqual(events, ['asyncSetUp', 'cleanup'])
- def test_exception_in_test(self):
- class Test(unittest.IsolatedAsyncioTestCase):
- async def asyncSetUp(self):
- events.append('asyncSetUp')
- async def test_func(self):
- events.append('test')
- self.addAsyncCleanup(self.on_cleanup)
- raise MyException()
- async def asyncTearDown(self):
- events.append('asyncTearDown')
- async def on_cleanup(self):
- events.append('cleanup')
- events = []
- test = Test("test_func")
- result = test.run()
- self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
- self.assertIs(result.errors[0][0], test)
- self.assertIn('MyException', result.errors[0][1])
- events = []
- test = Test("test_func")
- try:
- test.debug()
- except MyException:
- pass
- else:
- self.fail('Expected a MyException exception')
- self.assertEqual(events, ['asyncSetUp', 'test'])
- test.doCleanups()
- self.assertEqual(events, ['asyncSetUp', 'test', 'cleanup'])
- def test_exception_in_tear_down(self):
- class Test(unittest.IsolatedAsyncioTestCase):
- async def asyncSetUp(self):
- events.append('asyncSetUp')
- async def test_func(self):
- events.append('test')
- self.addAsyncCleanup(self.on_cleanup)
- async def asyncTearDown(self):
- events.append('asyncTearDown')
- raise MyException()
- async def on_cleanup(self):
- events.append('cleanup')
- events = []
- test = Test("test_func")
- result = test.run()
- self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
- self.assertIs(result.errors[0][0], test)
- self.assertIn('MyException', result.errors[0][1])
- events = []
- test = Test("test_func")
- try:
- test.debug()
- except MyException:
- pass
- else:
- self.fail('Expected a MyException exception')
- self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown'])
- test.doCleanups()
- self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
- def test_exception_in_tear_clean_up(self):
- class Test(unittest.IsolatedAsyncioTestCase):
- async def asyncSetUp(self):
- events.append('asyncSetUp')
- async def test_func(self):
- events.append('test')
- self.addAsyncCleanup(self.on_cleanup1)
- self.addAsyncCleanup(self.on_cleanup2)
- async def asyncTearDown(self):
- events.append('asyncTearDown')
- async def on_cleanup1(self):
- events.append('cleanup1')
- raise MyException('some error')
- async def on_cleanup2(self):
- events.append('cleanup2')
- raise MyException('other error')
- events = []
- test = Test("test_func")
- result = test.run()
- self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1'])
- self.assertIs(result.errors[0][0], test)
- self.assertIn('MyException: other error', result.errors[0][1])
- self.assertIn('MyException: some error', result.errors[1][1])
- events = []
- test = Test("test_func")
- try:
- test.debug()
- except MyException:
- pass
- else:
- self.fail('Expected a MyException exception')
- self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2'])
- test.doCleanups()
- self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1'])
- def test_cleanups_interleave_order(self):
- events = []
- class Test(unittest.IsolatedAsyncioTestCase):
- async def test_func(self):
- self.addAsyncCleanup(self.on_sync_cleanup, 1)
- self.addAsyncCleanup(self.on_async_cleanup, 2)
- self.addAsyncCleanup(self.on_sync_cleanup, 3)
- self.addAsyncCleanup(self.on_async_cleanup, 4)
- async def on_sync_cleanup(self, val):
- events.append(f'sync_cleanup {val}')
- async def on_async_cleanup(self, val):
- events.append(f'async_cleanup {val}')
- test = Test("test_func")
- test.run()
- self.assertEqual(events, ['async_cleanup 4',
- 'sync_cleanup 3',
- 'async_cleanup 2',
- 'sync_cleanup 1'])
- def test_base_exception_from_async_method(self):
- events = []
- class Test(unittest.IsolatedAsyncioTestCase):
- async def test_base(self):
- events.append("test_base")
- raise BaseException()
- events.append("not it")
- async def test_no_err(self):
- events.append("test_no_err")
- async def test_cancel(self):
- raise asyncio.CancelledError()
- test = Test("test_base")
- output = test.run()
- self.assertFalse(output.wasSuccessful())
- test = Test("test_no_err")
- test.run()
- self.assertEqual(events, ['test_base', 'test_no_err'])
- test = Test("test_cancel")
- output = test.run()
- self.assertFalse(output.wasSuccessful())
- def test_cancellation_hanging_tasks(self):
- cancelled = False
- class Test(unittest.IsolatedAsyncioTestCase):
- async def test_leaking_task(self):
- async def coro():
- nonlocal cancelled
- try:
- await asyncio.sleep(1)
- except asyncio.CancelledError:
- cancelled = True
- raise
- # Leave this running in the background
- asyncio.create_task(coro())
- test = Test("test_leaking_task")
- output = test.run()
- self.assertTrue(cancelled)
- if __name__ == "__main__":
- unittest.main()
|