locks.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  import asyncio
  import collections
  from typing import Any, Optional

  try:
      from typing import Deque
  except ImportError:
      from typing_extensions import Deque
  class EventResultOrError:
      """Wrap an ``asyncio.Event`` so waiters wake with a result or an error.

      Tasks blocked in :meth:`wait` are released by :meth:`set`. When ``set``
      is given an exception, every released waiter raises it; otherwise the
      waiters return normally.

      Thanks to @vorpalsmith for the simple design.
      """

      def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
          """Create an unset event.

          Args:
              loop: Event loop used by :meth:`wait` to create one task per
                  waiter.
          """
          self._loop = loop
          # Exception to raise in released waiters; ``None`` means "no error".
          self._exc = None  # type: Optional[BaseException]
          self._event = asyncio.Event()
          # Tasks currently blocked in ``wait``; kept so ``cancel`` can reach them.
          self._waiters = collections.deque()  # type: Deque[asyncio.Future[Any]]

      def set(self, exc: Optional[BaseException] = None) -> None:
          """Release all waiters.

          Args:
              exc: If given, the exception each waiter raises after waking up.
                  ``None`` lets the waiters return normally.
          """
          # Store the outcome before firing the event so that woken waiters
          # observe it.
          self._exc = exc
          self._event.set()

      async def wait(self) -> Any:
          """Block until :meth:`set` is called.

          Returns:
              The result of the underlying ``asyncio.Event.wait()`` call.

          Raises:
              BaseException: The exception passed to :meth:`set`, if any.
              asyncio.CancelledError: If the wait is cancelled, for example
                  through :meth:`cancel`.
          """
          # Wrap the event wait in its own task so ``cancel`` can target it.
          waiter = self._loop.create_task(self._event.wait())
          self._waiters.append(waiter)
          try:
              val = await waiter
          finally:
              # Always unregister, whether the wait completed or was cancelled.
              self._waiters.remove(waiter)

          if self._exc is not None:
              raise self._exc

          return val

      def cancel(self) -> None:
          """Cancel all waiters."""
          for waiter in self._waiters:
              waiter.cancel()