test_break.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. import gc
  2. import io
  3. import os
  4. import sys
  5. import signal
  6. import weakref
  7. import unittest
  8. @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
  9. @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
  10. class TestBreak(unittest.TestCase):
  11. int_handler = None
  12. def setUp(self):
  13. self._default_handler = signal.getsignal(signal.SIGINT)
  14. if self.int_handler is not None:
  15. signal.signal(signal.SIGINT, self.int_handler)
  16. def tearDown(self):
  17. signal.signal(signal.SIGINT, self._default_handler)
  18. unittest.signals._results = weakref.WeakKeyDictionary()
  19. unittest.signals._interrupt_handler = None
  20. def testInstallHandler(self):
  21. default_handler = signal.getsignal(signal.SIGINT)
  22. unittest.installHandler()
  23. self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
  24. try:
  25. pid = os.getpid()
  26. os.kill(pid, signal.SIGINT)
  27. except KeyboardInterrupt:
  28. self.fail("KeyboardInterrupt not handled")
  29. self.assertTrue(unittest.signals._interrupt_handler.called)
  30. def testRegisterResult(self):
  31. result = unittest.TestResult()
  32. self.assertNotIn(result, unittest.signals._results)
  33. unittest.registerResult(result)
  34. try:
  35. self.assertIn(result, unittest.signals._results)
  36. finally:
  37. unittest.removeResult(result)
  38. def testInterruptCaught(self):
  39. default_handler = signal.getsignal(signal.SIGINT)
  40. result = unittest.TestResult()
  41. unittest.installHandler()
  42. unittest.registerResult(result)
  43. self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
  44. def test(result):
  45. pid = os.getpid()
  46. os.kill(pid, signal.SIGINT)
  47. result.breakCaught = True
  48. self.assertTrue(result.shouldStop)
  49. try:
  50. test(result)
  51. except KeyboardInterrupt:
  52. self.fail("KeyboardInterrupt not handled")
  53. self.assertTrue(result.breakCaught)
  54. def testSecondInterrupt(self):
  55. # Can't use skipIf decorator because the signal handler may have
  56. # been changed after defining this method.
  57. if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
  58. self.skipTest("test requires SIGINT to not be ignored")
  59. result = unittest.TestResult()
  60. unittest.installHandler()
  61. unittest.registerResult(result)
  62. def test(result):
  63. pid = os.getpid()
  64. os.kill(pid, signal.SIGINT)
  65. result.breakCaught = True
  66. self.assertTrue(result.shouldStop)
  67. os.kill(pid, signal.SIGINT)
  68. self.fail("Second KeyboardInterrupt not raised")
  69. try:
  70. test(result)
  71. except KeyboardInterrupt:
  72. pass
  73. else:
  74. self.fail("Second KeyboardInterrupt not raised")
  75. self.assertTrue(result.breakCaught)
  76. def testTwoResults(self):
  77. unittest.installHandler()
  78. result = unittest.TestResult()
  79. unittest.registerResult(result)
  80. new_handler = signal.getsignal(signal.SIGINT)
  81. result2 = unittest.TestResult()
  82. unittest.registerResult(result2)
  83. self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
  84. result3 = unittest.TestResult()
  85. def test(result):
  86. pid = os.getpid()
  87. os.kill(pid, signal.SIGINT)
  88. try:
  89. test(result)
  90. except KeyboardInterrupt:
  91. self.fail("KeyboardInterrupt not handled")
  92. self.assertTrue(result.shouldStop)
  93. self.assertTrue(result2.shouldStop)
  94. self.assertFalse(result3.shouldStop)
  95. def testHandlerReplacedButCalled(self):
  96. # Can't use skipIf decorator because the signal handler may have
  97. # been changed after defining this method.
  98. if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
  99. self.skipTest("test requires SIGINT to not be ignored")
  100. # If our handler has been replaced (is no longer installed) but is
  101. # called by the *new* handler, then it isn't safe to delay the
  102. # SIGINT and we should immediately delegate to the default handler
  103. unittest.installHandler()
  104. handler = signal.getsignal(signal.SIGINT)
  105. def new_handler(frame, signum):
  106. handler(frame, signum)
  107. signal.signal(signal.SIGINT, new_handler)
  108. try:
  109. pid = os.getpid()
  110. os.kill(pid, signal.SIGINT)
  111. except KeyboardInterrupt:
  112. pass
  113. else:
  114. self.fail("replaced but delegated handler doesn't raise interrupt")
  115. def testRunner(self):
  116. # Creating a TextTestRunner with the appropriate argument should
  117. # register the TextTestResult it creates
  118. runner = unittest.TextTestRunner(stream=io.StringIO())
  119. result = runner.run(unittest.TestSuite())
  120. self.assertIn(result, unittest.signals._results)
  121. def testWeakReferences(self):
  122. # Calling registerResult on a result should not keep it alive
  123. result = unittest.TestResult()
  124. unittest.registerResult(result)
  125. ref = weakref.ref(result)
  126. del result
  127. # For non-reference counting implementations
  128. gc.collect();gc.collect()
  129. self.assertIsNone(ref())
  130. def testRemoveResult(self):
  131. result = unittest.TestResult()
  132. unittest.registerResult(result)
  133. unittest.installHandler()
  134. self.assertTrue(unittest.removeResult(result))
  135. # Should this raise an error instead?
  136. self.assertFalse(unittest.removeResult(unittest.TestResult()))
  137. try:
  138. pid = os.getpid()
  139. os.kill(pid, signal.SIGINT)
  140. except KeyboardInterrupt:
  141. pass
  142. self.assertFalse(result.shouldStop)
  143. def testMainInstallsHandler(self):
  144. failfast = object()
  145. test = object()
  146. verbosity = object()
  147. result = object()
  148. default_handler = signal.getsignal(signal.SIGINT)
  149. class FakeRunner(object):
  150. initArgs = []
  151. runArgs = []
  152. def __init__(self, *args, **kwargs):
  153. self.initArgs.append((args, kwargs))
  154. def run(self, test):
  155. self.runArgs.append(test)
  156. return result
  157. class Program(unittest.TestProgram):
  158. def __init__(self, catchbreak):
  159. self.exit = False
  160. self.verbosity = verbosity
  161. self.failfast = failfast
  162. self.catchbreak = catchbreak
  163. self.tb_locals = False
  164. self.testRunner = FakeRunner
  165. self.test = test
  166. self.result = None
  167. p = Program(False)
  168. p.runTests()
  169. self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
  170. 'verbosity': verbosity,
  171. 'failfast': failfast,
  172. 'tb_locals': False,
  173. 'warnings': None})])
  174. self.assertEqual(FakeRunner.runArgs, [test])
  175. self.assertEqual(p.result, result)
  176. self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
  177. FakeRunner.initArgs = []
  178. FakeRunner.runArgs = []
  179. p = Program(True)
  180. p.runTests()
  181. self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
  182. 'verbosity': verbosity,
  183. 'failfast': failfast,
  184. 'tb_locals': False,
  185. 'warnings': None})])
  186. self.assertEqual(FakeRunner.runArgs, [test])
  187. self.assertEqual(p.result, result)
  188. self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
  189. def testRemoveHandler(self):
  190. default_handler = signal.getsignal(signal.SIGINT)
  191. unittest.installHandler()
  192. unittest.removeHandler()
  193. self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
  194. # check that calling removeHandler multiple times has no ill-effect
  195. unittest.removeHandler()
  196. self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
  197. def testRemoveHandlerAsDecorator(self):
  198. default_handler = signal.getsignal(signal.SIGINT)
  199. unittest.installHandler()
  200. @unittest.removeHandler
  201. def test():
  202. self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
  203. test()
  204. self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
  205. @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
  206. @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
  207. class TestBreakDefaultIntHandler(TestBreak):
  208. int_handler = signal.default_int_handler
  209. @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
  210. @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
  211. class TestBreakSignalIgnored(TestBreak):
  212. int_handler = signal.SIG_IGN
  213. @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
  214. @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
  215. class TestBreakSignalDefault(TestBreak):
  216. int_handler = signal.SIG_DFL
  217. if __name__ == "__main__":
  218. unittest.main()