DaskDataFrame.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. """
  2. This file is part of the openPMD-api.
  3. Copyright 2021 openPMD contributors
  4. Authors: Axel Huebl, Dmitry Ganyushin, John Kirkham
  5. License: LGPLv3+
  6. """
  7. import numpy as np
  8. try:
  9. import dask.dataframe as dd
  10. from dask.delayed import delayed
  11. found_dask = True
  12. except ImportError:
  13. found_dask = False
  14. try:
  15. import pandas # noqa
  16. found_pandas = True
  17. except ImportError:
  18. found_pandas = False
  def read_chunk_to_df(species, chunk):
      """
      Load the particles covered by a single chunk via ``species.to_df``.

      Parameters
      ----------
      species : object
          Object providing a ``to_df(slice)`` method (presumably an
          openpmd_api.ParticleSpecies - confirm against the caller).
      chunk : object
          Chunk descriptor exposing ``offset`` and ``extent`` sequences.
          Only the first entry of each is used.

      Returns
      -------
      object
          Whatever ``species.to_df`` returns for the half-open range
          ``offset[0] : offset[0] + extent[0]``.
      """
      first = chunk.offset[0]
      last = first + chunk.extent[0]
      # a plain slice object is exactly what np.s_[first:last] evaluates to
      return species.to_df(slice(first, last))
  def _first_available_chunks(particle_species, include_constant):
      """
      Return the chunk table of the first eligible record component.

      Records are visited in iteration order. Within each record, the first
      component that is eligible (any component if ``include_constant`` is
      true, otherwise the first one whose ``constant`` attribute is false)
      is queried with ``available_chunks()``. The search stops at the first
      record that yields a non-empty result.

      Returns ``None`` if no component was ever queried, otherwise the last
      value that ``available_chunks()`` returned (which may be empty).
      """
      chunks = None
      for _, record in particle_species.items():
          for _, component in record.items():
              if include_constant or not component.constant:
                  chunks = component.available_chunks()
                  break
          if chunks:
              break
      return chunks


  def particles_to_daskdataframe(particle_species):
      """
      Load all records of a particle species into a Dask DataFrame.

      Parameters
      ----------
      particle_species : openpmd_api.ParticleSpecies
          A ParticleSpecies class in openPMD-api.

      Returns
      -------
      dask.dataframe
          A dask dataframe with particles as index and openPMD record
          components of the particle_species as columns.

      Raises
      ------
      ImportError
          Raises an exception if dask or pandas are not installed
      ValueError
          Raises an exception if no chunks could be determined for the
          particle species, e.g. because it holds no record components.

      See Also
      --------
      openpmd_api.BaseRecordComponent.available_chunks : available chunks that
          are used internally to parallelize particle processing
      dask.dataframe : the central dataframe object created here
      """
      if not found_dask:
          raise ImportError("dask NOT found. Install dask for Dask DataFrame "
                            "support.")
      if not found_pandas:  # catch this early: before delayed functions
          raise ImportError("pandas NOT found. Install pandas for DataFrame "
                            "support.")

      # get optimal chunks: query first non-constant record component and
      # assume the same chunking applies for all of them
      # in a particle species
      chunks = _first_available_chunks(particle_species,
                                       include_constant=False)

      # only constant record components:
      # fall back to whatever the first record component reports
      if chunks is None:
          chunks = _first_available_chunks(particle_species,
                                           include_constant=True)

      # fail with a clear message instead of iterating over None or handing
      # an empty list of delayed objects to dask
      if not chunks:
          raise ValueError("No chunks found in particle species. Cannot "
                           "create a Dask DataFrame without record "
                           "components.")

      # merge DataFrames: one delayed read per chunk
      dfs = [
          delayed(read_chunk_to_df)(particle_species, chunk)
          for chunk in chunks
      ]
      return dd.from_delayed(dfs)