123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953 |
- """Common IO api utilities"""
- from __future__ import annotations
- import bz2
- import codecs
- from collections import abc
- import dataclasses
- import gzip
- from io import (
- BufferedIOBase,
- BytesIO,
- RawIOBase,
- StringIO,
- TextIOWrapper,
- )
- import mmap
- import os
- from typing import (
- IO,
- Any,
- AnyStr,
- Mapping,
- cast,
- )
- from urllib.parse import (
- urljoin,
- urlparse as parse_url,
- uses_netloc,
- uses_params,
- uses_relative,
- )
- import warnings
- import zipfile
- from pandas._typing import (
- Buffer,
- CompressionDict,
- CompressionOptions,
- FileOrBuffer,
- FilePathOrBuffer,
- StorageOptions,
- )
- from pandas.compat import (
- get_lzma_file,
- import_lzma,
- )
- from pandas.compat._optional import import_optional_dependency
- from pandas.core.dtypes.common import is_file_like
- lzma = import_lzma()
- _VALID_URLS = set(uses_relative + uses_netloc + uses_params)
- _VALID_URLS.discard("")
- @dataclasses.dataclass
- class IOArgs:
- """
- Return value of io/common.py:_get_filepath_or_buffer.
- Note (copy&past from io/parsers):
- filepath_or_buffer can be Union[FilePathOrBuffer, s3fs.S3File, gcsfs.GCSFile]
- though mypy handling of conditional imports is difficult.
- See https://github.com/python/mypy/issues/1297
- """
- filepath_or_buffer: FileOrBuffer
- encoding: str
- mode: str
- compression: CompressionDict
- should_close: bool = False
- @dataclasses.dataclass
- class IOHandles:
- """
- Return value of io/common.py:get_handle
- Can be used as a context manager.
- This is used to easily close created buffers and to handle corner cases when
- TextIOWrapper is inserted.
- handle: The file handle to be used.
- created_handles: All file handles that are created by get_handle
- is_wrapped: Whether a TextIOWrapper needs to be detached.
- """
- handle: Buffer
- compression: CompressionDict
- created_handles: list[Buffer] = dataclasses.field(default_factory=list)
- is_wrapped: bool = False
- is_mmap: bool = False
- def close(self) -> None:
- """
- Close all created buffers.
- Note: If a TextIOWrapper was inserted, it is flushed and detached to
- avoid closing the potentially user-created buffer.
- """
- if self.is_wrapped:
- assert isinstance(self.handle, TextIOWrapper)
- self.handle.flush()
- self.handle.detach()
- self.created_handles.remove(self.handle)
- try:
- for handle in self.created_handles:
- handle.close()
- except (OSError, ValueError):
- pass
- self.created_handles = []
- self.is_wrapped = False
- def __enter__(self) -> IOHandles:
- return self
- def __exit__(self, *args: Any) -> None:
- self.close()
- def is_url(url) -> bool:
- """
- Check to see if a URL has a valid protocol.
- Parameters
- ----------
- url : str or unicode
- Returns
- -------
- isurl : bool
- If `url` has a valid protocol return True otherwise False.
- """
- if not isinstance(url, str):
- return False
- return parse_url(url).scheme in _VALID_URLS
- def _expand_user(filepath_or_buffer: FileOrBuffer[AnyStr]) -> FileOrBuffer[AnyStr]:
- """
- Return the argument with an initial component of ~ or ~user
- replaced by that user's home directory.
- Parameters
- ----------
- filepath_or_buffer : object to be converted if possible
- Returns
- -------
- expanded_filepath_or_buffer : an expanded filepath or the
- input if not expandable
- """
- if isinstance(filepath_or_buffer, str):
- return os.path.expanduser(filepath_or_buffer)
- return filepath_or_buffer
- def validate_header_arg(header) -> None:
- if isinstance(header, bool):
- raise TypeError(
- "Passing a bool to header is invalid. Use header=None for no header or "
- "header=int or list-like of ints to specify "
- "the row(s) making up the column names"
- )
- def stringify_path(
- filepath_or_buffer: FilePathOrBuffer[AnyStr],
- convert_file_like: bool = False,
- ) -> FileOrBuffer[AnyStr]:
- """
- Attempt to convert a path-like object to a string.
- Parameters
- ----------
- filepath_or_buffer : object to be converted
- Returns
- -------
- str_filepath_or_buffer : maybe a string version of the object
- Notes
- -----
- Objects supporting the fspath protocol (python 3.6+) are coerced
- according to its __fspath__ method.
- Any other object is passed through unchanged, which includes bytes,
- strings, buffers, or anything else that's not even path-like.
- """
- if not convert_file_like and is_file_like(filepath_or_buffer):
- # GH 38125: some fsspec objects implement os.PathLike but have already opened a
- # file. This prevents opening the file a second time. infer_compression calls
- # this function with convert_file_like=True to infer the compression.
- return cast(FileOrBuffer[AnyStr], filepath_or_buffer)
- if isinstance(filepath_or_buffer, os.PathLike):
- filepath_or_buffer = filepath_or_buffer.__fspath__()
- return _expand_user(filepath_or_buffer)
- def urlopen(*args, **kwargs):
- """
- Lazy-import wrapper for stdlib urlopen, as that imports a big chunk of
- the stdlib.
- """
- import urllib.request
- return urllib.request.urlopen(*args, **kwargs)
- def is_fsspec_url(url: FilePathOrBuffer) -> bool:
- """
- Returns true if the given URL looks like
- something fsspec can handle
- """
- return (
- isinstance(url, str)
- and "://" in url
- and not url.startswith(("http://", "https://"))
- )
- def _get_filepath_or_buffer(
- filepath_or_buffer: FilePathOrBuffer,
- encoding: str = "utf-8",
- compression: CompressionOptions = None,
- mode: str = "r",
- storage_options: StorageOptions = None,
- ) -> IOArgs:
- """
- If the filepath_or_buffer is a url, translate and return the buffer.
- Otherwise passthrough.
- Parameters
- ----------
- filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path),
- or buffer
- compression : {{'gzip', 'bz2', 'zip', 'xz', None}}, optional
- encoding : the encoding to use to decode bytes, default is 'utf-8'
- mode : str, optional
- storage_options : dict, optional
- Extra options that make sense for a particular storage connection, e.g.
- host, port, username, password, etc., if using a URL that will
- be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error
- will be raised if providing this argument with a local path or
- a file-like buffer. See the fsspec and backend storage implementation
- docs for the set of allowed keys and values
- .. versionadded:: 1.2.0
- ..versionchange:: 1.2.0
- Returns the dataclass IOArgs.
- """
- filepath_or_buffer = stringify_path(filepath_or_buffer)
- # handle compression dict
- compression_method, compression = get_compression_method(compression)
- compression_method = infer_compression(filepath_or_buffer, compression_method)
- # GH21227 internal compression is not used for non-binary handles.
- if compression_method and hasattr(filepath_or_buffer, "write") and "b" not in mode:
- warnings.warn(
- "compression has no effect when passing a non-binary object as input.",
- RuntimeWarning,
- stacklevel=2,
- )
- compression_method = None
- compression = dict(compression, method=compression_method)
- # uniform encoding names
- if encoding is not None:
- encoding = encoding.replace("_", "-").lower()
- # bz2 and xz do not write the byte order mark for utf-16 and utf-32
- # print a warning when writing such files
- if (
- "w" in mode
- and compression_method in ["bz2", "xz"]
- and encoding in ["utf-16", "utf-32"]
- ):
- warnings.warn(
- f"{compression} will not write the byte order mark for {encoding}",
- UnicodeWarning,
- )
- # Use binary mode when converting path-like objects to file-like objects (fsspec)
- # except when text mode is explicitly requested. The original mode is returned if
- # fsspec is not used.
- fsspec_mode = mode
- if "t" not in fsspec_mode and "b" not in fsspec_mode:
- fsspec_mode += "b"
- if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer):
- # TODO: fsspec can also handle HTTP via requests, but leaving this
- # unchanged. using fsspec appears to break the ability to infer if the
- # server responded with gzipped data
- storage_options = storage_options or {}
- # waiting until now for importing to match intended lazy logic of
- # urlopen function defined elsewhere in this module
- import urllib.request
- # assuming storage_options is to be interpreted as headers
- req_info = urllib.request.Request(filepath_or_buffer, headers=storage_options)
- with urlopen(req_info) as req:
- content_encoding = req.headers.get("Content-Encoding", None)
- if content_encoding == "gzip":
- # Override compression based on Content-Encoding header
- compression = {"method": "gzip"}
- reader = BytesIO(req.read())
- return IOArgs(
- filepath_or_buffer=reader,
- encoding=encoding,
- compression=compression,
- should_close=True,
- mode=fsspec_mode,
- )
- if is_fsspec_url(filepath_or_buffer):
- assert isinstance(
- filepath_or_buffer, str
- ) # just to appease mypy for this branch
- # two special-case s3-like protocols; these have special meaning in Hadoop,
- # but are equivalent to just "s3" from fsspec's point of view
- # cc #11071
- if filepath_or_buffer.startswith("s3a://"):
- filepath_or_buffer = filepath_or_buffer.replace("s3a://", "s3://")
- if filepath_or_buffer.startswith("s3n://"):
- filepath_or_buffer = filepath_or_buffer.replace("s3n://", "s3://")
- fsspec = import_optional_dependency("fsspec")
- # If botocore is installed we fallback to reading with anon=True
- # to allow reads from public buckets
- err_types_to_retry_with_anon: list[Any] = []
- try:
- import_optional_dependency("botocore")
- from botocore.exceptions import (
- ClientError,
- NoCredentialsError,
- )
- err_types_to_retry_with_anon = [
- ClientError,
- NoCredentialsError,
- PermissionError,
- ]
- except ImportError:
- pass
- try:
- file_obj = fsspec.open(
- filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
- ).open()
- # GH 34626 Reads from Public Buckets without Credentials needs anon=True
- except tuple(err_types_to_retry_with_anon):
- if storage_options is None:
- storage_options = {"anon": True}
- else:
- # don't mutate user input.
- storage_options = dict(storage_options)
- storage_options["anon"] = True
- file_obj = fsspec.open(
- filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
- ).open()
- return IOArgs(
- filepath_or_buffer=file_obj,
- encoding=encoding,
- compression=compression,
- should_close=True,
- mode=fsspec_mode,
- )
- elif storage_options:
- raise ValueError(
- "storage_options passed with file object or non-fsspec file path"
- )
- if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)):
- return IOArgs(
- filepath_or_buffer=_expand_user(filepath_or_buffer),
- encoding=encoding,
- compression=compression,
- should_close=False,
- mode=mode,
- )
- if not is_file_like(filepath_or_buffer):
- msg = f"Invalid file path or buffer object type: {type(filepath_or_buffer)}"
- raise ValueError(msg)
- return IOArgs(
- filepath_or_buffer=filepath_or_buffer,
- encoding=encoding,
- compression=compression,
- should_close=False,
- mode=mode,
- )
- def file_path_to_url(path: str) -> str:
- """
- converts an absolute native path to a FILE URL.
- Parameters
- ----------
- path : a path in native format
- Returns
- -------
- a valid FILE URL
- """
- # lazify expensive import (~30ms)
- from urllib.request import pathname2url
- return urljoin("file:", pathname2url(path))
- _compression_to_extension = {"gzip": ".gz", "bz2": ".bz2", "zip": ".zip", "xz": ".xz"}
- def get_compression_method(
- compression: CompressionOptions,
- ) -> tuple[str | None, CompressionDict]:
- """
- Simplifies a compression argument to a compression method string and
- a mapping containing additional arguments.
- Parameters
- ----------
- compression : str or mapping
- If string, specifies the compression method. If mapping, value at key
- 'method' specifies compression method.
- Returns
- -------
- tuple of ({compression method}, Optional[str]
- {compression arguments}, Dict[str, Any])
- Raises
- ------
- ValueError on mapping missing 'method' key
- """
- compression_method: str | None
- if isinstance(compression, Mapping):
- compression_args = dict(compression)
- try:
- compression_method = compression_args.pop("method")
- except KeyError as err:
- raise ValueError("If mapping, compression must have key 'method'") from err
- else:
- compression_args = {}
- compression_method = compression
- return compression_method, compression_args
- def infer_compression(
- filepath_or_buffer: FilePathOrBuffer, compression: str | None
- ) -> str | None:
- """
- Get the compression method for filepath_or_buffer. If compression='infer',
- the inferred compression method is returned. Otherwise, the input
- compression method is returned unchanged, unless it's invalid, in which
- case an error is raised.
- Parameters
- ----------
- filepath_or_buffer : str or file handle
- File path or object.
- compression : {'infer', 'gzip', 'bz2', 'zip', 'xz', None}
- If 'infer' and `filepath_or_buffer` is path-like, then detect
- compression from the following extensions: '.gz', '.bz2', '.zip',
- or '.xz' (otherwise no compression).
- Returns
- -------
- string or None
- Raises
- ------
- ValueError on invalid compression specified.
- """
- if compression is None:
- return None
- # Infer compression
- if compression == "infer":
- # Convert all path types (e.g. pathlib.Path) to strings
- filepath_or_buffer = stringify_path(filepath_or_buffer, convert_file_like=True)
- if not isinstance(filepath_or_buffer, str):
- # Cannot infer compression of a buffer, assume no compression
- return None
- # Infer compression from the filename/URL extension
- for compression, extension in _compression_to_extension.items():
- if filepath_or_buffer.lower().endswith(extension):
- return compression
- return None
- # Compression has been specified. Check that it's valid
- if compression in _compression_to_extension:
- return compression
- # https://github.com/python/mypy/issues/5492
- # Unsupported operand types for + ("List[Optional[str]]" and "List[str]")
- valid = ["infer", None] + sorted(
- _compression_to_extension
- ) # type: ignore[operator]
- msg = (
- f"Unrecognized compression type: {compression}\n"
- f"Valid compression types are {valid}"
- )
- raise ValueError(msg)
- def get_handle(
- path_or_buf: FilePathOrBuffer,
- mode: str,
- encoding: str | None = None,
- compression: CompressionOptions = None,
- memory_map: bool = False,
- is_text: bool = True,
- errors: str | None = None,
- storage_options: StorageOptions = None,
- ) -> IOHandles:
- """
- Get file handle for given path/buffer and mode.
- Parameters
- ----------
- path_or_buf : str or file handle
- File path or object.
- mode : str
- Mode to open path_or_buf with.
- encoding : str or None
- Encoding to use.
- compression : str or dict, default None
- If string, specifies compression mode. If dict, value at key 'method'
- specifies compression mode. Compression mode must be one of {'infer',
- 'gzip', 'bz2', 'zip', 'xz', None}. If compression mode is 'infer'
- and `filepath_or_buffer` is path-like, then detect compression from
- the following extensions: '.gz', '.bz2', '.zip', or '.xz' (otherwise
- no compression). If dict and compression mode is one of
- {'zip', 'gzip', 'bz2'}, or inferred as one of the above,
- other entries passed as additional compression options.
- .. versionchanged:: 1.0.0
- May now be a dict with key 'method' as compression mode
- and other keys as compression options if compression
- mode is 'zip'.
- .. versionchanged:: 1.1.0
- Passing compression options as keys in dict is now
- supported for compression modes 'gzip' and 'bz2' as well as 'zip'.
- memory_map : bool, default False
- See parsers._parser_params for more information.
- is_text : bool, default True
- Whether the type of the content passed to the file/buffer is string or
- bytes. This is not the same as `"b" not in mode`. If a string content is
- passed to a binary file/buffer, a wrapper is inserted.
- errors : str, default 'strict'
- Specifies how encoding and decoding errors are to be handled.
- See the errors argument for :func:`open` for a full list
- of options.
- storage_options: StorageOptions = None
- Passed to _get_filepath_or_buffer
- .. versionchanged:: 1.2.0
- Returns the dataclass IOHandles
- """
- # Windows does not default to utf-8. Set to utf-8 for a consistent behavior
- encoding = encoding or "utf-8"
- # read_csv does not know whether the buffer is opened in binary/text mode
- if _is_binary_mode(path_or_buf, mode) and "b" not in mode:
- mode += "b"
- # valdiate errors
- if isinstance(errors, str):
- errors = errors.lower()
- if errors not in (
- None,
- "strict",
- "ignore",
- "replace",
- "xmlcharrefreplace",
- "backslashreplace",
- "namereplace",
- "surrogateescape",
- "surrogatepass",
- ):
- raise ValueError(
- f"Invalid value for `encoding_errors` ({errors}). Please see "
- + "https://docs.python.org/3/library/codecs.html#error-handlers "
- + "for valid values."
- )
- # open URLs
- ioargs = _get_filepath_or_buffer(
- path_or_buf,
- encoding=encoding,
- compression=compression,
- mode=mode,
- storage_options=storage_options,
- )
- handle = ioargs.filepath_or_buffer
- handles: list[Buffer]
- # memory mapping needs to be the first step
- handle, memory_map, handles = _maybe_memory_map(
- handle,
- memory_map,
- ioargs.encoding,
- ioargs.mode,
- errors,
- ioargs.compression["method"] not in _compression_to_extension,
- )
- is_path = isinstance(handle, str)
- compression_args = dict(ioargs.compression)
- compression = compression_args.pop("method")
- if compression:
- # compression libraries do not like an explicit text-mode
- ioargs.mode = ioargs.mode.replace("t", "")
- # GZ Compression
- if compression == "gzip":
- if is_path:
- assert isinstance(handle, str)
- handle = gzip.GzipFile(
- filename=handle,
- mode=ioargs.mode,
- **compression_args,
- )
- else:
- handle = gzip.GzipFile(
- # error: Argument "fileobj" to "GzipFile" has incompatible type
- # "Union[str, Union[IO[Any], RawIOBase, BufferedIOBase, TextIOBase,
- # TextIOWrapper, mmap]]"; expected "Optional[IO[bytes]]"
- fileobj=handle, # type: ignore[arg-type]
- mode=ioargs.mode,
- **compression_args,
- )
- # BZ Compression
- elif compression == "bz2":
- handle = bz2.BZ2File(
- # Argument 1 to "BZ2File" has incompatible type "Union[str,
- # Union[IO[Any], RawIOBase, BufferedIOBase, TextIOBase, TextIOWrapper,
- # mmap]]"; expected "Union[Union[str, bytes, _PathLike[str],
- # _PathLike[bytes]], IO[bytes]]"
- handle, # type: ignore[arg-type]
- mode=ioargs.mode,
- **compression_args,
- )
- # ZIP Compression
- elif compression == "zip":
- handle = _BytesZipFile(handle, ioargs.mode, **compression_args)
- if handle.mode == "r":
- handles.append(handle)
- zip_names = handle.namelist()
- if len(zip_names) == 1:
- handle = handle.open(zip_names.pop())
- elif len(zip_names) == 0:
- raise ValueError(f"Zero files found in ZIP file {path_or_buf}")
- else:
- raise ValueError(
- "Multiple files found in ZIP file. "
- f"Only one file per ZIP: {zip_names}"
- )
- # XZ Compression
- elif compression == "xz":
- handle = get_lzma_file(lzma)(handle, ioargs.mode)
- # Unrecognized Compression
- else:
- msg = f"Unrecognized compression type: {compression}"
- raise ValueError(msg)
- assert not isinstance(handle, str)
- handles.append(handle)
- elif isinstance(handle, str):
- # Check whether the filename is to be opened in binary mode.
- # Binary mode does not support 'encoding' and 'newline'.
- if ioargs.encoding and "b" not in ioargs.mode:
- # Encoding
- handle = open(
- handle,
- ioargs.mode,
- encoding=ioargs.encoding,
- errors=errors,
- newline="",
- )
- else:
- # Binary mode
- handle = open(handle, ioargs.mode)
- handles.append(handle)
- # Convert BytesIO or file objects passed with an encoding
- is_wrapped = False
- if is_text and (compression or _is_binary_mode(handle, ioargs.mode)):
- handle = TextIOWrapper(
- # error: Argument 1 to "TextIOWrapper" has incompatible type
- # "Union[IO[bytes], IO[Any], RawIOBase, BufferedIOBase, TextIOBase, mmap]";
- # expected "IO[bytes]"
- handle, # type: ignore[arg-type]
- encoding=ioargs.encoding,
- errors=errors,
- newline="",
- )
- handles.append(handle)
- # only marked as wrapped when the caller provided a handle
- is_wrapped = not (
- isinstance(ioargs.filepath_or_buffer, str) or ioargs.should_close
- )
- handles.reverse() # close the most recently added buffer first
- if ioargs.should_close:
- assert not isinstance(ioargs.filepath_or_buffer, str)
- handles.append(ioargs.filepath_or_buffer)
- assert not isinstance(handle, str)
- return IOHandles(
- handle=handle,
- created_handles=handles,
- is_wrapped=is_wrapped,
- is_mmap=memory_map,
- compression=ioargs.compression,
- )
- # error: Definition of "__exit__" in base class "ZipFile" is incompatible with
- # definition in base class "BytesIO" [misc]
- # error: Definition of "__enter__" in base class "ZipFile" is incompatible with
- # definition in base class "BytesIO" [misc]
- # error: Definition of "__enter__" in base class "ZipFile" is incompatible with
- # definition in base class "BinaryIO" [misc]
- # error: Definition of "__enter__" in base class "ZipFile" is incompatible with
- # definition in base class "IO" [misc]
- # error: Definition of "read" in base class "ZipFile" is incompatible with
- # definition in base class "BytesIO" [misc]
- # error: Definition of "read" in base class "ZipFile" is incompatible with
- # definition in base class "IO" [misc]
- class _BytesZipFile(zipfile.ZipFile, BytesIO): # type: ignore[misc]
- """
- Wrapper for standard library class ZipFile and allow the returned file-like
- handle to accept byte strings via `write` method.
- BytesIO provides attributes of file-like object and ZipFile.writestr writes
- bytes strings into a member of the archive.
- """
- # GH 17778
- def __init__(
- self,
- file: FilePathOrBuffer,
- mode: str,
- archive_name: str | None = None,
- **kwargs,
- ):
- mode = mode.replace("b", "")
- self.archive_name = archive_name
- self.multiple_write_buffer: StringIO | BytesIO | None = None
- kwargs_zip: dict[str, Any] = {"compression": zipfile.ZIP_DEFLATED}
- kwargs_zip.update(kwargs)
- # error: Argument 1 to "__init__" of "ZipFile" has incompatible type
- # "Union[_PathLike[str], Union[str, Union[IO[Any], RawIOBase, BufferedIOBase,
- # TextIOBase, TextIOWrapper, mmap]]]"; expected "Union[Union[str,
- # _PathLike[str]], IO[bytes]]"
- super().__init__(file, mode, **kwargs_zip) # type: ignore[arg-type]
- def write(self, data):
- # buffer multiple write calls, write on flush
- if self.multiple_write_buffer is None:
- self.multiple_write_buffer = (
- BytesIO() if isinstance(data, bytes) else StringIO()
- )
- self.multiple_write_buffer.write(data)
- def flush(self) -> None:
- # write to actual handle and close write buffer
- if self.multiple_write_buffer is None or self.multiple_write_buffer.closed:
- return
- # ZipFile needs a non-empty string
- archive_name = self.archive_name or self.filename or "zip"
- with self.multiple_write_buffer:
- super().writestr(archive_name, self.multiple_write_buffer.getvalue())
- def close(self):
- self.flush()
- super().close()
- @property
- def closed(self):
- return self.fp is None
- class _MMapWrapper(abc.Iterator):
- """
- Wrapper for the Python's mmap class so that it can be properly read in
- by Python's csv.reader class.
- Parameters
- ----------
- f : file object
- File object to be mapped onto memory. Must support the 'fileno'
- method or have an equivalent attribute
- """
- def __init__(
- self,
- f: IO,
- encoding: str = "utf-8",
- errors: str = "strict",
- decode: bool = True,
- ):
- self.encoding = encoding
- self.errors = errors
- self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors)
- self.decode = decode
- self.attributes = {}
- for attribute in ("seekable", "readable", "writeable"):
- if not hasattr(f, attribute):
- continue
- self.attributes[attribute] = getattr(f, attribute)()
- self.mmap = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
- def __getattr__(self, name: str):
- if name in self.attributes:
- return lambda: self.attributes[name]
- return getattr(self.mmap, name)
- def __iter__(self) -> _MMapWrapper:
- return self
- def read(self, size: int = -1) -> str | bytes:
- # CSV c-engine uses read instead of iterating
- content: bytes = self.mmap.read(size)
- if self.decode:
- # memory mapping is applied before compression. Encoding should
- # be applied to the de-compressed data.
- return content.decode(self.encoding, errors=self.errors)
- return content
- def __next__(self) -> str:
- newbytes = self.mmap.readline()
- # readline returns bytes, not str, but Python's CSV reader
- # expects str, so convert the output to str before continuing
- newline = self.decoder.decode(newbytes)
- # mmap doesn't raise if reading past the allocated
- # data but instead returns an empty string, so raise
- # if that is returned
- if newline == "":
- raise StopIteration
- # IncrementalDecoder seems to push newline to the next line
- return newline.lstrip("\n")
- def _maybe_memory_map(
- handle: FileOrBuffer,
- memory_map: bool,
- encoding: str,
- mode: str,
- errors: str | None,
- decode: bool,
- ) -> tuple[FileOrBuffer, bool, list[Buffer]]:
- """Try to memory map file/buffer."""
- handles: list[Buffer] = []
- memory_map &= hasattr(handle, "fileno") or isinstance(handle, str)
- if not memory_map:
- return handle, memory_map, handles
- # need to open the file first
- if isinstance(handle, str):
- if encoding and "b" not in mode:
- # Encoding
- handle = open(handle, mode, encoding=encoding, errors=errors, newline="")
- else:
- # Binary mode
- handle = open(handle, mode)
- handles.append(handle)
- try:
- # error: Argument 1 to "_MMapWrapper" has incompatible type "Union[IO[Any],
- # RawIOBase, BufferedIOBase, TextIOBase, mmap]"; expected "IO[Any]"
- wrapped = cast(
- mmap.mmap,
- _MMapWrapper(handle, encoding, errors, decode), # type: ignore[arg-type]
- )
- handle.close()
- handles.remove(handle)
- handles.append(wrapped)
- handle = wrapped
- except Exception:
- # we catch any errors that may have occurred
- # because that is consistent with the lower-level
- # functionality of the C engine (pd.read_csv), so
- # leave the file handler as is then
- memory_map = False
- return handle, memory_map, handles
- def file_exists(filepath_or_buffer: FilePathOrBuffer) -> bool:
- """Test whether file exists."""
- exists = False
- filepath_or_buffer = stringify_path(filepath_or_buffer)
- if not isinstance(filepath_or_buffer, str):
- return exists
- try:
- exists = os.path.exists(filepath_or_buffer)
- # gh-5874: if the filepath is too long will raise here
- except (TypeError, ValueError):
- pass
- return exists
- def _is_binary_mode(handle: FilePathOrBuffer, mode: str) -> bool:
- """Whether the handle is opened in binary mode"""
- # specified by user
- if "t" in mode or "b" in mode:
- return "b" in mode
- # classes that expect string but have 'b' in mode
- text_classes = (codecs.StreamWriter, codecs.StreamReader, codecs.StreamReaderWriter)
- if issubclass(type(handle), text_classes):
- return False
- # classes that expect bytes
- binary_classes = (BufferedIOBase, RawIOBase)
- return isinstance(handle, binary_classes) or "b" in getattr(handle, "mode", mode)
|