connection.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. #
  2. # Analogue of `multiprocessing.connection` which uses queues instead of sockets
  3. #
  4. # multiprocessing/dummy/connection.py
  5. #
  6. # Copyright (c) 2006-2008, R Oudkerk
  7. # Licensed to PSF under a Contributor Agreement.
  8. #
  9. __all__ = [ 'Client', 'Listener', 'Pipe' ]
  10. from queue import Queue
  11. families = [None]
  12. class Listener(object):
  13. def __init__(self, address=None, family=None, backlog=1):
  14. self._backlog_queue = Queue(backlog)
  15. def accept(self):
  16. return Connection(*self._backlog_queue.get())
  17. def close(self):
  18. self._backlog_queue = None
  19. @property
  20. def address(self):
  21. return self._backlog_queue
  22. def __enter__(self):
  23. return self
  24. def __exit__(self, exc_type, exc_value, exc_tb):
  25. self.close()
  26. def Client(address):
  27. _in, _out = Queue(), Queue()
  28. address.put((_out, _in))
  29. return Connection(_in, _out)
  30. def Pipe(duplex=True):
  31. a, b = Queue(), Queue()
  32. return Connection(a, b), Connection(b, a)
  33. class Connection(object):
  34. def __init__(self, _in, _out):
  35. self._out = _out
  36. self._in = _in
  37. self.send = self.send_bytes = _out.put
  38. self.recv = self.recv_bytes = _in.get
  39. def poll(self, timeout=0.0):
  40. if self._in.qsize() > 0:
  41. return True
  42. if timeout <= 0.0:
  43. return False
  44. with self._in.not_empty:
  45. self._in.not_empty.wait(timeout)
  46. return self._in.qsize() > 0
  47. def close(self):
  48. pass
  49. def __enter__(self):
  50. return self
  51. def __exit__(self, exc_type, exc_value, exc_tb):
  52. self.close()