signals.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. import signal
  2. import weakref
  3. from functools import wraps
# Module-level marker flag. Nothing in this file reads it.
# NOTE(review): presumably consulted by the surrounding test framework
# (e.g. when deciding which frames to show in failure tracebacks) — confirm.
__unittest = True
  5. class _InterruptHandler(object):
  6. def __init__(self, default_handler):
  7. self.called = False
  8. self.original_handler = default_handler
  9. if isinstance(default_handler, int):
  10. if default_handler == signal.SIG_DFL:
  11. # Pretend it's signal.default_int_handler instead.
  12. default_handler = signal.default_int_handler
  13. elif default_handler == signal.SIG_IGN:
  14. # Not quite the same thing as SIG_IGN, but the closest we
  15. # can make it: do nothing.
  16. def default_handler(unused_signum, unused_frame):
  17. pass
  18. else:
  19. raise TypeError("expected SIGINT signal handler to be "
  20. "signal.SIG_IGN, signal.SIG_DFL, or a "
  21. "callable object")
  22. self.default_handler = default_handler
  23. def __call__(self, signum, frame):
  24. installed_handler = signal.getsignal(signal.SIGINT)
  25. if installed_handler is not self:
  26. # if we aren't the installed handler, then delegate immediately
  27. # to the default handler
  28. self.default_handler(signum, frame)
  29. if self.called:
  30. self.default_handler(signum, frame)
  31. self.called = True
  32. for result in _results.keys():
  33. result.stop()
# Registry of result objects that should be stopped on interrupt.
# Keys are added by registerResult(), dropped by removeResult(), and
# iterated by _InterruptHandler.__call__, which calls stop() on each one.
# The dictionary holds its keys weakly, so being registered here does not
# by itself keep a result object alive.
_results = weakref.WeakKeyDictionary()
  35. def registerResult(result):
  36. _results[result] = 1
  37. def removeResult(result):
  38. return bool(_results.pop(result, None))
# The module's single _InterruptHandler instance. Stays None until
# installHandler() creates it; removeHandler() reads it to find the
# original SIGINT handler to restore.
_interrupt_handler = None
  40. def installHandler():
  41. global _interrupt_handler
  42. if _interrupt_handler is None:
  43. default_handler = signal.getsignal(signal.SIGINT)
  44. _interrupt_handler = _InterruptHandler(default_handler)
  45. signal.signal(signal.SIGINT, _interrupt_handler)
  46. def removeHandler(method=None):
  47. if method is not None:
  48. @wraps(method)
  49. def inner(*args, **kwargs):
  50. initial = signal.getsignal(signal.SIGINT)
  51. removeHandler()
  52. try:
  53. return method(*args, **kwargs)
  54. finally:
  55. signal.signal(signal.SIGINT, initial)
  56. return inner
  57. global _interrupt_handler
  58. if _interrupt_handler is not None:
  59. signal.signal(signal.SIGINT, _interrupt_handler.original_handler)