signals.py 852 B

12345678910111213141516171819202122232425262728293031323334
  1. from aiohttp.frozenlist import FrozenList
  2. __all__ = ("Signal",)
  3. class Signal(FrozenList):
  4. """Coroutine-based signal implementation.
  5. To connect a callback to a signal, use any list method.
  6. Signals are fired using the send() coroutine, which takes named
  7. arguments.
  8. """
  9. __slots__ = ("_owner",)
  10. def __init__(self, owner):
  11. super().__init__()
  12. self._owner = owner
  13. def __repr__(self):
  14. return "<Signal owner={}, frozen={}, {!r}>".format(
  15. self._owner, self.frozen, list(self)
  16. )
  17. async def send(self, *args, **kwargs):
  18. """
  19. Sends data to all registered receivers.
  20. """
  21. if not self.frozen:
  22. raise RuntimeError("Cannot send non-frozen signal.")
  23. for receiver in self:
  24. await receiver(*args, **kwargs) # type: ignore