  1. """ parquet compat """
  2. from __future__ import annotations
  3. import io
  4. import os
  5. from typing import (
  6. Any,
  7. AnyStr,
  8. )
  9. from warnings import catch_warnings
  10. from pandas._typing import (
  11. FilePathOrBuffer,
  12. StorageOptions,
  13. )
  14. from pandas.compat._optional import import_optional_dependency
  15. from pandas.errors import AbstractMethodError
  16. from pandas.util._decorators import doc
  17. from pandas import (
  18. DataFrame,
  19. MultiIndex,
  20. get_option,
  21. )
  22. from pandas.core import generic
  23. from pandas.util.version import Version
  24. from pandas.io.common import (
  25. IOHandles,
  26. get_handle,
  27. is_fsspec_url,
  28. is_url,
  29. stringify_path,
  30. )


def get_engine(engine: str) -> BaseImpl:
    """return our implementation"""
    if engine == "auto":
        engine = get_option("io.parquet.engine")

    if engine == "auto":
        # try engines in this order
        engine_classes = [PyArrowImpl, FastParquetImpl]

        error_msgs = ""
        for engine_class in engine_classes:
            try:
                return engine_class()
            except ImportError as err:
                error_msgs += "\n - " + str(err)

        raise ImportError(
            "Unable to find a usable engine; "
            "tried using: 'pyarrow', 'fastparquet'.\n"
            "A suitable version of "
            "pyarrow or fastparquet is required for parquet "
            "support.\n"
            "Trying to import the above resulted in these errors:"
            f"{error_msgs}"
        )

    if engine == "pyarrow":
        return PyArrowImpl()
    elif engine == "fastparquet":
        return FastParquetImpl()

    raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")


def _get_path_or_handle(
    path: FilePathOrBuffer,
    fs: Any,
    storage_options: StorageOptions = None,
    mode: str = "rb",
    is_dir: bool = False,
) -> tuple[FilePathOrBuffer, IOHandles | None, Any]:
    """File handling for PyArrow."""
    path_or_handle = stringify_path(path)
    if is_fsspec_url(path_or_handle) and fs is None:
        fsspec = import_optional_dependency("fsspec")

        fs, path_or_handle = fsspec.core.url_to_fs(
            path_or_handle, **(storage_options or {})
        )
    elif storage_options and (not is_url(path_or_handle) or mode != "rb"):
        # can't write to a remote url
        # without making use of fsspec at the moment
        raise ValueError("storage_options passed with buffer, or non-supported URL")

    handles = None
    if (
        not fs
        and not is_dir
        and isinstance(path_or_handle, str)
        and not os.path.isdir(path_or_handle)
    ):
        # use get_handle only when we are very certain that it is not a directory
        # fsspec resources can also point to directories
        # this branch is used for example when reading from non-fsspec URLs
        handles = get_handle(
            path_or_handle, mode, is_text=False, storage_options=storage_options
        )
        fs = None
        path_or_handle = handles.handle
    return path_or_handle, handles, fs


class BaseImpl:
    @staticmethod
    def validate_dataframe(df: DataFrame):
        if not isinstance(df, DataFrame):
            raise ValueError("to_parquet only supports IO with DataFrames")

        # must have value column names for all index levels (strings only)
        if isinstance(df.columns, MultiIndex):
            if not all(
                x.inferred_type in {"string", "empty"} for x in df.columns.levels
            ):
                raise ValueError(
                    """
                    parquet must have string column names for all values in
                    each level of the MultiIndex
                    """
                )
        else:
            if df.columns.inferred_type not in {"string", "empty"}:
                raise ValueError("parquet must have string column names")

        # index level names must be strings
        valid_names = all(
            isinstance(name, str) for name in df.index.names if name is not None
        )
        if not valid_names:
            raise ValueError("Index level names must be strings")

    def write(self, df: DataFrame, path, compression, **kwargs):
        raise AbstractMethodError(self)

    def read(self, path, columns=None, **kwargs):
        raise AbstractMethodError(self)


class PyArrowImpl(BaseImpl):
    def __init__(self):
        import_optional_dependency(
            "pyarrow", extra="pyarrow is required for parquet support."
        )
        import pyarrow.parquet

        # import utils to register the pyarrow extension types
        import pandas.core.arrays._arrow_utils  # noqa

        self.api = pyarrow

    def write(
        self,
        df: DataFrame,
        path: FilePathOrBuffer[AnyStr],
        compression: str | None = "snappy",
        index: bool | None = None,
        storage_options: StorageOptions = None,
        partition_cols: list[str] | None = None,
        **kwargs,
    ):
        self.validate_dataframe(df)

        from_pandas_kwargs: dict[str, Any] = {"schema": kwargs.pop("schema", None)}
        if index is not None:
            from_pandas_kwargs["preserve_index"] = index

        table = self.api.Table.from_pandas(df, **from_pandas_kwargs)

        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
            path,
            kwargs.pop("filesystem", None),
            storage_options=storage_options,
            mode="wb",
            is_dir=partition_cols is not None,
        )
        try:
            if partition_cols is not None:
                # writes to multiple files under the given path
                self.api.parquet.write_to_dataset(
                    table,
                    path_or_handle,
                    compression=compression,
                    partition_cols=partition_cols,
                    **kwargs,
                )
            else:
                # write to single output file
                self.api.parquet.write_table(
                    table, path_or_handle, compression=compression, **kwargs
                )
        finally:
            if handles is not None:
                handles.close()

    def read(
        self,
        path,
        columns=None,
        use_nullable_dtypes=False,
        storage_options: StorageOptions = None,
        **kwargs,
    ):
        kwargs["use_pandas_metadata"] = True

        to_pandas_kwargs = {}
        if use_nullable_dtypes:
            import pandas as pd
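
            # map pyarrow types to the pandas nullable extension dtypes so that
            # missing values come back as pd.NA (see ``use_nullable_dtypes``)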
            mapping = {
                self.api.int8(): pd.Int8Dtype(),
                self.api.int16(): pd.Int16Dtype(),
                self.api.int32(): pd.Int32Dtype(),
                self.api.int64(): pd.Int64Dtype(),
                self.api.uint8(): pd.UInt8Dtype(),
                self.api.uint16(): pd.UInt16Dtype(),
                self.api.uint32(): pd.UInt32Dtype(),
                self.api.uint64(): pd.UInt64Dtype(),
                self.api.bool_(): pd.BooleanDtype(),
                self.api.string(): pd.StringDtype(),
            }
            to_pandas_kwargs["types_mapper"] = mapping.get
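
        # with the "array" data manager, have pyarrow create one block per
        # column so the conversion to ArrayManager below can avoid copies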
        manager = get_option("mode.data_manager")
        if manager == "array":
            to_pandas_kwargs["split_blocks"] = True  # type: ignore[assignment]

        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
            path,
            kwargs.pop("filesystem", None),
            storage_options=storage_options,
            mode="rb",
        )
        try:
            result = self.api.parquet.read_table(
                path_or_handle, columns=columns, **kwargs
            ).to_pandas(**to_pandas_kwargs)
            if manager == "array":
                result = result._as_manager("array", copy=False)
            return result
        finally:
            if handles is not None:
                handles.close()


class FastParquetImpl(BaseImpl):
    def __init__(self):
        # since pandas is a dependency of fastparquet
        # we need to import on first use
        fastparquet = import_optional_dependency(
            "fastparquet", extra="fastparquet is required for parquet support."
        )
        self.api = fastparquet

    def write(
        self,
        df: DataFrame,
        path,
        compression="snappy",
        index=None,
        partition_cols=None,
        storage_options: StorageOptions = None,
        **kwargs,
    ):
        self.validate_dataframe(df)
        # thriftpy/protocol/compact.py:339:
        # DeprecationWarning: tostring() is deprecated.
        # Use tobytes() instead.

        if "partition_on" in kwargs and partition_cols is not None:
            raise ValueError(
                "Cannot use both partition_on and "
                "partition_cols. Use partition_cols for partitioning data"
            )
        elif "partition_on" in kwargs:
            partition_cols = kwargs.pop("partition_on")

        if partition_cols is not None:
            kwargs["file_scheme"] = "hive"

        # cannot use get_handle as write() does not accept file buffers
        path = stringify_path(path)
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

            # if filesystem is provided by fsspec, file must be opened in 'wb' mode.
            kwargs["open_with"] = lambda path, _: fsspec.open(
                path, "wb", **(storage_options or {})
            ).open()
        elif storage_options:
            raise ValueError(
                "storage_options passed with file object or non-fsspec file path"
            )

        with catch_warnings(record=True):
            self.api.write(
                path,
                df,
                compression=compression,
                write_index=index,
                partition_on=partition_cols,
                **kwargs,
            )

    def read(
        self, path, columns=None, storage_options: StorageOptions = None, **kwargs
    ):
        use_nullable_dtypes = kwargs.pop("use_nullable_dtypes", False)
        if use_nullable_dtypes:
            raise ValueError(
                "The 'use_nullable_dtypes' argument is not supported for the "
                "fastparquet engine"
            )
        path = stringify_path(path)
        parquet_kwargs = {}
        handles = None
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")
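
            # fastparquet releases newer than 0.6.1 accept an fsspec filesystem
            # object directly; older versions only support an ``open_with`` callable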
            if Version(self.api.__version__) > Version("0.6.1"):
                parquet_kwargs["fs"] = fsspec.open(
                    path, "rb", **(storage_options or {})
                ).fs
            else:
                parquet_kwargs["open_with"] = lambda path, _: fsspec.open(
                    path, "rb", **(storage_options or {})
                ).open()
        elif isinstance(path, str) and not os.path.isdir(path):
            # use get_handle only when we are very certain that it is not a directory
            # fsspec resources can also point to directories
            # this branch is used for example when reading from non-fsspec URLs
            handles = get_handle(
                path, "rb", is_text=False, storage_options=storage_options
            )
            path = handles.handle

        parquet_file = self.api.ParquetFile(path, **parquet_kwargs)

        result = parquet_file.to_pandas(columns=columns, **kwargs)

        if handles is not None:
            handles.close()

        return result


@doc(storage_options=generic._shared_docs["storage_options"])
def to_parquet(
    df: DataFrame,
    path: FilePathOrBuffer | None = None,
    engine: str = "auto",
    compression: str | None = "snappy",
    index: bool | None = None,
    storage_options: StorageOptions = None,
    partition_cols: list[str] | None = None,
    **kwargs,
) -> bytes | None:
    """
    Write a DataFrame to the parquet format.

    Parameters
    ----------
    df : DataFrame
    path : str or file-like object, default None
        If a string, it will be used as Root Directory path
        when writing a partitioned dataset. By file-like object,
        we refer to objects with a write() method, such as a file handle
        (e.g. via builtin open function) or io.BytesIO. The engine
        fastparquet does not accept file-like objects. If path is None,
        a bytes object is returned.

        .. versionchanged:: 1.2.0

    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    compression : {{'snappy', 'gzip', 'brotli', None}}, default 'snappy'
        Name of the compression to use. Use ``None`` for no compression.
    index : bool, default None
        If ``True``, include the dataframe's index(es) in the file output. If
        ``False``, they will not be written to the file.
        If ``None``, similar to ``True`` the dataframe's index(es)
        will be saved. However, instead of being saved as values,
        the RangeIndex will be stored as a range in the metadata so it
        doesn't require much space and is faster. Other indexes will
        be included as columns in the file output.
    partition_cols : str or list, optional, default None
        Column names by which to partition the dataset.
        Columns are partitioned in the order they are given.
        Must be None if path is not a string.
    {storage_options}

        .. versionadded:: 1.2.0

    kwargs
        Additional keyword arguments passed to the engine

    Returns
    -------
    bytes if no path argument is provided else None
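
    Examples
    --------
    A minimal round-trip through an in-memory buffer. This is an illustrative
    sketch and assumes pyarrow is installed (the fastparquet engine does not
    accept file-like objects):

    >>> import io
    >>> import pandas as pd
    >>> df = pd.DataFrame([[1, "x"], [2, "y"]], columns=["a", "b"])
    >>> buf = io.BytesIO()
    >>> df.to_parquet(buf)
    >>> pd.read_parquet(buf).equals(df)
    True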
  351. """
  352. if isinstance(partition_cols, str):
  353. partition_cols = [partition_cols]
  354. impl = get_engine(engine)
  355. path_or_buf: FilePathOrBuffer = io.BytesIO() if path is None else path
  356. impl.write(
  357. df,
  358. path_or_buf,
  359. compression=compression,
  360. index=index,
  361. partition_cols=partition_cols,
  362. storage_options=storage_options,
  363. **kwargs,
  364. )
  365. if path is None:
  366. assert isinstance(path_or_buf, io.BytesIO)
  367. return path_or_buf.getvalue()
  368. else:
  369. return None


@doc(storage_options=generic._shared_docs["storage_options"])
def read_parquet(
    path,
    engine: str = "auto",
    columns=None,
    storage_options: StorageOptions = None,
    use_nullable_dtypes: bool = False,
    **kwargs,
):
    """
    Load a parquet object from the file path, returning a DataFrame.

    Parameters
    ----------
    path : str, path object or file-like object
        Any valid string path is acceptable. The string could be a URL. Valid
        URL schemes include http, ftp, s3, gs, and file. For file URLs, a host is
        expected. A local file could be:
        ``file://localhost/path/to/table.parquet``.
        A file URL can also be a path to a directory that contains multiple
        partitioned parquet files. Both pyarrow and fastparquet support
        paths to directories as well as file URLs. A directory path could be:
        ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``

        If you want to pass in a path object, pandas accepts any
        ``os.PathLike``.

        By file-like object, we refer to objects with a ``read()`` method,
        such as a file handle (e.g. via builtin ``open`` function)
        or ``StringIO``.
    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    columns : list, default=None
        If not None, only these columns will be read from the file.
    {storage_options}

        .. versionadded:: 1.3.0

    use_nullable_dtypes : bool, default False
        If True, use dtypes that use ``pd.NA`` as missing value indicator
        for the resulting DataFrame (only applicable for ``engine="pyarrow"``).
        As new dtypes are added that support ``pd.NA`` in the future, the
        output with this option will change to use those dtypes.
        Note: this is an experimental option, and behaviour (e.g. additional
        supported dtypes) may change without notice.

        .. versionadded:: 1.2.0

    **kwargs
        Any additional kwargs are passed to the engine.

    Returns
    -------
    DataFrame
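
    Examples
    --------
    Illustrative sketch (assumes pyarrow is installed): write a small frame to
    an in-memory buffer, then read back a subset of its columns.

    >>> import io
    >>> import pandas as pd
    >>> buf = io.BytesIO()
    >>> pd.DataFrame([[1, 2.0], [3, 4.0]], columns=["a", "b"]).to_parquet(buf)
    >>> pd.read_parquet(buf, columns=["a"])
       a
    0  1
    1  3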
  419. """
  420. impl = get_engine(engine)
  421. return impl.read(
  422. path,
  423. columns=columns,
  424. storage_options=storage_options,
  425. use_nullable_dtypes=use_nullable_dtypes,
  426. **kwargs,
  427. )