resources.py 7.0 KB


  1. import os
  2. from . import abc as resources_abc
  3. from . import _common
  4. from ._common import as_file
  5. from contextlib import contextmanager, suppress
  6. from importlib import import_module
  7. from importlib.abc import ResourceLoader
  8. from io import BytesIO, TextIOWrapper
  9. from pathlib import Path
  10. from types import ModuleType
  11. from typing import ContextManager, Iterable, Optional, Union
  12. from typing import cast
  13. from typing.io import BinaryIO, TextIO
  14. __all__ = [
  15. 'Package',
  16. 'Resource',
  17. 'as_file',
  18. 'contents',
  19. 'files',
  20. 'is_resource',
  21. 'open_binary',
  22. 'open_text',
  23. 'path',
  24. 'read_binary',
  25. 'read_text',
  26. ]
  27. Package = Union[str, ModuleType]
  28. Resource = Union[str, os.PathLike]
  29. def _resolve(name) -> ModuleType:
  30. """If name is a string, resolve to a module."""
  31. if hasattr(name, '__spec__'):
  32. return name
  33. return import_module(name)
  34. def _get_package(package) -> ModuleType:
  35. """Take a package name or module object and return the module.
  36. If a name, the module is imported. If the resolved module
  37. object is not a package, raise an exception.
  38. """
  39. module = _resolve(package)
  40. if module.__spec__.submodule_search_locations is None:
  41. raise TypeError('{!r} is not a package'.format(package))
  42. return module
  43. def _normalize_path(path) -> str:
  44. """Normalize a path by ensuring it is a string.
  45. If the resulting string contains path separators, an exception is raised.
  46. """
  47. parent, file_name = os.path.split(path)
  48. if parent:
  49. raise ValueError('{!r} must be only a file name'.format(path))
  50. return file_name
  51. def _get_resource_reader(
  52. package: ModuleType) -> Optional[resources_abc.ResourceReader]:
  53. # Return the package's loader if it's a ResourceReader. We can't use
  54. # a issubclass() check here because apparently abc.'s __subclasscheck__()
  55. # hook wants to create a weak reference to the object, but
  56. # zipimport.zipimporter does not support weak references, resulting in a
  57. # TypeError. That seems terrible.
  58. spec = package.__spec__
  59. if hasattr(spec.loader, 'get_resource_reader'):
  60. return cast(resources_abc.ResourceReader,
  61. spec.loader.get_resource_reader(spec.name))
  62. return None
  63. def _check_location(package):
  64. if package.__spec__.origin is None or not package.__spec__.has_location:
  65. raise FileNotFoundError(f'Package has no location {package!r}')
  66. def open_binary(package: Package, resource: Resource) -> BinaryIO:
  67. """Return a file-like object opened for binary reading of the resource."""
  68. resource = _normalize_path(resource)
  69. package = _get_package(package)
  70. reader = _get_resource_reader(package)
  71. if reader is not None:
  72. return reader.open_resource(resource)
  73. absolute_package_path = os.path.abspath(
  74. package.__spec__.origin or 'non-existent file')
  75. package_path = os.path.dirname(absolute_package_path)
  76. full_path = os.path.join(package_path, resource)
  77. try:
  78. return open(full_path, mode='rb')
  79. except OSError:
  80. # Just assume the loader is a resource loader; all the relevant
  81. # importlib.machinery loaders are and an AttributeError for
  82. # get_data() will make it clear what is needed from the loader.
  83. loader = cast(ResourceLoader, package.__spec__.loader)
  84. data = None
  85. if hasattr(package.__spec__.loader, 'get_data'):
  86. with suppress(OSError):
  87. data = loader.get_data(full_path)
  88. if data is None:
  89. package_name = package.__spec__.name
  90. message = '{!r} resource not found in {!r}'.format(
  91. resource, package_name)
  92. raise FileNotFoundError(message)
  93. return BytesIO(data)
  94. def open_text(package: Package,
  95. resource: Resource,
  96. encoding: str = 'utf-8',
  97. errors: str = 'strict') -> TextIO:
  98. """Return a file-like object opened for text reading of the resource."""
  99. return TextIOWrapper(
  100. open_binary(package, resource), encoding=encoding, errors=errors)
  101. def read_binary(package: Package, resource: Resource) -> bytes:
  102. """Return the binary contents of the resource."""
  103. with open_binary(package, resource) as fp:
  104. return fp.read()
  105. def read_text(package: Package,
  106. resource: Resource,
  107. encoding: str = 'utf-8',
  108. errors: str = 'strict') -> str:
  109. """Return the decoded string of the resource.
  110. The decoding-related arguments have the same semantics as those of
  111. bytes.decode().
  112. """
  113. with open_text(package, resource, encoding, errors) as fp:
  114. return fp.read()
  115. def files(package: Package) -> resources_abc.Traversable:
  116. """
  117. Get a Traversable resource from a package
  118. """
  119. return _common.from_package(_get_package(package))
  120. def path(
  121. package: Package, resource: Resource,
  122. ) -> 'ContextManager[Path]':
  123. """A context manager providing a file path object to the resource.
  124. If the resource does not already exist on its own on the file system,
  125. a temporary file will be created. If the file was created, the file
  126. will be deleted upon exiting the context manager (no exception is
  127. raised if the file was deleted prior to the context manager
  128. exiting).
  129. """
  130. reader = _get_resource_reader(_get_package(package))
  131. return (
  132. _path_from_reader(reader, resource)
  133. if reader else
  134. _common.as_file(files(package).joinpath(_normalize_path(resource)))
  135. )
  136. @contextmanager
  137. def _path_from_reader(reader, resource):
  138. norm_resource = _normalize_path(resource)
  139. with suppress(FileNotFoundError):
  140. yield Path(reader.resource_path(norm_resource))
  141. return
  142. opener_reader = reader.open_resource(norm_resource)
  143. with _common._tempfile(opener_reader.read, suffix=norm_resource) as res:
  144. yield res
  145. def is_resource(package: Package, name: str) -> bool:
  146. """True if 'name' is a resource inside 'package'.
  147. Directories are *not* resources.
  148. """
  149. package = _get_package(package)
  150. _normalize_path(name)
  151. reader = _get_resource_reader(package)
  152. if reader is not None:
  153. return reader.is_resource(name)
  154. package_contents = set(contents(package))
  155. if name not in package_contents:
  156. return False
  157. return (_common.from_package(package) / name).is_file()
  158. def contents(package: Package) -> Iterable[str]:
  159. """Return an iterable of entries in 'package'.
  160. Note that not all entries are resources. Specifically, directories are
  161. not considered resources. Use `is_resource()` on each entry returned here
  162. to check if it is a resource or not.
  163. """
  164. package = _get_package(package)
  165. reader = _get_resource_reader(package)
  166. if reader is not None:
  167. return reader.contents()
  168. # Is the package a namespace package? By definition, namespace packages
  169. # cannot have resources.
  170. namespace = (
  171. package.__spec__.origin is None or
  172. package.__spec__.origin == 'namespace'
  173. )
  174. if namespace or not package.__spec__.has_location:
  175. return ()
  176. return list(item.name for item in _common.from_package(package).iterdir())