reduction.pyx 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500
  1. from libc.stdlib cimport (
  2. free,
  3. malloc,
  4. )
  5. import numpy as np
  6. cimport numpy as cnp
  7. from numpy cimport (
  8. int64_t,
  9. intp_t,
  10. ndarray,
  11. )
  12. cnp.import_array()
  13. from pandas._libs.util cimport (
  14. is_array,
  15. set_array_not_contiguous,
  16. )
  17. from pandas._libs.lib import is_scalar
# Cached object-dtype singleton; check_result_array compares against this to
# decide whether an ndarray result from the UDF is permissible.
cdef cnp.dtype _dtype_obj = np.dtype("object")
  19. cpdef check_result_array(object obj, object dtype):
  20. # Our operation is supposed to be an aggregation/reduction. If
  21. # it returns an ndarray, this likely means an invalid operation has
  22. # been passed. See test_apply_without_aggregation, test_agg_must_agg
  23. if is_array(obj):
  24. if dtype != _dtype_obj:
  25. # If it is object dtype, the function can be a reduction/aggregation
  26. # and still return an ndarray e.g. test_agg_over_numpy_arrays
  27. raise ValueError("Must produce aggregated value")
cdef class _BaseGrouper:
    """
    Shared machinery for SeriesGrouper / SeriesBinGrouper: validates the
    "dummy" Series used for in-place sliding, builds the cached Series/Index
    objects, and applies the user function ``self.f`` to each group.
    """

    cdef _check_dummy(self, object dummy):
        """
        Validate that ``dummy`` (an empty slice of the grouped Series) is
        dtype-compatible with ``self.arr``; return contiguous versions of
        its values and index values.

        Raises
        ------
        ValueError
            If neither the dummy's dtype nor its values' dtype matches
            ``self.arr``.
        """
        # both values and index must be an ndarray!

        values = dummy.values
        # GH 23683: datetimetz types are equivalent to datetime types here
        if (dummy.dtype != self.arr.dtype
                and values.dtype != self.arr.dtype):
            raise ValueError('Dummy array must be same dtype')
        # e.g. Categorical has no `flags` attribute, hence the is_array guard
        if is_array(values) and not values.flags.contiguous:
            values = values.copy()
        index = dummy.index.values
        if not index.flags.contiguous:
            index = index.copy()

        return values, index

    cdef _init_dummy_series_and_index(self, Slider islider, Slider vslider):
        """
        Create Series and Index objects that we will alter in-place while iterating.
        """
        # The slider buffers are reused for every group; only the window
        # they expose changes as the sliders move.
        cached_index = self.ityp(islider.buf, dtype=self.idtype)
        cached_series = self.typ(
            vslider.buf, dtype=vslider.buf.dtype, index=cached_index, name=self.name
        )
        return cached_index, cached_series

    cdef inline _update_cached_objs(self, object cached_series, object cached_index,
                                    Slider islider, Slider vslider):
        """
        Refresh the cached Series/Index after the sliders have moved to a
        new group's window, invalidating anything derived from the old one.
        """
        # See the comment in indexes/base.py about _index_data.
        # We need this for EA-backed indexes that have a reference
        # to a 1-d ndarray like datetime / timedelta / period.
        cached_index._engine.clear_mapping()
        cached_index._cache.clear()  # e.g. inferred_freq must go
        cached_series._mgr.set_values(vslider.buf)

    cdef inline object _apply_to_group(self,
                                       object cached_series, object cached_index,
                                       bint initialized):
        """
        Call self.f on our new group, then update to the next group.

        Returns
        -------
        tuple of (res, initialized) where ``res`` is the extracted result and
        ``initialized`` is True after the first group's output has been
        shape-checked.
        """
        cdef:
            object res

        # NB: we assume that _update_cached_objs has already cleared
        # the cache and engine mapping
        res = self.f(cached_series)
        res = extract_result(res)
        if not initialized:
            # On the first pass, we check the output shape to see
            # if this looks like a reduction.
            initialized = True
            check_result_array(res, cached_series.dtype)

        return res, initialized
