123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- """
- This file is part of the openPMD-api.
- Copyright 2021 openPMD contributors
- Authors: Axel Huebl
- License: LGPLv3+
- """
import math
import numpy as np

# dask is an optional dependency: remember whether it is importable so that
# record_component_to_daskarray() can raise a clear ImportError at call time
# instead of the whole module failing to import.
try:
    from dask.array import from_array
    found_dask = True
except ImportError:
    found_dask = False
class DaskRecordComponent:
    """Array-like adapter over an openPMD ``Record_Component``.

    Exposes ``.shape``, ``.ndim``, ``.dtype`` and numpy-style slicing —
    the minimal protocol ``dask.array.from_array`` needs to read chunks
    lazily from the underlying record component.
    """

    def __init__(self, record_component):
        """Wrap *record_component* (openpmd_api.Record_Component)."""
        self.rc = record_component

    @property
    def shape(self):
        """Extent per dimension, as a tuple.

        fixme: https://github.com/openPMD/openPMD-api/issues/808
        """
        return tuple(self.rc.shape)

    @property
    def ndim(self):
        """Number of dimensions of the record component."""
        return self.rc.ndim

    @property
    def dtype(self):
        """numpy dtype of the stored data."""
        return self.rc.dtype

    def __getitem__(self, slices):
        """Read the selected data and scale it by ``unit_SI``.

        Supports what Record_Component implements: a tuple of slices,
        a single slice or an index; we do not support fancy indexing.
        """
        # FIXME: implement handling of zero-slices in Record_Component
        # https://github.com/openPMD/openPMD-api/issues/957
        # Normalize the key to a tuple first: the docstring promises a
        # bare slice or index is accepted, but iterating such a key
        # directly (as the zero-size check below does) would raise
        # TypeError.
        key = slices if isinstance(slices, tuple) else (slices,)
        if all(s == np.s_[0:0] for s in key):
            return np.array([], dtype=self.dtype)

        data = self.rc[slices]
        # flush triggers the actual (deferred) read from the series
        self.rc.series_flush()
        # apply SI scaling only when it is not a no-op
        if not math.isclose(1.0, self.rc.unit_SI):
            data = np.multiply(data, self.rc.unit_SI)
        return data
def record_component_to_daskarray(record_component):
    """
    Load a RecordComponent into a Dask.array.

    Parameters
    ----------
    record_component : openpmd_api.Record_Component
        A record component class in openPMD-api.

    Returns
    -------
    dask.array
        A dask array.

    Raises
    ------
    ImportError
        Raises an exception if dask is not installed

    See Also
    --------
    openpmd_api.BaseRecordComponent.available_chunks : available chunks that
        are used internally to parallelize reading
    dask.array : the (potentially distributed) array object created here
    """
    if not found_dask:
        raise ImportError("dask NOT found. Install dask for Dask DataFrame "
                          "support.")

    # the backend's available chunks define the natural read parallelism
    chunks = record_component.available_chunks()

    # Translate the chunk list into Dask's per-dimension block-size spec:
    # https://docs.dask.org/en/latest/array-chunks.html
    # https://docs.dask.org/en/latest/array-api.html?highlight=from_array#other-functions
    # For every dimension, collect the sorted, unique chunk start offsets.
    start_offsets_by_dim = [
        sorted(set(starts))
        for starts in zip(*(chunk.offset for chunk in chunks))
    ]

    # case 1: PIConGPU static load balancing (works with Dask assumptions,
    #         chunk option no. 3): all chunks in the same column have the
    #         same column width although individual columns have different
    #         widths
    # case 2: AMReX boxes: all chunks are a multiple of a common block
    #         size, offsets are a multiple of a common blocksize
    #         problem: too limited description in Dask
    #                  https://github.com/dask/dask/issues/7475
    #         work-around: create smaller chunks (this incurs a read cost)
    #         by forcing into case 1 (this can lead to larger blocks than
    #         using the gcd of the extents aka AMReX block size)
    block_widths_by_dim = []
    for dim, dim_starts in enumerate(start_offsets_by_dim):
        starts = np.array(dim_starts)
        # every block ends where the next one starts; the last block ends
        # at the full extent of this dimension, so width = end - start
        ends = np.zeros_like(starts)
        ends[:-1] = starts[1:]
        ends[-1] = record_component.shape[dim]
        block_widths_by_dim.append(tuple(ends - starts))

    return from_array(
        DaskRecordComponent(record_component),
        chunks=tuple(block_widths_by_dim),
        # name=None,
        asarray=True,
        fancy=False,
        # getitem=None,
        # meta=None,
        # inline_array=False
    )
|