mixins.py 481 B

123456789101112131415161718192021
  1. """Event loop mixins."""
  2. import threading
  3. from . import events
  4. _global_lock = threading.Lock()
  5. class _LoopBoundMixin:
  6. _loop = None
  7. def _get_loop(self):
  8. loop = events._get_running_loop()
  9. if self._loop is None:
  10. with _global_lock:
  11. if self._loop is None:
  12. self._loop = loop
  13. if loop is not self._loop:
  14. raise RuntimeError(f'{self!r} is bound to a different event loop')
  15. return loop