cdef class SeriesBinGrouper(_BaseGrouper):
    """
    Performs grouping operation according to bin edges, rather than labels
    """
    cdef:
        Py_ssize_t nresults, ngroups

    cdef public:
        ndarray bins  # ndarray[int64_t]
        ndarray arr, index, dummy_arr, dummy_index
        object values, f, typ, ityp, name, idtype

    def __init__(self, object series, object f, ndarray[int64_t] bins):
        """
        Parameters
        ----------
        series : the Series whose values are grouped
        f : callable applied to each group
        bins : ndarray[int64_t]
            Right-open bin edges into ``series``; must be non-empty.
        """
        assert len(bins) > 0  # otherwise we get IndexError in get_result

        self.bins = bins
        self.f = f

        values = series.values
        # e.g. Categorical has no `flags` attribute, hence the is_array guard
        if is_array(values) and not values.flags.c_contiguous:
            values = values.copy('C')
        self.arr = values
        self.typ = series._constructor
        self.ityp = series.index._constructor
        self.idtype = series.index.dtype
        self.index = series.index.values
        self.name = series.name

        # empty slice used as the in-place-mutated template for each group
        dummy = series.iloc[:0]
        self.dummy_arr, self.dummy_index = self._check_dummy(dummy)

        # kludge for #1688
        if len(bins) > 0 and bins[-1] == len(series):
            self.ngroups = len(bins)
        else:
            # TODO: not reached except in test_series_bin_grouper directly
            #  constructing SeriesBinGrouper; can we rule this case out?
            self.ngroups = len(bins) + 1

    def get_result(self):
        """
        Apply ``self.f`` to each bin by sliding the dummy Series over the
        data in-place.

        Returns
        -------
        result : ndarray[object] of length ngroups
        counts : ndarray[int64] group sizes
        """
        cdef:
            ndarray arr, result
            ndarray[int64_t] counts
            Py_ssize_t i, n, group_size, start, end
            object res
            bint initialized = 0
            Slider vslider, islider
            object cached_series = None, cached_index = None

        counts = np.zeros(self.ngroups, dtype=np.int64)

        # group i spans [bins[i-1], bins[i]); the last group runs to the end
        if self.ngroups > 0:
            counts[0] = self.bins[0]
            for i in range(1, self.ngroups):
                if i == self.ngroups - 1:
                    counts[i] = len(self.arr) - self.bins[i - 1]
                else:
                    counts[i] = self.bins[i] - self.bins[i - 1]

        group_size = 0
        n = len(self.arr)

        vslider = Slider(self.arr, self.dummy_arr)
        islider = Slider(self.index, self.dummy_index)

        result = np.empty(self.ngroups, dtype='O')

        cached_index, cached_series = self._init_dummy_series_and_index(
            islider, vslider
        )

        start = 0
        try:
            for i in range(self.ngroups):
                group_size = counts[i]
                end = start + group_size

                islider.move(start, end)
                vslider.move(start, end)

                self._update_cached_objs(
                    cached_series, cached_index, islider, vslider)

                res, initialized = self._apply_to_group(cached_series, cached_index,
                                                        initialized)
                start += group_size

                result[i] = res
        finally:
            # so we don't free the wrong memory
            islider.reset()
            vslider.reset()

        return result, counts
