test_async_case.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. import asyncio
  2. import unittest
  3. from test import support
  4. class MyException(Exception):
  5. pass
  6. def tearDownModule():
  7. asyncio.set_event_loop_policy(None)
  8. class TestAsyncCase(unittest.TestCase):
  9. maxDiff = None
  10. def tearDown(self):
  11. # Ensure that IsolatedAsyncioTestCase instances are destroyed before
  12. # starting a new event loop
  13. support.gc_collect()
  14. def test_full_cycle(self):
  15. class Test(unittest.IsolatedAsyncioTestCase):
  16. def setUp(self):
  17. self.assertEqual(events, [])
  18. events.append('setUp')
  19. async def asyncSetUp(self):
  20. self.assertEqual(events, ['setUp'])
  21. events.append('asyncSetUp')
  22. self.addAsyncCleanup(self.on_cleanup1)
  23. async def test_func(self):
  24. self.assertEqual(events, ['setUp',
  25. 'asyncSetUp'])
  26. events.append('test')
  27. self.addAsyncCleanup(self.on_cleanup2)
  28. async def asyncTearDown(self):
  29. self.assertEqual(events, ['setUp',
  30. 'asyncSetUp',
  31. 'test'])
  32. events.append('asyncTearDown')
  33. def tearDown(self):
  34. self.assertEqual(events, ['setUp',
  35. 'asyncSetUp',
  36. 'test',
  37. 'asyncTearDown'])
  38. events.append('tearDown')
  39. async def on_cleanup1(self):
  40. self.assertEqual(events, ['setUp',
  41. 'asyncSetUp',
  42. 'test',
  43. 'asyncTearDown',
  44. 'tearDown',
  45. 'cleanup2'])
  46. events.append('cleanup1')
  47. async def on_cleanup2(self):
  48. self.assertEqual(events, ['setUp',
  49. 'asyncSetUp',
  50. 'test',
  51. 'asyncTearDown',
  52. 'tearDown'])
  53. events.append('cleanup2')
  54. events = []
  55. test = Test("test_func")
  56. result = test.run()
  57. self.assertEqual(result.errors, [])
  58. self.assertEqual(result.failures, [])
  59. expected = ['setUp', 'asyncSetUp', 'test',
  60. 'asyncTearDown', 'tearDown', 'cleanup2', 'cleanup1']
  61. self.assertEqual(events, expected)
  62. events = []
  63. test = Test("test_func")
  64. test.debug()
  65. self.assertEqual(events, expected)
  66. test.doCleanups()
  67. self.assertEqual(events, expected)
  68. def test_exception_in_setup(self):
  69. class Test(unittest.IsolatedAsyncioTestCase):
  70. async def asyncSetUp(self):
  71. events.append('asyncSetUp')
  72. self.addAsyncCleanup(self.on_cleanup)
  73. raise MyException()
  74. async def test_func(self):
  75. events.append('test')
  76. async def asyncTearDown(self):
  77. events.append('asyncTearDown')
  78. async def on_cleanup(self):
  79. events.append('cleanup')
  80. events = []
  81. test = Test("test_func")
  82. result = test.run()
  83. self.assertEqual(events, ['asyncSetUp', 'cleanup'])
  84. self.assertIs(result.errors[0][0], test)
  85. self.assertIn('MyException', result.errors[0][1])
  86. events = []
  87. test = Test("test_func")
  88. try:
  89. test.debug()
  90. except MyException:
  91. pass
  92. else:
  93. self.fail('Expected a MyException exception')
  94. self.assertEqual(events, ['asyncSetUp'])
  95. test.doCleanups()
  96. self.assertEqual(events, ['asyncSetUp', 'cleanup'])
  97. def test_exception_in_test(self):
  98. class Test(unittest.IsolatedAsyncioTestCase):
  99. async def asyncSetUp(self):
  100. events.append('asyncSetUp')
  101. async def test_func(self):
  102. events.append('test')
  103. self.addAsyncCleanup(self.on_cleanup)
  104. raise MyException()
  105. async def asyncTearDown(self):
  106. events.append('asyncTearDown')
  107. async def on_cleanup(self):
  108. events.append('cleanup')
  109. events = []
  110. test = Test("test_func")
  111. result = test.run()
  112. self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
  113. self.assertIs(result.errors[0][0], test)
  114. self.assertIn('MyException', result.errors[0][1])
  115. events = []
  116. test = Test("test_func")
  117. try:
  118. test.debug()
  119. except MyException:
  120. pass
  121. else:
  122. self.fail('Expected a MyException exception')
  123. self.assertEqual(events, ['asyncSetUp', 'test'])
  124. test.doCleanups()
  125. self.assertEqual(events, ['asyncSetUp', 'test', 'cleanup'])
  126. def test_exception_in_tear_down(self):
  127. class Test(unittest.IsolatedAsyncioTestCase):
  128. async def asyncSetUp(self):
  129. events.append('asyncSetUp')
  130. async def test_func(self):
  131. events.append('test')
  132. self.addAsyncCleanup(self.on_cleanup)
  133. async def asyncTearDown(self):
  134. events.append('asyncTearDown')
  135. raise MyException()
  136. async def on_cleanup(self):
  137. events.append('cleanup')
  138. events = []
  139. test = Test("test_func")
  140. result = test.run()
  141. self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
  142. self.assertIs(result.errors[0][0], test)
  143. self.assertIn('MyException', result.errors[0][1])
  144. events = []
  145. test = Test("test_func")
  146. try:
  147. test.debug()
  148. except MyException:
  149. pass
  150. else:
  151. self.fail('Expected a MyException exception')
  152. self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown'])
  153. test.doCleanups()
  154. self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])
  155. def test_exception_in_tear_clean_up(self):
  156. class Test(unittest.IsolatedAsyncioTestCase):
  157. async def asyncSetUp(self):
  158. events.append('asyncSetUp')
  159. async def test_func(self):
  160. events.append('test')
  161. self.addAsyncCleanup(self.on_cleanup1)
  162. self.addAsyncCleanup(self.on_cleanup2)
  163. async def asyncTearDown(self):
  164. events.append('asyncTearDown')
  165. async def on_cleanup1(self):
  166. events.append('cleanup1')
  167. raise MyException('some error')
  168. async def on_cleanup2(self):
  169. events.append('cleanup2')
  170. raise MyException('other error')
  171. events = []
  172. test = Test("test_func")
  173. result = test.run()
  174. self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1'])
  175. self.assertIs(result.errors[0][0], test)
  176. self.assertIn('MyException: other error', result.errors[0][1])
  177. self.assertIn('MyException: some error', result.errors[1][1])
  178. events = []
  179. test = Test("test_func")
  180. try:
  181. test.debug()
  182. except MyException:
  183. pass
  184. else:
  185. self.fail('Expected a MyException exception')
  186. self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2'])
  187. test.doCleanups()
  188. self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1'])
  189. def test_cleanups_interleave_order(self):
  190. events = []
  191. class Test(unittest.IsolatedAsyncioTestCase):
  192. async def test_func(self):
  193. self.addAsyncCleanup(self.on_sync_cleanup, 1)
  194. self.addAsyncCleanup(self.on_async_cleanup, 2)
  195. self.addAsyncCleanup(self.on_sync_cleanup, 3)
  196. self.addAsyncCleanup(self.on_async_cleanup, 4)
  197. async def on_sync_cleanup(self, val):
  198. events.append(f'sync_cleanup {val}')
  199. async def on_async_cleanup(self, val):
  200. events.append(f'async_cleanup {val}')
  201. test = Test("test_func")
  202. test.run()
  203. self.assertEqual(events, ['async_cleanup 4',
  204. 'sync_cleanup 3',
  205. 'async_cleanup 2',
  206. 'sync_cleanup 1'])
  207. def test_base_exception_from_async_method(self):
  208. events = []
  209. class Test(unittest.IsolatedAsyncioTestCase):
  210. async def test_base(self):
  211. events.append("test_base")
  212. raise BaseException()
  213. events.append("not it")
  214. async def test_no_err(self):
  215. events.append("test_no_err")
  216. async def test_cancel(self):
  217. raise asyncio.CancelledError()
  218. test = Test("test_base")
  219. output = test.run()
  220. self.assertFalse(output.wasSuccessful())
  221. test = Test("test_no_err")
  222. test.run()
  223. self.assertEqual(events, ['test_base', 'test_no_err'])
  224. test = Test("test_cancel")
  225. output = test.run()
  226. self.assertFalse(output.wasSuccessful())
  227. def test_cancellation_hanging_tasks(self):
  228. cancelled = False
  229. class Test(unittest.IsolatedAsyncioTestCase):
  230. async def test_leaking_task(self):
  231. async def coro():
  232. nonlocal cancelled
  233. try:
  234. await asyncio.sleep(1)
  235. except asyncio.CancelledError:
  236. cancelled = True
  237. raise
  238. # Leave this running in the background
  239. asyncio.create_task(coro())
  240. test = Test("test_leaking_task")
  241. output = test.run()
  242. self.assertTrue(cancelled)
  243. if __name__ == "__main__":
  244. unittest.main()