"""Cache Management
"""
- import hashlib
- import json
- import logging
- import os
- from pathlib import Path
- from typing import Any, Dict, List, Optional
- from pip._vendor.packaging.tags import Tag, interpreter_name, interpreter_version
- from pip._vendor.packaging.utils import canonicalize_name
- from pip._internal.exceptions import InvalidWheelFilename
- from pip._internal.models.direct_url import DirectUrl
- from pip._internal.models.link import Link
- from pip._internal.models.wheel import Wheel
- from pip._internal.utils.temp_dir import TempDirectory, tempdir_kinds
- from pip._internal.utils.urls import path_to_url
# Name of the JSON file, kept in the same directory as a cached item, that
# records the download origin (a serialized DirectUrl) of that cache entry.
ORIGIN_JSON_NAME = "origin.json"

# Module-level logger used for all cache diagnostics in this file.
logger = logging.getLogger(__name__)
def _hash_dict(d: Dict[str, str]) -> str:
    """Return a stable sha224 hex digest of a dictionary.

    The dictionary is serialized to compact, ASCII-only JSON with sorted
    keys, so two dictionaries holding the same items produce the same
    digest regardless of insertion order.
    """
    serialized = json.dumps(
        d,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=True,
    )
    digest = hashlib.sha224(serialized.encode("ascii"))
    return digest.hexdigest()
class Cache:
    """An abstract class - provides cache directories for data from links

    Subclasses are expected to override ``get_path_for_link`` and ``get``;
    the versions defined here raise ``NotImplementedError``.

    :param cache_dir: The root of the cache. Must be an absolute path, or
        empty, in which case ``cache_dir`` is stored as ``None``.
    """

    def __init__(self, cache_dir: str) -> None:
        super().__init__()
        assert not cache_dir or os.path.isabs(cache_dir)
        self.cache_dir = cache_dir if cache_dir else None

    def _get_cache_path_parts(self, link: Link) -> List[str]:
        """Get parts of path that must be os.path.joined with cache_dir"""
        # The cache key is built from the URL without its fragment rather
        # than the raw URL: the fragment may carry other items that we do
        # not care about here.
        key_parts = {"url": link.url_without_fragment}
        if link.hash_name is not None and link.hash is not None:
            key_parts[link.hash_name] = link.hash

        subdirectory = link.subdirectory_fragment
        if subdirectory:
            key_parts["subdirectory"] = subdirectory

        # Include interpreter name, major and minor version in cache key
        # to cope with ill-behaved sdists that build a different wheel
        # depending on the python version their setup.py is being run on,
        # and don't encode the difference in compatibility tags.
        # https://github.com/pypa/pip/issues/7296
        key_parts.update(
            interpreter_name=interpreter_name(),
            interpreter_version=interpreter_version(),
        )

        # The key is hashed with sha224: its security properties are similar
        # to sha256 but the output is shorter (and thus less secure). The
        # difference does not matter much for this use case.
        digest = _hash_dict(key_parts)

        # Nest the directories so that we do not end up with a huge number
        # of top level directories, which could exhaust the number of sub
        # directories allowed on some file systems.
        return [digest[:2], digest[2:4], digest[4:6], digest[6:]]

    def _get_candidates(self, link: Link, canonical_package_name: str) -> List[Any]:
        """Return ``(entry_name, directory)`` pairs found in link's cache dir.

        An empty list is returned when there is no cache root, no package
        name, no link, or when the directory for the link does not exist.
        """
        if not (self.cache_dir and canonical_package_name and link):
            return []

        directory = self.get_path_for_link(link)
        if not os.path.isdir(directory):
            return []
        return [(entry, directory) for entry in os.listdir(directory)]

    def get_path_for_link(self, link: Link) -> str:
        """Return a directory to store cached items in for link.

        Not implemented here; subclasses must provide it.
        """
        raise NotImplementedError()

    def get(
        self,
        link: Link,
        package_name: Optional[str],
        supported_tags: List[Tag],
    ) -> Link:
        """Returns a link to a cached item if it exists, otherwise returns the
        passed link.

        Not implemented here; subclasses must provide it.
        """
        raise NotImplementedError()
class SimpleWheelCache(Cache):
    """A cache of wheels for future installs."""

    def __init__(self, cache_dir: str) -> None:
        super().__init__(cache_dir)

    def get_path_for_link(self, link: Link) -> str:
        """Return a directory to store cached wheels for link

        Because there are M wheels for any one sdist, we provide a directory
        to cache them in, and then consult that directory when looking up
        cache hits.

        We only insert things into the cache if they have plausible version
        numbers, so that we don't contaminate the cache with things that were
        not unique. E.g. ./package might have dozens of installs done for it
        and build a version of 0.0...and if we built and cached a wheel, we'd
        end up using the same wheel even if the source has been edited.

        :param link: The link of the sdist for which this will cache wheels.
        """
        path_parts = self._get_cache_path_parts(link)
        assert self.cache_dir
        # Store wheels within the root cache_dir
        wheels_root = os.path.join(self.cache_dir, "wheels")
        return os.path.join(wheels_root, *path_parts)

    def get(
        self,
        link: Link,
        package_name: Optional[str],
        supported_tags: List[Tag],
    ) -> Link:
        """Return a link to the best matching cached wheel, or ``link`` itself.

        Cached files are skipped when their name is not a valid wheel
        filename, when the wheel's distribution name does not match
        ``package_name``, or when the wheel is not supported by
        ``supported_tags``. Among the remaining wheels, the smallest
        ``(support index, file name, directory)`` tuple wins.
        """
        if not package_name:
            return link

        canonical_package_name = canonicalize_name(package_name)
        ranked = []
        for wheel_name, wheel_dir in self._get_candidates(link, canonical_package_name):
            try:
                wheel = Wheel(wheel_name)
            except InvalidWheelFilename:
                continue

            if canonicalize_name(wheel.name) != canonical_package_name:
                logger.debug(
                    "Ignoring cached wheel %s for %s as it "
                    "does not match the expected distribution name %s.",
                    wheel_name,
                    link,
                    package_name,
                )
                continue

            # Wheels built for a different python/arch/etc are left out.
            if wheel.supported(supported_tags):
                rank = wheel.support_index_min(supported_tags)
                ranked.append((rank, wheel_name, wheel_dir))

        if not ranked:
            return link

        best = min(ranked)
        return Link(path_to_url(os.path.join(best[2], best[1])))
