123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- import gc
- import io
- import os
- import sys
- import signal
- import weakref
- import unittest
- @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
- @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
- class TestBreak(unittest.TestCase):
- int_handler = None
- def setUp(self):
- self._default_handler = signal.getsignal(signal.SIGINT)
- if self.int_handler is not None:
- signal.signal(signal.SIGINT, self.int_handler)
- def tearDown(self):
- signal.signal(signal.SIGINT, self._default_handler)
- unittest.signals._results = weakref.WeakKeyDictionary()
- unittest.signals._interrupt_handler = None
- def testInstallHandler(self):
- default_handler = signal.getsignal(signal.SIGINT)
- unittest.installHandler()
- self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
- try:
- pid = os.getpid()
- os.kill(pid, signal.SIGINT)
- except KeyboardInterrupt:
- self.fail("KeyboardInterrupt not handled")
- self.assertTrue(unittest.signals._interrupt_handler.called)
- def testRegisterResult(self):
- result = unittest.TestResult()
- self.assertNotIn(result, unittest.signals._results)
- unittest.registerResult(result)
- try:
- self.assertIn(result, unittest.signals._results)
- finally:
- unittest.removeResult(result)
- def testInterruptCaught(self):
- default_handler = signal.getsignal(signal.SIGINT)
- result = unittest.TestResult()
- unittest.installHandler()
- unittest.registerResult(result)
- self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
- def test(result):
- pid = os.getpid()
- os.kill(pid, signal.SIGINT)
- result.breakCaught = True
- self.assertTrue(result.shouldStop)
- try:
- test(result)
- except KeyboardInterrupt:
- self.fail("KeyboardInterrupt not handled")
- self.assertTrue(result.breakCaught)
- def testSecondInterrupt(self):
- # Can't use skipIf decorator because the signal handler may have
- # been changed after defining this method.
- if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
- self.skipTest("test requires SIGINT to not be ignored")
- result = unittest.TestResult()
- unittest.installHandler()
- unittest.registerResult(result)
- def test(result):
- pid = os.getpid()
- os.kill(pid, signal.SIGINT)
- result.breakCaught = True
- self.assertTrue(result.shouldStop)
- os.kill(pid, signal.SIGINT)
- self.fail("Second KeyboardInterrupt not raised")
- try:
- test(result)
- except KeyboardInterrupt:
- pass
- else:
- self.fail("Second KeyboardInterrupt not raised")
- self.assertTrue(result.breakCaught)
- def testTwoResults(self):
- unittest.installHandler()
- result = unittest.TestResult()
- unittest.registerResult(result)
- new_handler = signal.getsignal(signal.SIGINT)
- result2 = unittest.TestResult()
- unittest.registerResult(result2)
- self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
- result3 = unittest.TestResult()
- def test(result):
- pid = os.getpid()
- os.kill(pid, signal.SIGINT)
- try:
- test(result)
- except KeyboardInterrupt:
- self.fail("KeyboardInterrupt not handled")
- self.assertTrue(result.shouldStop)
- self.assertTrue(result2.shouldStop)
- self.assertFalse(result3.shouldStop)
- def testHandlerReplacedButCalled(self):
- # Can't use skipIf decorator because the signal handler may have
- # been changed after defining this method.
- if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
- self.skipTest("test requires SIGINT to not be ignored")
- # If our handler has been replaced (is no longer installed) but is
- # called by the *new* handler, then it isn't safe to delay the
- # SIGINT and we should immediately delegate to the default handler
- unittest.installHandler()
- handler = signal.getsignal(signal.SIGINT)
- def new_handler(frame, signum):
- handler(frame, signum)
- signal.signal(signal.SIGINT, new_handler)
- try:
- pid = os.getpid()
- os.kill(pid, signal.SIGINT)
- except KeyboardInterrupt:
- pass
- else:
- self.fail("replaced but delegated handler doesn't raise interrupt")
- def testRunner(self):
- # Creating a TextTestRunner with the appropriate argument should
- # register the TextTestResult it creates
- runner = unittest.TextTestRunner(stream=io.StringIO())
- result = runner.run(unittest.TestSuite())
- self.assertIn(result, unittest.signals._results)
- def testWeakReferences(self):
- # Calling registerResult on a result should not keep it alive
- result = unittest.TestResult()
- unittest.registerResult(result)
- ref = weakref.ref(result)
- del result
- # For non-reference counting implementations
- gc.collect();gc.collect()
- self.assertIsNone(ref())
- def testRemoveResult(self):
- result = unittest.TestResult()
- unittest.registerResult(result)
- unittest.installHandler()
- self.assertTrue(unittest.removeResult(result))
- # Should this raise an error instead?
- self.assertFalse(unittest.removeResult(unittest.TestResult()))
- try:
- pid = os.getpid()
- os.kill(pid, signal.SIGINT)
- except KeyboardInterrupt:
- pass
- self.assertFalse(result.shouldStop)
- def testMainInstallsHandler(self):
- failfast = object()
- test = object()
- verbosity = object()
- result = object()
- default_handler = signal.getsignal(signal.SIGINT)
- class FakeRunner(object):
- initArgs = []
- runArgs = []
- def __init__(self, *args, **kwargs):
- self.initArgs.append((args, kwargs))
- def run(self, test):
- self.runArgs.append(test)
- return result
- class Program(unittest.TestProgram):
- def __init__(self, catchbreak):
- self.exit = False
- self.verbosity = verbosity
- self.failfast = failfast
- self.catchbreak = catchbreak
- self.tb_locals = False
- self.testRunner = FakeRunner
- self.test = test
- self.result = None
- p = Program(False)
- p.runTests()
- self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
- 'verbosity': verbosity,
- 'failfast': failfast,
- 'tb_locals': False,
- 'warnings': None})])
- self.assertEqual(FakeRunner.runArgs, [test])
- self.assertEqual(p.result, result)
- self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
- FakeRunner.initArgs = []
- FakeRunner.runArgs = []
- p = Program(True)
- p.runTests()
- self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
- 'verbosity': verbosity,
- 'failfast': failfast,
- 'tb_locals': False,
- 'warnings': None})])
- self.assertEqual(FakeRunner.runArgs, [test])
- self.assertEqual(p.result, result)
- self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
- def testRemoveHandler(self):
- default_handler = signal.getsignal(signal.SIGINT)
- unittest.installHandler()
- unittest.removeHandler()
- self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
- # check that calling removeHandler multiple times has no ill-effect
- unittest.removeHandler()
- self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
- def testRemoveHandlerAsDecorator(self):
- default_handler = signal.getsignal(signal.SIGINT)
- unittest.installHandler()
- @unittest.removeHandler
- def test():
- self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
- test()
- self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
- @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
- @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
- class TestBreakDefaultIntHandler(TestBreak):
- int_handler = signal.default_int_handler
- @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
- @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
- class TestBreakSignalIgnored(TestBreak):
- int_handler = signal.SIG_IGN
- @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
- @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
- class TestBreakSignalDefault(TestBreak):
- int_handler = signal.SIG_DFL
- if __name__ == "__main__":
- unittest.main()
|