DaskArray.py

"""
This file is part of the openPMD-api.

Copyright 2021 openPMD contributors
Authors: Axel Huebl
License: LGPLv3+
"""
import math

import numpy as np

try:
    from dask.array import from_array
    found_dask = True
except ImportError:
    found_dask = False

class DaskRecordComponent:
    # shape, .ndim, .dtype and support numpy-style slicing
    def __init__(self, record_component):
        self.rc = record_component

    @property
    def shape(self):
        # fixme: https://github.com/openPMD/openPMD-api/issues/808
        return tuple(self.rc.shape)

    @property
    def ndim(self):
        return self.rc.ndim

    @property
    def dtype(self):
        return self.rc.dtype

    def __getitem__(self, slices):
        """here we support what Record_Component implements: a tuple of
        slices, a slice or an index; we do not support fancy indexing
        """
        # FIXME: implement handling of zero-slices in Record_Component
        #        https://github.com/openPMD/openPMD-api/issues/957
        all_zero = True
        for s in slices:
            if s != np.s_[0:0]:
                all_zero = False
        if all_zero:
            return np.array([], dtype=self.dtype)

        data = self.rc[slices]
        self.rc.series_flush()
        if not math.isclose(1.0, self.rc.unit_SI):
            data = np.multiply(data, self.rc.unit_SI)
        return data

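# Illustrative note: dask only needs the .shape, .ndim and .dtype attributes
# plus __getitem__ of the wrapper above. A hypothetical access such as
#   DaskRecordComponent(rc)[np.s_[0:10, 0:10]]
# reads that block through openPMD-api, flushes the series and scales the
# result by unit_SI before handing it to dask.
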
def record_component_to_daskarray(record_component):
    """
    Load a RecordComponent into a Dask.array.

    Parameters
    ----------
    record_component : openpmd_api.Record_Component
        A record component class in openPMD-api.

    Returns
    -------
    dask.array
        A dask array.

    Raises
    ------
    ImportError
        Raises an exception if dask is not installed.

    See Also
    --------
    openpmd_api.BaseRecordComponent.available_chunks : available chunks that
        are used internally to parallelize reading
    dask.array : the (potentially distributed) array object created here
    """
    if not found_dask:
        raise ImportError("dask NOT found. Install dask for Dask array "
                          "support.")

    # get optimal chunks
    chunks = record_component.available_chunks()

    # sort and prepare the chunks for Dask's array API
    #   https://docs.dask.org/en/latest/array-chunks.html
    #   https://docs.dask.org/en/latest/array-api.html?highlight=from_array#other-functions
    # sorted and unique
    offsets_per_dim = list(map(list, zip(*[chunk.offset for chunk in chunks])))
    offsets_sorted_unique_per_dim = [sorted(set(o)) for o in offsets_per_dim]
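    # illustrative example (hypothetical numbers): four 2D chunks with offsets
    # (0, 0), (0, 128), (64, 0) and (64, 128) would give
    #   offsets_per_dim == [[0, 0, 64, 64], [0, 128, 0, 128]]
    #   offsets_sorted_unique_per_dim == [[0, 64], [0, 128]]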
    # print("offsets_sorted_unique_per_dim=",
    #       list(offsets_sorted_unique_per_dim))

    # case 1: PIConGPU static load balancing (works with Dask assumptions,
    #         chunk option no. 3)
    #   all chunks in the same column have the same column width although
    #   individual columns have different widths
    # case 2: AMReX boxes
    #   all chunks are a multiple of a common block size, offsets are a
    #   multiple of a common block size
    #   problem: too limited description in Dask
    #     https://github.com/dask/dask/issues/7475
    #   work-around: create smaller chunks (this incurs a read cost) by
    #                forcing into case 1
    #                (this can lead to larger blocks than using the gcd of
    #                 the extents aka AMReX block size)
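    # illustrative example (hypothetical numbers): chunk offsets [0, 100, 250]
    # along an axis of extent 400 become per-dimension chunk widths
    # (100, 150, 150) in the loop below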
    common_chunk_widths_per_dim = list()
    for d, offsets_in_dim in enumerate(offsets_sorted_unique_per_dim):
        # print("d=", d, offsets_in_dim, record_component.shape[d])
        offsets_in_dim_arr = np.array(offsets_in_dim)
        # note: this is in the right order of rows/columns, contrary to a
        #       sorted extent list from chunks
        extents_in_dim = np.zeros_like(offsets_in_dim_arr)
        extents_in_dim[:-1] = offsets_in_dim_arr[1:]
        extents_in_dim[-1] = record_component.shape[d]
        if len(extents_in_dim) > 1:
            extents_in_dim[:-1] -= offsets_in_dim_arr[:-1]
        extents_in_dim[-1] -= offsets_in_dim_arr[-1]
        # print("extents_in_dim=", extents_in_dim)
        common_chunk_widths_per_dim.append(tuple(extents_in_dim))

    common_chunk_widths_per_dim = tuple(common_chunk_widths_per_dim)
    # print("common_chunk_widths_per_dim=", common_chunk_widths_per_dim)

    da = from_array(
        DaskRecordComponent(record_component),
        chunks=common_chunk_widths_per_dim,
        # name=None,
        asarray=True,
        fancy=False,
        # getitem=None,
        # meta=None,
        # inline_array=False
    )

    return da
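
# ----------------------------------------------------------------------------
# Usage sketch: the file name, iteration number and record selection below are
# hypothetical; they only illustrate how record_component_to_daskarray might
# be called.
if __name__ == "__main__":
    import openpmd_api as io

    # open a (hypothetical) openPMD series read-only
    series = io.Series("data_%T.h5", io.Access.read_only)
    # pick one record component, e.g. the x component of the E field
    rc = series.iterations[100].meshes["E"]["x"]

    darr = record_component_to_daskarray(rc)
    # dask evaluates lazily; .compute() triggers the chunked reads implemented
    # in DaskRecordComponent.__getitem__
    print(darr.mean().compute())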