#
# Module which supports allocation of memory from an mmap
#
# multiprocessing/heap.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import bisect
from collections import defaultdict
import mmap
import os
import sys
import tempfile
import threading

from .context import reduction, assert_spawning
from . import util

__all__ = ['BufferWrapper']
#
# Inheritable class which wraps an mmap, and from which blocks can be allocated
#

if sys.platform == 'win32':

    import _winapi

    class Arena(object):
        """
        A shared memory area backed by anonymous memory (Windows).
        """

        _rand = tempfile._RandomNameSequence()

        def __init__(self, size):
            self.size = size
            for i in range(100):
                name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
                buf = mmap.mmap(-1, size, tagname=name)
                if _winapi.GetLastError() == 0:
                    break
                # We have reopened a preexisting mmap.
                buf.close()
            else:
                raise FileExistsError('Cannot find name for new mmap')
            self.name = name
            self.buffer = buf
            self._state = (self.size, self.name)

        def __getstate__(self):
            assert_spawning(self)
            return self._state

        def __setstate__(self, state):
            self.size, self.name = self._state = state
            # Reopen existing mmap
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            # XXX Temporarily preventing buildbot failures while determining
            # XXX the correct long-term fix. See issue 23060
            #assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS
else:

    class Arena(object):
        """
        A shared memory area backed by a temporary file (POSIX).
        """

        if sys.platform == 'linux':
            _dir_candidates = ['/dev/shm']
        else:
            _dir_candidates = []

        def __init__(self, size, fd=-1):
            self.size = size
            self.fd = fd
            if fd == -1:
                # Arena is created anew (if fd != -1, it means we're coming
                # from rebuild_arena() below)
                self.fd, name = tempfile.mkstemp(
                    prefix='pym-%d-' % os.getpid(),
                    dir=self._choose_dir(size))
                os.unlink(name)
                util.Finalize(self, os.close, (self.fd,))
                os.ftruncate(self.fd, size)
            self.buffer = mmap.mmap(self.fd, self.size)

        def _choose_dir(self, size):
            # Choose a non-storage backed directory if possible,
            # to improve performance
            for d in self._dir_candidates:
                st = os.statvfs(d)
                if st.f_bavail * st.f_frsize >= size:  # enough free space?
                    return d
            return util.get_temp_dir()


def reduce_arena(a):
    if a.fd == -1:
        raise ValueError('Arena is unpicklable because '
                         'forking was enabled when it was created')
    return rebuild_arena, (a.size, reduction.DupFd(a.fd))


def rebuild_arena(size, dupfd):
    return Arena(size, dupfd.detach())


reduction.register(Arena, reduce_arena)
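
# Editor's note (illustrative, not in the original file): the registration
# above makes Arena picklable under the spawn/forkserver start methods.
# Pickling sends only (size, DupFd(fd)); the child process then re-creates
# the same file-backed mapping from the duplicated descriptor via
# rebuild_arena().  The fd == -1 guard rejects arenas that have no backing
# descriptor and therefore cannot be shared this way.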
#
# Class allowing allocation of chunks of memory from arenas
#

class Heap(object):

    # Minimum malloc() alignment
    _alignment = 8

    _DISCARD_FREE_SPACE_LARGER_THAN = 4 * 1024 ** 2   # 4 MB
    _DOUBLE_ARENA_SIZE_UNTIL = 4 * 1024 ** 2

    def __init__(self, size=mmap.PAGESIZE):
        self._lastpid = os.getpid()
        self._lock = threading.Lock()
        # Current arena allocation size
        self._size = size
        # A sorted list of available block sizes in arenas
        self._lengths = []

        # Free block management:
        # - map each block size to a list of `(Arena, start, stop)` blocks
        self._len_to_seq = {}
        # - map `(Arena, start)` tuple to the `(Arena, start, stop)` block
        #   starting at that offset
        self._start_to_block = {}
        # - map `(Arena, stop)` tuple to the `(Arena, start, stop)` block
        #   ending at that offset
        self._stop_to_block = {}

        # Map arenas to their `(Arena, start, stop)` blocks in use
        self._allocated_blocks = defaultdict(set)
        self._arenas = []

        # List of pending blocks to free - see comment in free() below
        self._pending_free_blocks = []

        # Statistics
        self._n_mallocs = 0
        self._n_frees = 0
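
    # Editor's note (illustrative, not in the original file): a tiny worked
    # example of the bookkeeping above.  After freeing a 64-byte block at
    # offset 128 of arena A, the heap records
    #   _lengths                   == [64]
    #   _len_to_seq[64]            == [(A, 128, 192)]
    #   _start_to_block[(A, 128)]  == (A, 128, 192)
    #   _stop_to_block[(A, 192)]   == (A, 128, 192)
    # so a later free of (A, 192, 256) can find and merge its left
    # neighbour in O(1) via _stop_to_block.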
    @staticmethod
    def _roundup(n, alignment):
        # alignment must be a power of 2
        mask = alignment - 1
        return (n + mask) & ~mask
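
    # Editor's note (illustrative): _roundup masks off the low bits, e.g.
    #   _roundup(1, 8)  -> 8    (1 + 7 == 0b1000, & ~0b111 -> 8)
    #   _roundup(13, 8) -> 16
    #   _roundup(16, 8) -> 16   (already-aligned values are unchanged)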
    def _new_arena(self, size):
        # Create a new arena with at least the given *size*
        length = self._roundup(max(self._size, size), mmap.PAGESIZE)
        # We carve larger and larger arenas, for efficiency, until we
        # reach a large-ish size (roughly L3 cache-sized)
        if self._size < self._DOUBLE_ARENA_SIZE_UNTIL:
            self._size *= 2
        util.info('allocating a new mmap of length %d', length)
        arena = Arena(length)
        self._arenas.append(arena)
        return (arena, 0, length)
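
    # Editor's note (illustrative; assumes mmap.PAGESIZE == 4096): a heap
    # that keeps growing allocates arenas of 4 KiB, 8 KiB, 16 KiB, ...
    # until _size reaches _DOUBLE_ARENA_SIZE_UNTIL (4 MiB); after that,
    # every new arena is 4 MiB, or larger if a single request exceeds it.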
    def _discard_arena(self, arena):
        # Possibly delete the given (unused) arena
        length = arena.size
        # Reusing an existing arena is faster than creating a new one, so
        # we only reclaim space if it's large enough.
        if length < self._DISCARD_FREE_SPACE_LARGER_THAN:
            return
        blocks = self._allocated_blocks.pop(arena)
        assert not blocks
        del self._start_to_block[(arena, 0)]
        del self._stop_to_block[(arena, length)]
        self._arenas.remove(arena)
        seq = self._len_to_seq[length]
        seq.remove((arena, 0, length))
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)
    def _malloc(self, size):
        # returns a large enough block -- it might be much larger
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            return self._new_arena(size)
        else:
            length = self._lengths[i]
            seq = self._len_to_seq[length]
            block = seq.pop()
            if not seq:
                del self._len_to_seq[length], self._lengths[i]

        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]
        return block
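
    # Editor's note (illustrative): bisect_left on the sorted _lengths
    # gives a best-fit lookup in O(log n).  E.g. with
    # _lengths == [64, 256, 4096], a request for 100 bytes selects the
    # 256-byte free list; a request for 8192 finds no fit
    # (i == len(self._lengths)) and falls through to _new_arena().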
    def _add_free_block(self, block):
        # make block available and try to merge with its neighbours in the arena
        (arena, start, stop) = block

        try:
            prev_block = self._stop_to_block[(arena, start)]
        except KeyError:
            pass
        else:
            start, _ = self._absorb(prev_block)

        try:
            next_block = self._start_to_block[(arena, stop)]
        except KeyError:
            pass
        else:
            _, stop = self._absorb(next_block)

        block = (arena, start, stop)
        length = stop - start

        try:
            self._len_to_seq[length].append(block)
        except KeyError:
            self._len_to_seq[length] = [block]
            bisect.insort(self._lengths, length)

        self._start_to_block[(arena, start)] = block
        self._stop_to_block[(arena, stop)] = block
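
    # Editor's note (illustrative): coalescing example -- if (A, 0, 64)
    # and (A, 128, 192) are free and (A, 64, 128) is returned, the left
    # neighbour is found via _stop_to_block[(A, 64)] and the right one via
    # _start_to_block[(A, 128)]; all three are absorbed into the single
    # free block (A, 0, 192).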
    def _absorb(self, block):
        # deregister this block so it can be merged with a neighbour
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(block)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop
    def _remove_allocated_block(self, block):
        arena, start, stop = block
        blocks = self._allocated_blocks[arena]
        blocks.remove((start, stop))
        if not blocks:
            # Arena is entirely free, discard it from this process
            self._discard_arena(arena)
    def _free_pending_blocks(self):
        # Free all the blocks in the pending list - called with the lock held.
        while True:
            try:
                block = self._pending_free_blocks.pop()
            except IndexError:
                break
            self._add_free_block(block)
            self._remove_allocated_block(block)
    def free(self, block):
        # free a block returned by malloc()
        # Since free() can be called asynchronously by the GC, it could happen
        # that it's called while self._lock is held: in that case,
        # self._lock.acquire() would deadlock (issue #12352). To avoid that, a
        # trylock is used instead, and if the lock can't be acquired
        # immediately, the block is added to a list of blocks to be freed
        # synchronously some time later from malloc() or free(), by calling
        # _free_pending_blocks() (appending to and popping from a list is not
        # strictly thread-safe but under CPython it's atomic thanks to the GIL).
        if os.getpid() != self._lastpid:
            raise ValueError(
                "My pid ({0:n}) is not last pid {1:n}".format(
                    os.getpid(), self._lastpid))
        if not self._lock.acquire(False):
            # can't acquire the lock right now, add the block to the list of
            # pending blocks to free
            self._pending_free_blocks.append(block)
        else:
            # we hold the lock
            try:
                self._n_frees += 1
                self._free_pending_blocks()
                self._add_free_block(block)
                self._remove_allocated_block(block)
            finally:
                self._lock.release()
    def malloc(self, size):
        # return a block of the right size (possibly rounded up)
        if size < 0:
            raise ValueError("Size {0:n} out of range".format(size))
        if sys.maxsize <= size:
            raise OverflowError("Size {0:n} too large".format(size))
        if os.getpid() != self._lastpid:
            self.__init__()                     # reinitialize after fork
        with self._lock:
            self._n_mallocs += 1
            # allow pending blocks to be marked available
            self._free_pending_blocks()
            size = self._roundup(max(size, 1), self._alignment)
            (arena, start, stop) = self._malloc(size)
            real_stop = start + size
            if real_stop < stop:
                # if the returned block is larger than necessary, mark
                # the remainder available
                self._add_free_block((arena, real_stop, stop))
            self._allocated_blocks[arena].add((start, real_stop))
            return (arena, start, real_stop)
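
    # Editor's note (illustrative): e.g. malloc(1) rounds the request up
    # to _alignment (8 bytes); if _malloc() hands back a fresh 4096-byte
    # arena block, the caller receives (arena, 0, 8) and the remainder
    # (arena, 8, 4096) is immediately re-registered as free.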
#
# Class wrapping a block allocated out of a Heap -- can be inherited by child process
#

class BufferWrapper(object):

    _heap = Heap()

    def __init__(self, size):
        if size < 0:
            raise ValueError("Size {0:n} out of range".format(size))
        if sys.maxsize <= size:
            raise OverflowError("Size {0:n} too large".format(size))
        block = BufferWrapper._heap.malloc(size)
        self._state = (block, size)
        util.Finalize(self, BufferWrapper._heap.free, args=(block,))

    def create_memoryview(self):
        (arena, start, stop), size = self._state
        return memoryview(arena.buffer)[start:start+size]
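

#
# Editor's sketch (not part of the original module): a minimal, hedged
# usage example.  BufferWrapper is normally used indirectly, e.g. by
# multiprocessing.sharedctypes to back Value/Array objects.  Because this
# module uses relative imports, run it as `python -m multiprocessing.heap`
# if you want to try the demo.
#

if __name__ == '__main__':
    bw = BufferWrapper(1024)        # carve 1 KiB out of the shared heap
    mv = bw.create_memoryview()     # writable view over the arena block
    mv[:5] = b'hello'               # write through the memoryview
    assert bytes(mv[:5]) == b'hello'
    print('allocated', len(mv), 'bytes')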