class EphemWheelCache(SimpleWheelCache):
    """A SimpleWheelCache that creates its own temporary cache directory"""

    def __init__(self) -> None:
        temp_dir = TempDirectory(
            kind=tempdir_kinds.EPHEM_WHEEL_CACHE,
            globally_managed=True,
        )
        # Keep a reference to the TempDirectory on the instance; its path
        # becomes the root of this cache.
        self._temp_dir = temp_dir
        super().__init__(temp_dir.path)
class CacheEntry:
    """A cached item together with where it was found and its origin.

    ``link`` points at the cached item and ``persistent`` records the flag
    passed by the creator. ``origin`` is the ``DirectUrl`` parsed from the
    ``ORIGIN_JSON_NAME`` file in the same directory as the cached file, or
    ``None`` when that file is missing or cannot be read and parsed.
    """

    def __init__(
        self,
        link: Link,
        persistent: bool,
    ):
        self.link = link
        self.persistent = persistent
        self.origin: Optional[DirectUrl] = None

        origin_file = Path(self.link.file_path).parent / ORIGIN_JSON_NAME
        if not origin_file.exists():
            return

        # Best effort: an unreadable or invalid origin file is reported and
        # ignored, leaving ``origin`` as None.
        try:
            origin_json = origin_file.read_text(encoding="utf-8")
            self.origin = DirectUrl.from_json(origin_json)
        except Exception as e:
            logger.warning(
                "Ignoring invalid cache entry origin file %s for %s (%s)",
                origin_file,
                link.filename,
                e,
            )
class WheelCache(Cache):
    """Wraps EphemWheelCache and SimpleWheelCache into a single Cache

    This Cache allows for graceful degradation, using the ephem wheel cache
    when a certain link is not found in the simple wheel cache first.
    """

    def __init__(self, cache_dir: str) -> None:
        super().__init__(cache_dir)
        self._wheel_cache = SimpleWheelCache(cache_dir)
        self._ephem_cache = EphemWheelCache()

    def get_path_for_link(self, link: Link) -> str:
        """Return the directory for link inside the simple wheel cache."""
        return self._wheel_cache.get_path_for_link(link)

    def get_ephem_path_for_link(self, link: Link) -> str:
        """Return the directory for link inside the ephemeral wheel cache."""
        return self._ephem_cache.get_path_for_link(link)

    def get(
        self,
        link: Link,
        package_name: Optional[str],
        supported_tags: List[Tag],
    ) -> Link:
        """Return the link of the matching cache entry, or ``link`` itself
        when neither cache holds a match.
        """
        entry = self.get_cache_entry(link, package_name, supported_tags)
        return link if entry is None else entry.link

    def get_cache_entry(
        self,
        link: Link,
        package_name: Optional[str],
        supported_tags: List[Tag],
    ) -> Optional[CacheEntry]:
        """Returns a CacheEntry with a link to a cached item if it exists or
        None. The cache entry indicates if the item was found in the persistent
        or ephemeral cache.
        """
        # The simple (persistent) cache is consulted first, then the
        # ephemeral one. A cache signals a miss by handing back the very
        # same link object it was given, hence the identity comparison.
        lookups = (
            (self._wheel_cache, True),
            (self._ephem_cache, False),
        )
        for cache, persistent in lookups:
            found = cache.get(
                link=link,
                package_name=package_name,
                supported_tags=supported_tags,
            )
            if found is not link:
                return CacheEntry(found, persistent=persistent)
        return None

    @staticmethod
    def record_download_origin(cache_dir: str, download_info: DirectUrl) -> None:
        """Write ``download_info`` as the origin file of ``cache_dir``.

        When an origin file is already present it is read first: a failure
        to read or parse it, or a URL that differs from the new download
        URL, is logged as a warning. The file is then (over)written with the
        JSON form of ``download_info`` in every case.
        """
        origin_file = Path(cache_dir) / ORIGIN_JSON_NAME
        if origin_file.exists():
            try:
                previous = DirectUrl.from_json(origin_file.read_text(encoding="utf-8"))
            except Exception as e:
                logger.warning(
                    "Could not read origin file %s in cache entry (%s). "
                    "Will attempt to overwrite it.",
                    origin_file,
                    e,
                )
            else:
                # TODO: use DirectUrl.equivalent when
                # https://github.com/pypa/pip/pull/10564 is merged.
                if previous.url != download_info.url:
                    logger.warning(
                        "Origin URL %s in cache entry %s does not match download URL "
                        "%s. This is likely a pip bug or a cache corruption issue. "
                        "Will overwrite it with the new value.",
                        previous.url,
                        cache_dir,
                        download_info.url,
                    )
        origin_file.write_text(download_info.to_json(), encoding="utf-8")
|