# timeouts.py
  1. import enum
  2. from types import TracebackType
  3. from typing import final, Optional, Type
  4. from . import events
  5. from . import exceptions
  6. from . import tasks
  7. __all__ = (
  8. "Timeout",
  9. "timeout",
  10. "timeout_at",
  11. )
  12. class _State(enum.Enum):
  13. CREATED = "created"
  14. ENTERED = "active"
  15. EXPIRING = "expiring"
  16. EXPIRED = "expired"
  17. EXITED = "finished"
  18. @final
  19. class Timeout:
  20. """Asynchronous context manager for cancelling overdue coroutines.
  21. Use `timeout()` or `timeout_at()` rather than instantiating this class directly.
  22. """
  23. def __init__(self, when: Optional[float]) -> None:
  24. """Schedule a timeout that will trigger at a given loop time.
  25. - If `when` is `None`, the timeout will never trigger.
  26. - If `when < loop.time()`, the timeout will trigger on the next
  27. iteration of the event loop.
  28. """
  29. self._state = _State.CREATED
  30. self._timeout_handler: Optional[events.TimerHandle] = None
  31. self._task: Optional[tasks.Task] = None
  32. self._when = when
  33. def when(self) -> Optional[float]:
  34. """Return the current deadline."""
  35. return self._when
  36. def reschedule(self, when: Optional[float]) -> None:
  37. """Reschedule the timeout."""
  38. assert self._state is not _State.CREATED
  39. if self._state is not _State.ENTERED:
  40. raise RuntimeError(
  41. f"Cannot change state of {self._state.value} Timeout",
  42. )
  43. self._when = when
  44. if self._timeout_handler is not None:
  45. self._timeout_handler.cancel()
  46. if when is None:
  47. self._timeout_handler = None
  48. else:
  49. loop = events.get_running_loop()
  50. if when <= loop.time():
  51. self._timeout_handler = loop.call_soon(self._on_timeout)
  52. else:
  53. self._timeout_handler = loop.call_at(when, self._on_timeout)
  54. def expired(self) -> bool:
  55. """Is timeout expired during execution?"""
  56. return self._state in (_State.EXPIRING, _State.EXPIRED)
  57. def __repr__(self) -> str:
  58. info = ['']
  59. if self._state is _State.ENTERED:
  60. when = round(self._when, 3) if self._when is not None else None
  61. info.append(f"when={when}")
  62. info_str = ' '.join(info)
  63. return f"<Timeout [{self._state.value}]{info_str}>"
  64. async def __aenter__(self) -> "Timeout":
  65. self._state = _State.ENTERED
  66. self._task = tasks.current_task()
  67. self._cancelling = self._task.cancelling()
  68. if self._task is None:
  69. raise RuntimeError("Timeout should be used inside a task")
  70. self.reschedule(self._when)
  71. return self
  72. async def __aexit__(
  73. self,
  74. exc_type: Optional[Type[BaseException]],
  75. exc_val: Optional[BaseException],
  76. exc_tb: Optional[TracebackType],
  77. ) -> Optional[bool]:
  78. assert self._state in (_State.ENTERED, _State.EXPIRING)
  79. if self._timeout_handler is not None:
  80. self._timeout_handler.cancel()
  81. self._timeout_handler = None
  82. if self._state is _State.EXPIRING:
  83. self._state = _State.EXPIRED
  84. if self._task.uncancel() <= self._cancelling and exc_type is exceptions.CancelledError:
  85. # Since there are no new cancel requests, we're
  86. # handling this.
  87. raise TimeoutError from exc_val
  88. elif self._state is _State.ENTERED:
  89. self._state = _State.EXITED
  90. return None
  91. def _on_timeout(self) -> None:
  92. assert self._state is _State.ENTERED
  93. self._task.cancel()
  94. self._state = _State.EXPIRING
  95. # drop the reference early
  96. self._timeout_handler = None
  97. def timeout(delay: Optional[float]) -> Timeout:
  98. """Timeout async context manager.
  99. Useful in cases when you want to apply timeout logic around block
  100. of code or in cases when asyncio.wait_for is not suitable. For example:
  101. >>> async with asyncio.timeout(10): # 10 seconds timeout
  102. ... await long_running_task()
  103. delay - value in seconds or None to disable timeout logic
  104. long_running_task() is interrupted by raising asyncio.CancelledError,
  105. the top-most affected timeout() context manager converts CancelledError
  106. into TimeoutError.
  107. """
  108. loop = events.get_running_loop()
  109. return Timeout(loop.time() + delay if delay is not None else None)
  110. def timeout_at(when: Optional[float]) -> Timeout:
  111. """Schedule the timeout at absolute time.
  112. Like timeout() but argument gives absolute time in the same clock system
  113. as loop.time().
  114. Please note: it is not POSIX time but a time with
  115. undefined starting base, e.g. the time of the system power on.
  116. >>> async with asyncio.timeout_at(loop.time() + 10):
  117. ... await long_running_task()
  118. when - a deadline when timeout occurs or None to disable timeout logic
  119. long_running_task() is interrupted by raising asyncio.CancelledError,
  120. the top-most affected timeout() context manager converts CancelledError
  121. into TimeoutError.
  122. """
  123. return Timeout(when)