cdef class SeriesGrouper(_BaseGrouper):
    """
    Performs generic grouping operation while avoiding ndarray construction
    overhead
    """
    cdef:
        Py_ssize_t nresults, ngroups

    cdef public:
        ndarray arr, index, dummy_arr, dummy_index
        object f, labels, values, typ, ityp, name, idtype

    def __init__(self, object series, object f, ndarray[intp_t] labels,
                 Py_ssize_t ngroups):
        """
        Parameters
        ----------
        series : non-empty Series whose values are grouped
        f : callable applied to each group
        labels : ndarray[intp]
            Group label per row; -1 marks rows excluded from any group.
            NOTE(review): get_result assumes equal labels are contiguous
            (it closes a group when the label changes) — presumably the
            caller sorts by label; confirm at call sites.
        ngroups : number of distinct groups
        """
        if len(series) == 0:
            # get_result would never assign `result`
            raise ValueError("SeriesGrouper requires non-empty `series`")

        self.labels = labels
        self.f = f

        values = series.values
        # e.g. Categorical has no `flags` attribute, hence the is_array guard
        if is_array(values) and not values.flags.c_contiguous:
            values = values.copy('C')
        self.arr = values
        self.typ = series._constructor
        self.ityp = series.index._constructor
        self.idtype = series.index.dtype
        self.index = series.index.values
        self.name = series.name

        # empty slice used as the in-place-mutated template for each group
        dummy = series.iloc[:0]
        self.dummy_arr, self.dummy_index = self._check_dummy(dummy)
        self.ngroups = ngroups

    def get_result(self):
        """
        Apply ``self.f`` to each run of equal labels by sliding the dummy
        Series over the data in-place.

        Returns
        -------
        result : ndarray[object] of length ngroups
        counts : ndarray[int64] group sizes (0 for groups never seen)
        """
        cdef:
            # Define result to avoid UnboundLocalError
            ndarray arr, result = None
            ndarray[intp_t] labels
            ndarray[int64_t] counts
            Py_ssize_t i, n, group_size, lab, start, end
            object res
            bint initialized = 0
            Slider vslider, islider
            object cached_series = None, cached_index = None

        labels = self.labels
        counts = np.zeros(self.ngroups, dtype=np.int64)
        group_size = 0
        n = len(self.arr)

        vslider = Slider(self.arr, self.dummy_arr)
        islider = Slider(self.index, self.dummy_index)

        result = np.empty(self.ngroups, dtype='O')

        cached_index, cached_series = self._init_dummy_series_and_index(
            islider, vslider
        )

        start = 0
        try:
            for i in range(n):
                group_size += 1

                lab = labels[i]

                # close out the current group when the label run ends
                if i == n - 1 or lab != labels[i + 1]:
                    if lab == -1:
                        # label -1: rows belong to no group; skip them
                        start += group_size
                        group_size = 0
                        continue

                    end = start + group_size
                    islider.move(start, end)
                    vslider.move(start, end)

                    self._update_cached_objs(
                        cached_series, cached_index, islider, vslider)

                    res, initialized = self._apply_to_group(cached_series, cached_index,
                                                            initialized)

                    start += group_size

                    result[lab] = res
                    counts[lab] = group_size
                    group_size = 0

        finally:
            # so we don't free the wrong memory
            islider.reset()
            vslider.reset()

        # We check for empty series in the constructor, so should always
        # have result initialized by this point.
        assert initialized, "`result` has not been initialized."

        return result, counts
  234. cpdef inline extract_result(object res):
  235. """ extract the result object, it might be a 0-dim ndarray
  236. or a len-1 0-dim, or a scalar """
  237. if hasattr(res, "_values"):
  238. # Preserve EA
  239. res = res._values
  240. if res.ndim == 1 and len(res) == 1:
  241. # see test_agg_lambda_with_timezone, test_resampler_grouper.py::test_apply
  242. res = res[0]
  243. if is_array(res):
  244. if res.ndim == 1 and len(res) == 1:
  245. # see test_resampler_grouper.py::test_apply
  246. res = res[0]
  247. return res
cdef class Slider:
    """
    Only handles contiguous data for now

    Repoints ``buf``'s data pointer into ``values`` so that moving the
    window requires no allocation; ``reset`` must be called before ``buf``
    is garbage-collected so the original pointer is restored.
    """
    cdef:
        ndarray values, buf      # source array and the reused window view
        Py_ssize_t stride        # byte stride of values along axis 0
        char *orig_data          # buf's original data pointer, for reset()

    def __init__(self, ndarray values, ndarray buf):
        assert values.ndim == 1
        assert values.dtype == buf.dtype

        if not values.flags.contiguous:
            values = values.copy()

        self.values = values
        self.buf = buf

        self.stride = values.strides[0]
        # save buf's own pointer so reset() can restore it before dealloc
        self.orig_data = self.buf.data

        # repoint buf into values; from here on buf is a window over values
        self.buf.data = self.values.data
        self.buf.strides[0] = self.stride

    cdef move(self, int start, int end):
        """
        For slicing
        """
        # shift the window to values[start:end] by pointer arithmetic
        self.buf.data = self.values.data + self.stride * start
        self.buf.shape[0] = end - start

    cdef reset(self):
        # restore buf's original pointer and collapse the window to empty
        self.buf.data = self.orig_data
        self.buf.shape[0] = 0
