123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
# Public names of this module; these are what a star-import exposes.
__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
- import collections
- import heapq
- import warnings
- from types import GenericAlias
- from . import events
- from . import locks
class QueueEmpty(Exception):
    """Signals that Queue.get_nowait() found no item to return."""
class QueueFull(Exception):
    """Signals that Queue.put_nowait() found no free slot in the Queue."""
class Queue:
    """FIFO queue for handing items between producer and consumer coroutines.

    A maxsize of zero or less means the queue never fills up.  With a
    positive integer maxsize, "await put()" suspends once maxsize items are
    stored and resumes after get() removes an item.

    In contrast to the thread-based standard library Queue, qsize() can be
    trusted here: a single-threaded asyncio application is not interrupted
    between the qsize() call and the following operation on the Queue.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """Set up an empty queue bounded by maxsize (unbounded if <= 0).

        Supplying loop is deprecated and emits a DeprecationWarning; when it
        is omitted the loop is taken from events.get_event_loop().
        """
        if loop is not None:
            self._loop = loop
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            self._loop = events.get_event_loop()
        self._maxsize = maxsize

        # Futures of coroutines suspended in get(), in arrival order.
        self._getters = collections.deque()
        # Futures of coroutines suspended in put(), in arrival order.
        self._putters = collections.deque()

        # Count of items added but not yet acknowledged with task_done().
        self._unfinished_tasks = 0
        # Event that is set whenever no unfinished tasks remain; join()
        # waits on it.
        self._finished = locks.Event(loop=loop)
        self._finished.set()

        self._init(maxsize)

    # Storage hooks: subclasses override these three to change ordering.

    def _init(self, maxsize):
        """Create the underlying container."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the oldest stored item."""
        return self._queue.popleft()

    def _put(self, item):
        """Store item behind everything already queued."""
        self._queue.append(item)

    # End of the storage hooks.

    def _wakeup_next(self, waiters):
        """Resolve the first future in waiters that is not already done.

        Futures that are already done (for example cancelled ones) are
        discarded along the way.
        """
        while waiters:
            candidate = waiters.popleft()
            if candidate.done():
                continue
            candidate.set_result(None)
            return

    def __repr__(self):
        return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'

    def __str__(self):
        return f'<{type(self).__name__} {self._format()}>'

    # Allows subscripting the class itself, e.g. Queue[int].
    __class_getitem__ = classmethod(GenericAlias)

    def _format(self):
        """Build the state summary shared by __repr__ and __str__."""
        pieces = [f'maxsize={self._maxsize!r}']
        # getattr() keeps this usable even if _queue was never created.
        if getattr(self, '_queue', None):
            pieces.append(f' _queue={list(self._queue)!r}')
        if self._getters:
            pieces.append(f' _getters[{len(self._getters)}]')
        if self._putters:
            pieces.append(f' _putters[{len(self._putters)}]')
        if self._unfinished_tasks:
            pieces.append(f' tasks={self._unfinished_tasks}')
        return ''.join(pieces)

    def qsize(self):
        """Return how many items are currently stored."""
        return len(self._queue)

    @property
    def maxsize(self):
        """The capacity this queue was created with."""
        return self._maxsize

    def empty(self):
        """Return True when no items are stored, False otherwise."""
        return not self._queue

    def full(self):
        """Return True when the queue holds maxsize items.

        A queue created with maxsize <= 0 (the default is 0) is never full.
        """
        if self._maxsize <= 0:
            return False
        return self.qsize() >= self._maxsize

    async def put(self, item):
        """Add item to the queue.

        When the queue is full, wait for a slot to become free first.
        """
        while self.full():
            slot_waiter = self._loop.create_future()
            self._putters.append(slot_waiter)
            try:
                await slot_waiter
            except BaseException:
                # The future may still be pending; make sure it is done.
                slot_waiter.cancel()
                try:
                    # Drop our own entry from the list of waiting putters.
                    self._putters.remove(slot_waiter)
                except ValueError:
                    # An earlier get_nowait() call may already have
                    # removed it from self._putters.
                    pass
                if not (self.full() or slot_waiter.cancelled()):
                    # get_nowait() woke us up but we cannot use the slot;
                    # hand the wake-up to the next waiting putter.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    def put_nowait(self, item):
        """Add item to the queue without waiting.

        Raises QueueFull when no slot is free right now.
        """
        if self.full():
            raise QueueFull
        self._put(item)
        self._unfinished_tasks += 1
        self._finished.clear()
        self._wakeup_next(self._getters)

    async def get(self):
        """Remove an item from the queue and return it.

        When the queue is empty, wait for an item to arrive first.
        """
        while self.empty():
            item_waiter = self._loop.create_future()
            self._getters.append(item_waiter)
            try:
                await item_waiter
            except BaseException:
                # The future may still be pending; make sure it is done.
                item_waiter.cancel()
                try:
                    # Drop our own entry from the list of waiting getters.
                    self._getters.remove(item_waiter)
                except ValueError:
                    # An earlier put_nowait() call may already have
                    # removed it from self._getters.
                    pass
                if not (self.empty() or item_waiter.cancelled()):
                    # put_nowait() woke us up but we cannot take the item;
                    # hand the wake-up to the next waiting getter.
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

    def get_nowait(self):
        """Remove an item from the queue and return it without waiting.

        Raises QueueEmpty when nothing is stored right now.
        """
        if self.empty():
            raise QueueEmpty
        taken = self._get()
        self._wakeup_next(self._putters)
        return taken

    def task_done(self):
        """Record that one previously enqueued task has been processed.

        Intended for queue consumers: after each get() that fetched a task,
        a later task_done() call reports that work on it has finished.

        A join() that is currently waiting resumes once every item that was
        put() into the queue has been matched by a task_done() call.

        Raises ValueError when called more often than items were placed in
        the queue.
        """
        outstanding = self._unfinished_tasks
        if outstanding <= 0:
            raise ValueError('task_done() called too many times')
        outstanding -= 1
        self._unfinished_tasks = outstanding
        if outstanding == 0:
            self._finished.set()

    async def join(self):
        """Wait until every item in the queue has been fetched and processed.

        Adding an item raises the unfinished-task count; a consumer's
        task_done() call lowers it, signalling that the item was retrieved
        and fully handled.  join() returns once the count reaches zero.
        """
        if self._unfinished_tasks > 0:
            await self._finished.wait()
class PriorityQueue(Queue):
    """Queue variant that hands out entries in priority order, lowest first.

    Entries are usually tuples shaped like (priority number, data).
    """

    def _init(self, maxsize):
        # A plain list that heapq keeps arranged as a heap.
        self._queue = list()

    def _put(self, item, heappush=heapq.heappush):
        heap = self._queue
        heappush(heap, item)

    def _get(self, heappop=heapq.heappop):
        heap = self._queue
        return heappop(heap)
class LifoQueue(Queue):
    """Queue variant that returns the most recently added entry first."""

    def _init(self, maxsize):
        # A plain list used as a stack: items go on and come off the end.
        self._queue = list()

    def _put(self, item):
        stack = self._queue
        stack.append(item)

    def _get(self):
        stack = self._queue
        return stack.pop()
|