from libc.stdlib cimport (
    free,
    malloc,
)

import numpy as np

cimport numpy as cnp
from numpy cimport (
    int64_t,
    intp_t,
    ndarray,
)

cnp.import_array()

from pandas._libs.util cimport (
    is_array,
    set_array_not_contiguous,
)

from pandas._libs.lib import is_scalar

cdef cnp.dtype _dtype_obj = np.dtype("object")


cpdef check_result_array(object obj, object dtype):
    # Our operation is supposed to be an aggregation/reduction. If
    #  it returns an ndarray, this likely means an invalid operation has
    #  been passed. See test_apply_without_aggregation, test_agg_must_agg
    if is_array(obj):
        if dtype != _dtype_obj:
            # If it is object dtype, the function can be a reduction/aggregation
            #  and still return an ndarray e.g. test_agg_over_numpy_arrays
            raise ValueError("Must produce aggregated value")
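
# Illustrative behavior (a sketch; the Series is hypothetical): for a float64
# Series, an "aggregation" like ``lambda x: x.values`` returns an ndarray, so
# check_result_array raises ValueError("Must produce aggregated value"); for an
# object-dtype Series an ndarray result is allowed, matching
# test_agg_over_numpy_arrays.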

cdef class _BaseGrouper:
    cdef _check_dummy(self, object dummy):
        # both values and index must be an ndarray!

        values = dummy.values
        # GH 23683: datetimetz types are equivalent to datetime types here
        if (dummy.dtype != self.arr.dtype
                and values.dtype != self.arr.dtype):
            raise ValueError('Dummy array must be same dtype')
        if is_array(values) and not values.flags.contiguous:
            # e.g. Categorical has no `flags` attribute
            values = values.copy()
        index = dummy.index.values
        if not index.flags.contiguous:
            index = index.copy()

        return values, index

    cdef _init_dummy_series_and_index(self, Slider islider, Slider vslider):
        """
        Create Series and Index objects that we will alter in-place while iterating.
        """
        cached_index = self.ityp(islider.buf, dtype=self.idtype)
        cached_series = self.typ(
            vslider.buf, dtype=vslider.buf.dtype, index=cached_index, name=self.name
        )
        return cached_index, cached_series

    cdef inline _update_cached_objs(self, object cached_series, object cached_index,
                                    Slider islider, Slider vslider):
        # See the comment in indexes/base.py about _index_data.
        #  We need this for EA-backed indexes that have a reference
        #  to a 1-d ndarray like datetime / timedelta / period.
        cached_index._engine.clear_mapping()
        cached_index._cache.clear()  # e.g. inferred_freq must go
        cached_series._mgr.set_values(vslider.buf)

    cdef inline object _apply_to_group(self,
                                       object cached_series, object cached_index,
                                       bint initialized):
        """
        Call self.f on our new group, then update to the next group.
        """
        cdef:
            object res

        # NB: we assume that _update_cached_objs has already cleared
        #  the cache and engine mapping
        res = self.f(cached_series)
        res = extract_result(res)
        if not initialized:
            # On the first pass, we check the output shape to see
            #  if this looks like a reduction.
            initialized = True
            check_result_array(res, cached_series.dtype)

        return res, initialized
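
# Note: _BaseGrouper subclasses avoid per-group Series/Index construction by
# mutating one cached Series/Index pair in place via the Sliders; as a result,
# _update_cached_objs must invalidate the index engine and cache before every
# call to self.f, as done above.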

cdef class SeriesBinGrouper(_BaseGrouper):
    """
    Performs grouping operation according to bin edges, rather than labels
    """
    cdef:
        Py_ssize_t nresults, ngroups

    cdef public:
        ndarray bins  # ndarray[int64_t]
        ndarray arr, index, dummy_arr, dummy_index
        object values, f, typ, ityp, name, idtype

    def __init__(self, object series, object f, ndarray[int64_t] bins):
        assert len(bins) > 0  # otherwise we get IndexError in get_result

        self.bins = bins
        self.f = f

        values = series.values
        if is_array(values) and not values.flags.c_contiguous:
            # e.g. Categorical has no `flags` attribute
            values = values.copy('C')
        self.arr = values
        self.typ = series._constructor
        self.ityp = series.index._constructor
        self.idtype = series.index.dtype
        self.index = series.index.values
        self.name = series.name

        dummy = series.iloc[:0]
        self.dummy_arr, self.dummy_index = self._check_dummy(dummy)

        # kludge for #1688
        if len(bins) > 0 and bins[-1] == len(series):
            self.ngroups = len(bins)
        else:
            # TODO: not reached except in test_series_bin_grouper directly
            #  constructing SeriesBinGrouper; can we rule this case out?
            self.ngroups = len(bins) + 1

    def get_result(self):
        cdef:
            ndarray arr, result
            ndarray[int64_t] counts
            Py_ssize_t i, n, group_size, start, end
            object res
            bint initialized = 0
            Slider vslider, islider
            object cached_series = None, cached_index = None

        counts = np.zeros(self.ngroups, dtype=np.int64)

        if self.ngroups > 0:
            counts[0] = self.bins[0]
            for i in range(1, self.ngroups):
                if i == self.ngroups - 1:
                    counts[i] = len(self.arr) - self.bins[i - 1]
                else:
                    counts[i] = self.bins[i] - self.bins[i - 1]

        group_size = 0
        n = len(self.arr)

        vslider = Slider(self.arr, self.dummy_arr)
        islider = Slider(self.index, self.dummy_index)

        result = np.empty(self.ngroups, dtype='O')

        cached_index, cached_series = self._init_dummy_series_and_index(
            islider, vslider
        )

        start = 0
        try:
            for i in range(self.ngroups):
                group_size = counts[i]
                end = start + group_size

                islider.move(start, end)
                vslider.move(start, end)

                self._update_cached_objs(
                    cached_series, cached_index, islider, vslider)

                res, initialized = self._apply_to_group(cached_series, cached_index,
                                                        initialized)
                start += group_size

                result[i] = res

        finally:
            # so we don't free the wrong memory
            islider.reset()
            vslider.reset()

        return result, counts
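
# Illustrative usage (a sketch, assuming this module is built as
# pandas._libs.reduction and ``f`` reduces a Series to a scalar):
#
#   import numpy as np
#   import pandas as pd
#   from pandas._libs.reduction import SeriesBinGrouper
#
#   ser = pd.Series(np.arange(6, dtype=np.float64))
#   bins = np.array([2, 4, 6], dtype=np.int64)  # right edges; bins[-1] == len(ser)
#   result, counts = SeriesBinGrouper(ser, np.sum, bins).get_result()
#   # result: object ndarray [1.0, 5.0, 9.0]; counts: [2, 2, 2]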

cdef class SeriesGrouper(_BaseGrouper):
    """
    Performs generic grouping operation while avoiding ndarray construction
    overhead
    """
    cdef:
        Py_ssize_t nresults, ngroups

    cdef public:
        ndarray arr, index, dummy_arr, dummy_index
        object f, labels, values, typ, ityp, name, idtype

    def __init__(self, object series, object f, ndarray[intp_t] labels,
                 Py_ssize_t ngroups):

        if len(series) == 0:
            # get_result would never assign `result`
            raise ValueError("SeriesGrouper requires non-empty `series`")

        self.labels = labels
        self.f = f

        values = series.values
        if is_array(values) and not values.flags.c_contiguous:
            # e.g. Categorical has no `flags` attribute
            values = values.copy('C')
        self.arr = values
        self.typ = series._constructor
        self.ityp = series.index._constructor
        self.idtype = series.index.dtype
        self.index = series.index.values
        self.name = series.name

        dummy = series.iloc[:0]
        self.dummy_arr, self.dummy_index = self._check_dummy(dummy)
        self.ngroups = ngroups

    def get_result(self):
        cdef:
            # Define result to avoid UnboundLocalError
            ndarray arr, result = None
            ndarray[intp_t] labels
            ndarray[int64_t] counts
            Py_ssize_t i, n, group_size, lab, start, end
            object res
            bint initialized = 0
            Slider vslider, islider
            object cached_series = None, cached_index = None

        labels = self.labels
        counts = np.zeros(self.ngroups, dtype=np.int64)
        group_size = 0
        n = len(self.arr)

        vslider = Slider(self.arr, self.dummy_arr)
        islider = Slider(self.index, self.dummy_index)

        result = np.empty(self.ngroups, dtype='O')

        cached_index, cached_series = self._init_dummy_series_and_index(
            islider, vslider
        )

        start = 0
        try:
            for i in range(n):
                group_size += 1

                lab = labels[i]

                if i == n - 1 or lab != labels[i + 1]:
                    if lab == -1:
                        start += group_size
                        group_size = 0
                        continue

                    end = start + group_size
                    islider.move(start, end)
                    vslider.move(start, end)

                    self._update_cached_objs(
                        cached_series, cached_index, islider, vslider)

                    res, initialized = self._apply_to_group(cached_series, cached_index,
                                                            initialized)
                    start += group_size

                    result[lab] = res
                    counts[lab] = group_size
                    group_size = 0

        finally:
            # so we don't free the wrong memory
            islider.reset()
            vslider.reset()

        # We check for empty series in the constructor, so should always
        #  have result initialized by this point.
        assert initialized, "`result` has not been initialized."

        return result, counts
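
# Illustrative usage (a sketch; ``labels`` are group codes sorted so each group
# is a contiguous run, with -1 marking rows to skip):
#
#   ser = pd.Series([1.0, 2.0, 3.0, 4.0])
#   labels = np.array([0, 0, 1, 1], dtype=np.intp)
#   result, counts = SeriesGrouper(ser, np.sum, labels, 2).get_result()
#   # result: object ndarray [3.0, 7.0]; counts: [2, 2]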

cpdef inline extract_result(object res):
    """
    Extract the result object; it might be a 0-dim ndarray,
    a len-1 1-dim ndarray, or a scalar.
    """
    if hasattr(res, "_values"):
        # Preserve EA
        res = res._values
        if res.ndim == 1 and len(res) == 1:
            # see test_agg_lambda_with_timezone, test_resampler_grouper.py::test_apply
            res = res[0]
    if is_array(res):
        if res.ndim == 1 and len(res) == 1:
            # see test_resampler_grouper.py::test_apply
            res = res[0]
    return res
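
# e.g. extract_result(np.array([5])) and extract_result(pd.Series([5])) both
# return the scalar 5, while a 2-d ndarray or a len > 1 ndarray passes through
# unchanged.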

cdef class Slider:
    """
    Only handles contiguous data for now
    """
    cdef:
        ndarray values, buf
        Py_ssize_t stride
        char *orig_data

    def __init__(self, ndarray values, ndarray buf):
        assert values.ndim == 1
        assert values.dtype == buf.dtype

        if not values.flags.contiguous:
            values = values.copy()

        self.values = values
        self.buf = buf

        self.stride = values.strides[0]
        self.orig_data = self.buf.data
        self.buf.data = self.values.data
        self.buf.strides[0] = self.stride

    cdef move(self, int start, int end):
        """
        For slicing
        """
        self.buf.data = self.values.data + self.stride * start
        self.buf.shape[0] = end - start

    cdef reset(self):
        self.buf.data = self.orig_data
        self.buf.shape[0] = 0
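
# Sliders are consumed from Cython only (``move`` and ``reset`` are cdef
# methods). Conceptually, ``move(start, end)`` makes ``buf`` behave like
# ``values[start:end]`` by repointing buf's data pointer, so each group view
# costs pointer arithmetic rather than a new ndarray allocation. ``reset``
# restores the original data pointer so that, when ``buf`` is deallocated,
# we don't free the wrong memory.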

def apply_frame_axis0(object frame, object f, object names,
                      const int64_t[:] starts, const int64_t[:] ends):
    cdef:
        BlockSlider slider
        Py_ssize_t i, n = len(starts)
        list results
        object piece
        dict item_cache

    # We have already checked that we don't have a MultiIndex before calling
    assert frame.index.nlevels == 1

    results = []

    slider = BlockSlider(frame)

    mutated = False
    item_cache = slider.dummy._item_cache
    try:
        for i in range(n):
            slider.move(starts[i], ends[i])

            item_cache.clear()  # ugh
            chunk = slider.dummy
            object.__setattr__(chunk, 'name', names[i])

            piece = f(chunk)

            # Need to infer if low level index slider will cause segfaults
            require_slow_apply = i == 0 and piece is chunk
            try:
                if piece.index is not chunk.index:
                    mutated = True
            except AttributeError:
                # `piece` might not have an index, could be e.g. an int
                pass

            if not is_scalar(piece):
                # Need to copy data to avoid appending references
                try:
                    piece = piece.copy(deep="all")
                except (TypeError, AttributeError):
                    pass

            results.append(piece)

            # If the data was modified inplace we need to
            #  take the slow path to not risk segfaults
            #  we have already computed the first piece
            if require_slow_apply:
                break
    finally:
        slider.reset()

    return results, mutated
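
# Illustrative usage (a sketch; ``starts``/``ends`` are positional boundaries
# for pre-sorted groups):
#
#   df = pd.DataFrame({"a": [1, 2, 3, 4]})
#   starts = np.array([0, 2], dtype=np.int64)
#   ends = np.array([2, 4], dtype=np.int64)
#   pieces, mutated = apply_frame_axis0(
#       df, lambda chunk: chunk["a"].sum(), ["g1", "g2"], starts, ends
#   )
#   # pieces: [3, 7]; mutated: False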

cdef class BlockSlider:
    """
    Only capable of sliding on axis=0
    """
    cdef:
        object frame, dummy, index, block
        list blocks, blk_values
        ndarray orig_blklocs, orig_blknos
        ndarray values
        Slider idx_slider
        char **base_ptrs
        int nblocks
        Py_ssize_t i

    def __init__(self, object frame):
        self.frame = frame
        self.dummy = frame[:0]
        self.index = self.dummy.index

        # GH#35417 attributes we need to restore at each step in case
        #  the function modified them.
        mgr = self.dummy._mgr
        self.orig_blklocs = mgr.blklocs
        self.orig_blknos = mgr.blknos
        self.blocks = [x for x in self.dummy._mgr.blocks]

        self.blk_values = [block.values for block in self.dummy._mgr.blocks]

        for values in self.blk_values:
            set_array_not_contiguous(values)

        self.nblocks = len(self.blk_values)
        # See the comment in indexes/base.py about _index_data.
        #  We need this for EA-backed indexes that have a reference to a 1-d
        #  ndarray like datetime / timedelta / period.
        self.idx_slider = Slider(
            self.frame.index._index_data, self.dummy.index._index_data)

        self.base_ptrs = <char**>malloc(sizeof(char*) * self.nblocks)
        for i, block in enumerate(self.blk_values):
            self.base_ptrs[i] = (<ndarray>block).data

    def __dealloc__(self):
        free(self.base_ptrs)

    cdef move(self, int start, int end):
        cdef:
            ndarray arr
            Py_ssize_t i

        self._restore_blocks()

        # move blocks
        for i in range(self.nblocks):
            arr = self.blk_values[i]

            # axis=1 is the frame's axis=0
            arr.data = self.base_ptrs[i] + arr.strides[1] * start
            arr.shape[1] = end - start

        # move and set the index
        self.idx_slider.move(start, end)

        object.__setattr__(self.index, '_index_data', self.idx_slider.buf)
        self.index._engine.clear_mapping()
        self.index._cache.clear()  # e.g. inferred_freq must go

    cdef reset(self):
        cdef:
            ndarray arr
            Py_ssize_t i

        self._restore_blocks()
        for i in range(self.nblocks):
            arr = self.blk_values[i]

            # axis=1 is the frame's axis=0
            arr.data = self.base_ptrs[i]
            arr.shape[1] = 0

    cdef _restore_blocks(self):
        """
        Ensure that we have the original blocks, blknos, and blklocs.
        """
        mgr = self.dummy._mgr
        mgr.blocks = tuple(self.blocks)
        mgr._blklocs = self.orig_blklocs
        mgr._blknos = self.orig_blknos
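
# Note: _restore_blocks runs at the top of both ``move`` and ``reset`` because
# the user function may have replaced the dummy frame's blocks, blknos, or
# blklocs (GH#35417); sliding pointers on stale blocks could otherwise point
# into the wrong arrays.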