def apply_frame_axis0(object frame, object f, object names,
                      const int64_t[:] starts, const int64_t[:] ends):
    """
    Apply ``f`` to each row-slice ``frame[starts[i]:ends[i]]`` by sliding a
    single dummy DataFrame over the underlying blocks in-place.

    Parameters
    ----------
    frame : DataFrame with a single-level index
    f : callable applied to each chunk
    names : per-chunk values assigned to the chunk's ``name`` attribute
    starts, ends : int64 arrays of slice boundaries, equal length

    Returns
    -------
    results : list of pieces returned by ``f`` (possibly truncated, see below)
    mutated : bool
        True if a piece's index differs from its chunk's index, signalling
        the caller must take a slow path.
    """
    cdef:
        BlockSlider slider
        Py_ssize_t i, n = len(starts)
        list results
        object piece
        dict item_cache

    # We have already checked that we don't have a MultiIndex before calling
    assert frame.index.nlevels == 1

    results = []

    slider = BlockSlider(frame)

    mutated = False
    item_cache = slider.dummy._item_cache
    try:
        for i in range(n):
            slider.move(starts[i], ends[i])

            # stale column cache would expose the previous window's data
            item_cache.clear()  # ugh
            chunk = slider.dummy
            object.__setattr__(chunk, 'name', names[i])

            piece = f(chunk)

            # Need to infer if low level index slider will cause segfaults
            require_slow_apply = i == 0 and piece is chunk
            try:
                if piece.index is not chunk.index:
                    mutated = True
            except AttributeError:
                # `piece` might not have an index, could be e.g. an int
                pass

            if not is_scalar(piece):
                # Need to copy data to avoid appending references
                try:
                    piece = piece.copy(deep="all")
                except (TypeError, AttributeError):
                    pass

            results.append(piece)

            # If the data was modified inplace we need to
            # take the slow path to not risk segfaults
            # we have already computed the first piece
            if require_slow_apply:
                break
    finally:
        # restore the block pointers before the slider goes away
        slider.reset()

    return results, mutated
cdef class BlockSlider:
    """
    Only capable of sliding on axis=0

    Holds a dummy (empty) copy of ``frame`` and repoints each block's data
    pointer to expose a row-window of the original frame without copying.
    """
    cdef:
        object frame, dummy, index, block
        list blocks, blk_values      # original blocks and their value arrays
        ndarray orig_blklocs, orig_blknos  # manager state restored each move
        ndarray values
        Slider idx_slider            # slides the index's backing ndarray
        char **base_ptrs             # malloc'd: each block's original data ptr
        int nblocks
        Py_ssize_t i

    def __init__(self, object frame):
        self.frame = frame
        self.dummy = frame[:0]
        self.index = self.dummy.index

        # GH#35417 attributes we need to restore at each step in case
        #  the function modified them.
        mgr = self.dummy._mgr
        self.orig_blklocs = mgr.blklocs
        self.orig_blknos = mgr.blknos
        self.blocks = [x for x in self.dummy._mgr.blocks]
        self.blk_values = [block.values for block in self.dummy._mgr.blocks]

        # mark as non-contiguous so downstream code doesn't assume a layout
        # that pointer-sliding will invalidate
        for values in self.blk_values:
            set_array_not_contiguous(values)

        self.nblocks = len(self.blk_values)
        # See the comment in indexes/base.py about _index_data.
        # We need this for EA-backed indexes that have a reference to a 1-d
        # ndarray like datetime / timedelta / period.
        self.idx_slider = Slider(
            self.frame.index._index_data, self.dummy.index._index_data)

        # remember every block's base pointer so move()/reset() can offset
        # from the original location each time
        self.base_ptrs = <char**>malloc(sizeof(char*) * self.nblocks)
        for i, block in enumerate(self.blk_values):
            self.base_ptrs[i] = (<ndarray>block).data

    def __dealloc__(self):
        free(self.base_ptrs)

    cdef move(self, int start, int end):
        """
        Repoint every block and the index at rows [start, end) of the
        original frame.
        """
        cdef:
            ndarray arr
            Py_ssize_t i

        # undo any block/manager mutation the applied function performed
        self._restore_blocks()

        # move blocks
        for i in range(self.nblocks):
            arr = self.blk_values[i]

            # axis=1 is the frame's axis=0
            arr.data = self.base_ptrs[i] + arr.strides[1] * start
            arr.shape[1] = end - start

        # move and set the index
        self.idx_slider.move(start, end)

        object.__setattr__(self.index, '_index_data', self.idx_slider.buf)
        self.index._engine.clear_mapping()
        self.index._cache.clear()  # e.g. inferred_freq must go

    cdef reset(self):
        """
        Restore every block's original base pointer and collapse the windows
        to zero rows.
        """
        cdef:
            ndarray arr
            Py_ssize_t i

        self._restore_blocks()

        for i in range(self.nblocks):
            arr = self.blk_values[i]

            # axis=1 is the frame's axis=0
            arr.data = self.base_ptrs[i]
            arr.shape[1] = 0

    cdef _restore_blocks(self):
        """
        Ensure that we have the original blocks, blknos, and blklocs.
        """
        mgr = self.dummy._mgr
        mgr.blocks = tuple(self.blocks)
        mgr._blklocs = self.orig_blklocs
        mgr._blknos = self.orig_blknos