123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- import collections
- import itertools
- import pathlib
- import operator
- import zipfile
- from . import abc
- from ._itertools import only
def remove_duplicates(items):
    """
    Return an iterator over *items* with repeated entries dropped.

    Only the first occurrence of each item is kept and the order of
    first appearance is preserved. Items are used as mapping keys,
    so they must be hashable.
    """
    first_seen = collections.OrderedDict.fromkeys(items)
    return iter(first_seen)
- class FileReader(abc.TraversableResources):
- def __init__(self, loader):
- self.path = pathlib.Path(loader.path).parent
- def resource_path(self, resource):
- """
- Return the file system path to prevent
- `resources.path()` from creating a temporary
- copy.
- """
- return str(self.path.joinpath(resource))
- def files(self):
- return self.path
- class ZipReader(abc.TraversableResources):
- def __init__(self, loader, module):
- _, _, name = module.rpartition('.')
- self.prefix = loader.prefix.replace('\\', '/') + name + '/'
- self.archive = loader.archive
- def open_resource(self, resource):
- try:
- return super().open_resource(resource)
- except KeyError as exc:
- raise FileNotFoundError(exc.args[0])
- def is_resource(self, path):
- """
- Workaround for `zipfile.Path.is_file` returning true
- for non-existent paths.
- """
- target = self.files().joinpath(path)
- return target.is_file() and target.exists()
- def files(self):
- return zipfile.Path(self.archive, self.prefix)
- class MultiplexedPath(abc.Traversable):
- """
- Given a series of Traversable objects, implement a merged
- version of the interface across all objects. Useful for
- namespace packages which may be multihomed at a single
- name.
- """
- def __init__(self, *paths):
- self._paths = list(map(pathlib.Path, remove_duplicates(paths)))
- if not self._paths:
- message = 'MultiplexedPath must contain at least one path'
- raise FileNotFoundError(message)
- if not all(path.is_dir() for path in self._paths):
- raise NotADirectoryError('MultiplexedPath only supports directories')
- def iterdir(self):
- children = (child for path in self._paths for child in path.iterdir())
- by_name = operator.attrgetter('name')
- groups = itertools.groupby(sorted(children, key=by_name), key=by_name)
- return map(self._follow, (locs for name, locs in groups))
- def read_bytes(self):
- raise FileNotFoundError(f'{self} is not a file')
- def read_text(self, *args, **kwargs):
- raise FileNotFoundError(f'{self} is not a file')
- def is_dir(self):
- return True
- def is_file(self):
- return False
- def joinpath(self, *descendants):
- try:
- return super().joinpath(*descendants)
- except abc.TraversalError:
- # One of the paths did not resolve (a directory does not exist).
- # Just return something that will not exist.
- return self._paths[0].joinpath(*descendants)
- @classmethod
- def _follow(cls, children):
- """
- Construct a MultiplexedPath if needed.
- If children contains a sole element, return it.
- Otherwise, return a MultiplexedPath of the items.
- Unless one of the items is not a Directory, then return the first.
- """
- subdirs, one_dir, one_file = itertools.tee(children, 3)
- try:
- return only(one_dir)
- except ValueError:
- try:
- return cls(*subdirs)
- except NotADirectoryError:
- return next(one_file)
- def open(self, *args, **kwargs):
- raise FileNotFoundError(f'{self} is not a file')
- @property
- def name(self):
- return self._paths[0].name
- def __repr__(self):
- paths = ', '.join(f"'{path}'" for path in self._paths)
- return f'MultiplexedPath({paths})'
- class NamespaceReader(abc.TraversableResources):
- def __init__(self, namespace_path):
- if 'NamespacePath' not in str(namespace_path):
- raise ValueError('Invalid path')
- self.path = MultiplexedPath(*list(namespace_path))
- def resource_path(self, resource):
- """
- Return the file system path to prevent
- `resources.path()` from creating a temporary
- copy.
- """
- return str(self.path.joinpath(resource))
- def files(self):
- return self.path
|