# readers.py
import collections
import itertools
import operator
import pathlib
import zipfile

from . import abc
from ._itertools import only
  8. def remove_duplicates(items):
  9. return iter(collections.OrderedDict.fromkeys(items))
  10. class FileReader(abc.TraversableResources):
  11. def __init__(self, loader):
  12. self.path = pathlib.Path(loader.path).parent
  13. def resource_path(self, resource):
  14. """
  15. Return the file system path to prevent
  16. `resources.path()` from creating a temporary
  17. copy.
  18. """
  19. return str(self.path.joinpath(resource))
  20. def files(self):
  21. return self.path
  22. class ZipReader(abc.TraversableResources):
  23. def __init__(self, loader, module):
  24. _, _, name = module.rpartition('.')
  25. self.prefix = loader.prefix.replace('\\', '/') + name + '/'
  26. self.archive = loader.archive
  27. def open_resource(self, resource):
  28. try:
  29. return super().open_resource(resource)
  30. except KeyError as exc:
  31. raise FileNotFoundError(exc.args[0])
  32. def is_resource(self, path):
  33. """
  34. Workaround for `zipfile.Path.is_file` returning true
  35. for non-existent paths.
  36. """
  37. target = self.files().joinpath(path)
  38. return target.is_file() and target.exists()
  39. def files(self):
  40. return zipfile.Path(self.archive, self.prefix)
  41. class MultiplexedPath(abc.Traversable):
  42. """
  43. Given a series of Traversable objects, implement a merged
  44. version of the interface across all objects. Useful for
  45. namespace packages which may be multihomed at a single
  46. name.
  47. """
  48. def __init__(self, *paths):
  49. self._paths = list(map(pathlib.Path, remove_duplicates(paths)))
  50. if not self._paths:
  51. message = 'MultiplexedPath must contain at least one path'
  52. raise FileNotFoundError(message)
  53. if not all(path.is_dir() for path in self._paths):
  54. raise NotADirectoryError('MultiplexedPath only supports directories')
  55. def iterdir(self):
  56. children = (child for path in self._paths for child in path.iterdir())
  57. by_name = operator.attrgetter('name')
  58. groups = itertools.groupby(sorted(children, key=by_name), key=by_name)
  59. return map(self._follow, (locs for name, locs in groups))
  60. def read_bytes(self):
  61. raise FileNotFoundError(f'{self} is not a file')
  62. def read_text(self, *args, **kwargs):
  63. raise FileNotFoundError(f'{self} is not a file')
  64. def is_dir(self):
  65. return True
  66. def is_file(self):
  67. return False
  68. def joinpath(self, *descendants):
  69. try:
  70. return super().joinpath(*descendants)
  71. except abc.TraversalError:
  72. # One of the paths did not resolve (a directory does not exist).
  73. # Just return something that will not exist.
  74. return self._paths[0].joinpath(*descendants)
  75. @classmethod
  76. def _follow(cls, children):
  77. """
  78. Construct a MultiplexedPath if needed.
  79. If children contains a sole element, return it.
  80. Otherwise, return a MultiplexedPath of the items.
  81. Unless one of the items is not a Directory, then return the first.
  82. """
  83. subdirs, one_dir, one_file = itertools.tee(children, 3)
  84. try:
  85. return only(one_dir)
  86. except ValueError:
  87. try:
  88. return cls(*subdirs)
  89. except NotADirectoryError:
  90. return next(one_file)
  91. def open(self, *args, **kwargs):
  92. raise FileNotFoundError(f'{self} is not a file')
  93. @property
  94. def name(self):
  95. return self._paths[0].name
  96. def __repr__(self):
  97. paths = ', '.join(f"'{path}'" for path in self._paths)
  98. return f'MultiplexedPath({paths})'
  99. class NamespaceReader(abc.TraversableResources):
  100. def __init__(self, namespace_path):
  101. if 'NamespacePath' not in str(namespace_path):
  102. raise ValueError('Invalid path')
  103. self.path = MultiplexedPath(*list(namespace_path))
  104. def resource_path(self, resource):
  105. """
  106. Return the file system path to prevent
  107. `resources.path()` from creating a temporary
  108. copy.
  109. """
  110. return str(self.path.joinpath(resource))
  111. def files(self):
  112. return self.path