import bisect
import calendar
import collections
import functools
import re
import weakref
from datetime import datetime, timedelta, tzinfo

from . import _common, _tzpath

EPOCH = datetime(1970, 1, 1)
EPOCHORDINAL = datetime(1970, 1, 1).toordinal()


# It is relatively expensive to construct new timedelta objects, and in most
# cases we're looking at the same deltas, like integer numbers of hours, etc.
# To improve speed and memory use, we'll keep a dictionary with references
# to the ones we've already used so far.
#
# Loading every time zone in the 2020a version of the time zone database
# requires 447 timedeltas, which requires approximately the amount of space
# that ZoneInfo("America/New_York") with 236 transitions takes up, so we will
# set the cache size to 512 so that in the common case we always get cache
# hits, but specifically crafted ZoneInfo objects don't leak arbitrary amounts
# of memory.
@functools.lru_cache(maxsize=512)
def _load_timedelta(seconds):
    return timedelta(seconds=seconds)
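
# Illustrative sketch (comment only, not part of the module): because of the
# LRU cache above, repeated requests for the same number of seconds hand back
# the same timedelta object rather than a new one:
#
#     >>> _load_timedelta(3600) is _load_timedelta(3600)
#     True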


class ZoneInfo(tzinfo):
    _strong_cache_size = 8
    _strong_cache = collections.OrderedDict()
    _weak_cache = weakref.WeakValueDictionary()
    __module__ = "zoneinfo"

    def __init_subclass__(cls):
        cls._strong_cache = collections.OrderedDict()
        cls._weak_cache = weakref.WeakValueDictionary()

    def __new__(cls, key):
        instance = cls._weak_cache.get(key, None)
        if instance is None:
            instance = cls._weak_cache.setdefault(key, cls._new_instance(key))
            instance._from_cache = True

        # Update the "strong" cache
        cls._strong_cache[key] = cls._strong_cache.pop(key, instance)

        if len(cls._strong_cache) > cls._strong_cache_size:
            cls._strong_cache.popitem(last=False)

        return instance

    @classmethod
    def no_cache(cls, key):
        obj = cls._new_instance(key)
        obj._from_cache = False

        return obj
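
    # Illustrative sketch (comment only, assuming tzdata for the key is
    # available): the constructor normally hands back the cached instance,
    # while no_cache always builds a fresh, uncached object:
    #
    #     >>> ZoneInfo("UTC") is ZoneInfo("UTC")
    #     True
    #     >>> ZoneInfo.no_cache("UTC") is ZoneInfo("UTC")
    #     False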

    @classmethod
    def _new_instance(cls, key):
        obj = super().__new__(cls)
        obj._key = key
        obj._file_path = obj._find_tzfile(key)

        if obj._file_path is not None:
            file_obj = open(obj._file_path, "rb")
        else:
            file_obj = _common.load_tzdata(key)

        with file_obj as f:
            obj._load_file(f)

        return obj

    @classmethod
    def from_file(cls, fobj, /, key=None):
        obj = super().__new__(cls)
        obj._key = key
        obj._file_path = None
        obj._load_file(fobj)
        obj._file_repr = repr(fobj)

        # Disable pickling for objects created from files
        obj.__reduce__ = obj._file_reduce

        return obj

    @classmethod
    def clear_cache(cls, *, only_keys=None):
        if only_keys is not None:
            for key in only_keys:
                cls._weak_cache.pop(key, None)
                cls._strong_cache.pop(key, None)
        else:
            cls._weak_cache.clear()
            cls._strong_cache.clear()
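
    # Illustrative sketch (comment only): clearing the cache does not affect
    # existing instances, but later constructions build new objects:
    #
    #     >>> a = ZoneInfo("UTC")
    #     >>> ZoneInfo.clear_cache(only_keys=["UTC"])
    #     >>> ZoneInfo("UTC") is a
    #     False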

    @property
    def key(self):
        return self._key

    def utcoffset(self, dt):
        return self._find_trans(dt).utcoff

    def dst(self, dt):
        return self._find_trans(dt).dstoff

    def tzname(self, dt):
        return self._find_trans(dt).tzname

    def fromutc(self, dt):
        """Convert from datetime in UTC to datetime in local time"""
        if not isinstance(dt, datetime):
            raise TypeError("fromutc() requires a datetime argument")
        if dt.tzinfo is not self:
            raise ValueError("dt.tzinfo is not self")

        timestamp = self._get_local_timestamp(dt)
        num_trans = len(self._trans_utc)

        if num_trans >= 1 and timestamp < self._trans_utc[0]:
            tti = self._tti_before
            fold = 0
        elif (
            num_trans == 0 or timestamp > self._trans_utc[-1]
        ) and not isinstance(self._tz_after, _ttinfo):
            tti, fold = self._tz_after.get_trans_info_fromutc(
                timestamp, dt.year
            )
        elif num_trans == 0:
            tti = self._tz_after
            fold = 0
        else:
            idx = bisect.bisect_right(self._trans_utc, timestamp)

            if num_trans > 1 and timestamp >= self._trans_utc[1]:
                tti_prev, tti = self._ttinfos[idx - 2 : idx]
            elif timestamp > self._trans_utc[-1]:
                tti_prev = self._ttinfos[-1]
                tti = self._tz_after
            else:
                tti_prev = self._tti_before
                tti = self._ttinfos[0]

            # Detect fold
            shift = tti_prev.utcoff - tti.utcoff
            fold = shift.total_seconds() > timestamp - self._trans_utc[idx - 1]
        dt += tti.utcoff
        if fold:
            return dt.replace(fold=1)
        else:
            return dt
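
    # Illustrative sketch of the fold detection above (comment only): for a
    # zone that falls back one hour, such as America/New_York on 2020-11-01,
    # two distinct UTC instants map to the same wall time and the later one
    # is marked with fold=1 (astimezone() goes through fromutc()):
    #
    #     >>> from datetime import timezone
    #     >>> tz = ZoneInfo("America/New_York")
    #     >>> datetime(2020, 11, 1, 5, 30, tzinfo=timezone.utc).astimezone(tz).fold
    #     0
    #     >>> datetime(2020, 11, 1, 6, 30, tzinfo=timezone.utc).astimezone(tz).fold
    #     1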

    def _find_trans(self, dt):
        if dt is None:
            if self._fixed_offset:
                return self._tz_after
            else:
                return _NO_TTINFO

        ts = self._get_local_timestamp(dt)

        lt = self._trans_local[dt.fold]

        num_trans = len(lt)

        if num_trans and ts < lt[0]:
            return self._tti_before
        elif not num_trans or ts > lt[-1]:
            if isinstance(self._tz_after, _TZStr):
                return self._tz_after.get_trans_info(ts, dt.year, dt.fold)
            else:
                return self._tz_after
        else:
            # idx is the transition that occurs after this timestamp, so we
            # subtract off 1 to get the current ttinfo
            idx = bisect.bisect_right(lt, ts) - 1
            assert idx >= 0
            return self._ttinfos[idx]

    def _get_local_timestamp(self, dt):
        return (
            (dt.toordinal() - EPOCHORDINAL) * 86400
            + dt.hour * 3600
            + dt.minute * 60
            + dt.second
        )
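
    # Illustrative worked example: for the naive wall time 1970-01-02 03:04:05
    # this returns (1 * 86400) + (3 * 3600) + (4 * 60) + 5 = 97445, a "local"
    # epoch timestamp that deliberately ignores any UTC offset or tzinfo.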

    def __str__(self):
        if self._key is not None:
            return f"{self._key}"
        else:
            return repr(self)

    def __repr__(self):
        if self._key is not None:
            return f"{self.__class__.__name__}(key={self._key!r})"
        else:
            return f"{self.__class__.__name__}.from_file({self._file_repr})"

    def __reduce__(self):
        return (self.__class__._unpickle, (self._key, self._from_cache))

    def _file_reduce(self):
        import pickle

        raise pickle.PicklingError(
            "Cannot pickle a ZoneInfo file created from a file stream."
        )

    @classmethod
    def _unpickle(cls, key, from_cache, /):
        if from_cache:
            return cls(key)
        else:
            return cls.no_cache(key)

    def _find_tzfile(self, key):
        return _tzpath.find_tzfile(key)

    def _load_file(self, fobj):
        # Retrieve all the data as it exists in the zoneinfo file
        trans_idx, trans_utc, utcoff, isdst, abbr, tz_str = _common.load_data(
            fobj
        )

        # Infer the DST offsets (needed for .dst()) from the data
        dstoff = self._utcoff_to_dstoff(trans_idx, utcoff, isdst)

        # Convert all the transition times (UTC) into
        # "seconds since 1970-01-01 local time"
        trans_local = self._ts_to_local(trans_idx, trans_utc, utcoff)

        # Construct `_ttinfo` objects for each transition in the file
        _ttinfo_list = [
            _ttinfo(
                _load_timedelta(utcoffset), _load_timedelta(dstoffset), tzname
            )
            for utcoffset, dstoffset, tzname in zip(utcoff, dstoff, abbr)
        ]

        self._trans_utc = trans_utc
        self._trans_local = trans_local
        self._ttinfos = [_ttinfo_list[idx] for idx in trans_idx]

        # Find the first non-DST transition
        for i in range(len(isdst)):
            if not isdst[i]:
                self._tti_before = _ttinfo_list[i]
                break
        else:
            if self._ttinfos:
                self._tti_before = self._ttinfos[0]
            else:
                self._tti_before = None

        # Set the "fallback" time zone
        if tz_str is not None and tz_str != b"":
            self._tz_after = _parse_tz_str(tz_str.decode())
        else:
            if not self._ttinfos and not _ttinfo_list:
                raise ValueError("No time zone information found.")

            if self._ttinfos:
                self._tz_after = self._ttinfos[-1]
            else:
                self._tz_after = _ttinfo_list[-1]

        # Determine if this is a "fixed offset" zone, meaning that the output
        # of the utcoffset, dst and tzname functions does not depend on the
        # specific datetime passed.
        #
        # We make three simplifying assumptions here:
        #
        # 1. If _tz_after is not a _ttinfo, it has transitions that might
        #    actually occur (it is possible to construct TZ strings that
        #    specify STD and DST but no transitions ever occur, such as
        #    AAA0BBB,0/0,J365/25).
        # 2. If _ttinfo_list contains more than one _ttinfo object, the
        #    objects represent different offsets.
        # 3. _ttinfo_list contains no unused _ttinfos (in which case an
        #    otherwise fixed-offset zone with extra _ttinfos defined may
        #    appear to *not* be a fixed offset zone).
        #
        # Violations of these assumptions would be fairly exotic, and exotic
        # zones should almost certainly not be used with datetime.time (the
        # only thing that would be affected by this).
        if len(_ttinfo_list) > 1 or not isinstance(self._tz_after, _ttinfo):
            self._fixed_offset = False
        elif not _ttinfo_list:
            self._fixed_offset = True
        else:
            self._fixed_offset = _ttinfo_list[0] == self._tz_after

    @staticmethod
    def _utcoff_to_dstoff(trans_idx, utcoffsets, isdsts):
        # Now we must transform our ttis and abbrs into `_ttinfo` objects,
        # but there is an issue: .dst() must return a timedelta with the
        # difference between utcoffset() and the "standard" offset, but
        # the "base offset" and "DST offset" are not encoded in the file;
        # we can infer what they are from the isdst flag, but it is not
        # sufficient to just look at the last standard offset, because
        # occasionally countries will shift both DST offset and base offset.

        typecnt = len(isdsts)
        dstoffs = [0] * typecnt  # Provisionally assign all to 0.
        dst_cnt = sum(isdsts)
        dst_found = 0

        for i in range(1, len(trans_idx)):
            if dst_cnt == dst_found:
                break

            idx = trans_idx[i]

            dst = isdsts[idx]

            # We're only going to look at daylight saving time
            if not dst:
                continue

            # Skip any offsets that have already been assigned
            if dstoffs[idx] != 0:
                continue

            dstoff = 0
            utcoff = utcoffsets[idx]

            comp_idx = trans_idx[i - 1]

            if not isdsts[comp_idx]:
                dstoff = utcoff - utcoffsets[comp_idx]

            if not dstoff and idx < (typecnt - 1):
                comp_idx = trans_idx[i + 1]

                # If the following transition is also DST and we couldn't
                # find the DST offset by this point, we're going to have to
                # skip it and hope this transition gets assigned later
                if isdsts[comp_idx]:
                    continue

                dstoff = utcoff - utcoffsets[comp_idx]

            if dstoff:
                dst_found += 1
                dstoffs[idx] = dstoff
        else:
            # If we didn't find a valid value for a given index, we'll end up
            # with dstoff = 0 for something where `isdst=1`. This is obviously
            # wrong - one hour will be a much better guess than 0
            for idx in range(typecnt):
                if not dstoffs[idx] and isdsts[idx]:
                    dstoffs[idx] = 3600

        return dstoffs
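
    # Illustrative worked example (hypothetical data): with
    # utcoffsets = [-18000, -14400] (EST, EDT), isdsts = [0, 1] and
    # trans_idx = [0, 1, 0, 1], the loop above compares the DST entry at
    # index 1 with the preceding standard entry and infers
    # dstoffs = [0, -14400 - (-18000)] = [0, 3600], i.e. one hour of DST.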

    @staticmethod
    def _ts_to_local(trans_idx, trans_list_utc, utcoffsets):
        """Generate number of seconds since 1970 *in the local time*.

        This is necessary to easily find the transition times in local time"""
        if not trans_list_utc:
            return [[], []]

        # Start with the timestamps and modify in-place
        trans_list_wall = [list(trans_list_utc), list(trans_list_utc)]

        if len(utcoffsets) > 1:
            offset_0 = utcoffsets[0]
            offset_1 = utcoffsets[trans_idx[0]]
            if offset_1 > offset_0:
                offset_1, offset_0 = offset_0, offset_1
        else:
            offset_0 = offset_1 = utcoffsets[0]

        trans_list_wall[0][0] += offset_0
        trans_list_wall[1][0] += offset_1

        for i in range(1, len(trans_idx)):
            offset_0 = utcoffsets[trans_idx[i - 1]]
            offset_1 = utcoffsets[trans_idx[i]]

            if offset_1 > offset_0:
                offset_1, offset_0 = offset_0, offset_1

            trans_list_wall[0][i] += offset_0
            trans_list_wall[1][i] += offset_1

        return trans_list_wall
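
    # Illustrative sketch (hypothetical values): for a single transition at
    # UTC timestamp 1000 from offset -18000 to -14400 (trans_idx = [1]), the
    # code above adds the larger surrounding offset to the fold=0 list and the
    # smaller one to the fold=1 list, giving [[1000 - 14400], [1000 - 18000]].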


class _ttinfo:
    __slots__ = ["utcoff", "dstoff", "tzname"]

    def __init__(self, utcoff, dstoff, tzname):
        self.utcoff = utcoff
        self.dstoff = dstoff
        self.tzname = tzname

    def __eq__(self, other):
        return (
            self.utcoff == other.utcoff
            and self.dstoff == other.dstoff
            and self.tzname == other.tzname
        )

    def __repr__(self):  # pragma: nocover
        return (
            f"{self.__class__.__name__}"
            + f"({self.utcoff}, {self.dstoff}, {self.tzname})"
        )


_NO_TTINFO = _ttinfo(None, None, None)


class _TZStr:
    __slots__ = (
        "std",
        "dst",
        "start",
        "end",
        "get_trans_info",
        "get_trans_info_fromutc",
        "dst_diff",
    )

    def __init__(
        self, std_abbr, std_offset, dst_abbr, dst_offset, start=None, end=None
    ):
        self.dst_diff = dst_offset - std_offset
        std_offset = _load_timedelta(std_offset)
        self.std = _ttinfo(
            utcoff=std_offset, dstoff=_load_timedelta(0), tzname=std_abbr
        )

        self.start = start
        self.end = end

        dst_offset = _load_timedelta(dst_offset)
        delta = _load_timedelta(self.dst_diff)
        self.dst = _ttinfo(utcoff=dst_offset, dstoff=delta, tzname=dst_abbr)

        # These are assertions because the constructor should only be called
        # by functions that would fail before passing start or end
        assert start is not None, "No transition start specified"
        assert end is not None, "No transition end specified"

        self.get_trans_info = self._get_trans_info
        self.get_trans_info_fromutc = self._get_trans_info_fromutc

    def transitions(self, year):
        start = self.start.year_to_epoch(year)
        end = self.end.year_to_epoch(year)
        return start, end

    def _get_trans_info(self, ts, year, fold):
        """Get the information about the current transition - tti"""
        start, end = self.transitions(year)

        # With fold = 0, the period (denominated in local time) with the
        # smaller offset starts at the end of the gap and ends at the end of
        # the fold; with fold = 1, it runs from the start of the gap to the
        # beginning of the fold.
        #
        # So in order to determine the DST boundaries we need to know both
        # the fold and whether DST is positive or negative (rare), and it
        # turns out that this boils down to fold XOR is_positive.
        if fold == (self.dst_diff >= 0):
            end -= self.dst_diff
        else:
            start += self.dst_diff

        if start < end:
            isdst = start <= ts < end
        else:
            isdst = not (end <= ts < start)

        return self.dst if isdst else self.std

    def _get_trans_info_fromutc(self, ts, year):
        start, end = self.transitions(year)
        start -= self.std.utcoff.total_seconds()
        end -= self.dst.utcoff.total_seconds()

        if start < end:
            isdst = start <= ts < end
        else:
            isdst = not (end <= ts < start)

        # For positive DST, the ambiguous period is one dst_diff after the
        # end of DST; for negative DST, the ambiguous period is one dst_diff
        # before the start of DST.
        if self.dst_diff > 0:
            ambig_start = end
            ambig_end = end + self.dst_diff
        else:
            ambig_start = start
            ambig_end = start - self.dst_diff

        fold = ambig_start <= ts < ambig_end

        return (self.dst if isdst else self.std, fold)


def _post_epoch_days_before_year(year):
    """Get the number of days between 1970-01-01 and YEAR-01-01"""
    y = year - 1
    return y * 365 + y // 4 - y // 100 + y // 400 - EPOCHORDINAL


class _DayOffset:
    __slots__ = ["d", "julian", "hour", "minute", "second"]

    def __init__(self, d, julian, hour=2, minute=0, second=0):
        if not (0 + julian) <= d <= 365:
            min_day = 0 + julian
            raise ValueError(f"d must be in [{min_day}, 365], not: {d}")

        self.d = d
        self.julian = julian
        self.hour = hour
        self.minute = minute
        self.second = second

    def year_to_epoch(self, year):
        days_before_year = _post_epoch_days_before_year(year)

        d = self.d
        if self.julian and d >= 59 and calendar.isleap(year):
            d += 1

        epoch = (days_before_year + d) * 86400
        epoch += self.hour * 3600 + self.minute * 60 + self.second

        return epoch


class _CalendarOffset:
    __slots__ = ["m", "w", "d", "hour", "minute", "second"]

    _DAYS_BEFORE_MONTH = (
        -1,
        0,
        31,
        59,
        90,
        120,
        151,
        181,
        212,
        243,
        273,
        304,
        334,
    )

    def __init__(self, m, w, d, hour=2, minute=0, second=0):
        if not 0 < m <= 12:
            raise ValueError("m must be in (0, 12]")

        if not 0 < w <= 5:
            raise ValueError("w must be in (0, 5]")

        if not 0 <= d <= 6:
            raise ValueError("d must be in [0, 6]")

        self.m = m
        self.w = w
        self.d = d
        self.hour = hour
        self.minute = minute
        self.second = second

    @classmethod
    def _ymd2ord(cls, year, month, day):
        return (
            _post_epoch_days_before_year(year)
            + cls._DAYS_BEFORE_MONTH[month]
            + (month > 2 and calendar.isleap(year))
            + day
        )

    # TODO: These are not actually epoch dates as they are expressed in local
    # time
    def year_to_epoch(self, year):
        """Calculates the datetime of the occurrence from the year"""
        # We know year and month, we need to convert w, d into day of month
        #
        # Week 1 is the first week in which day `d` (where 0 = Sunday) appears.
        # Week 5 represents the last occurrence of day `d`, so we need to know
        # the range of the month.
        first_day, days_in_month = calendar.monthrange(year, self.m)

        # This equation seems magical, so I'll break it down:
        # 1. calendar says 0 = Monday, POSIX says 0 = Sunday
        #    so we need first_day + 1 to get 1 = Monday -> 7 = Sunday,
        #    which is still equivalent because this math is mod 7
        # 2. Get first day - desired day mod 7: -1 % 7 = 6, so we don't need
        #    to do anything to adjust negative numbers.
        # 3. Add 1 because month days are a 1-based index.
        month_day = (self.d - (first_day + 1)) % 7 + 1

        # Now use a 0-based index version of `w` to calculate the w-th
        # occurrence of `d`
        month_day += (self.w - 1) * 7

        # month_day will only be > days_in_month if w was 5, and `w` means
        # "last occurrence of `d`", so now we just check if we over-shot the
        # end of the month and if so knock off 1 week.
        if month_day > days_in_month:
            month_day -= 7

        ordinal = self._ymd2ord(year, self.m, month_day)
        epoch = ordinal * 86400
        epoch += self.hour * 3600 + self.minute * 60 + self.second
        return epoch
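
    # Illustrative worked example: for M11.1.0 (first Sunday of November) in
    # 2020, calendar.monthrange(2020, 11) returns (6, 30), i.e. the month
    # starts on a Sunday, so month_day = (0 - 7) % 7 + 1 = 1; November 1,
    # 2020 was indeed a Sunday.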


def _parse_tz_str(tz_str):
    # The tz string has the format:
    #
    # std[offset[dst[offset],start[/time],end[/time]]]
    #
    # std and dst must be 3 or more characters long and must not contain
    # a leading colon, embedded digits, commas, nor plus or minus signs;
    # The spaces between "std" and "offset" are only for display and are
    # not actually present in the string.
    #
    # The format of the offset is ``[+|-]hh[:mm[:ss]]``
    offset_str, *start_end_str = tz_str.split(",", 1)

    # fmt: off
    parser_re = re.compile(
        r"(?P<std>[^<0-9:.+-]+|<[a-zA-Z0-9+\-]+>)" +
        r"((?P<stdoff>[+-]?\d{1,2}(:\d{2}(:\d{2})?)?)" +
            r"((?P<dst>[^0-9:.+-]+|<[a-zA-Z0-9+\-]+>)" +
                r"((?P<dstoff>[+-]?\d{1,2}(:\d{2}(:\d{2})?)?))?" +
            r")?" +  # dst
        r")?$"  # stdoff
    )
    # fmt: on

    m = parser_re.match(offset_str)
    if m is None:
        raise ValueError(f"{tz_str} is not a valid TZ string")

    std_abbr = m.group("std")
    dst_abbr = m.group("dst")
    dst_offset = None

    std_abbr = std_abbr.strip("<>")
    if dst_abbr:
        dst_abbr = dst_abbr.strip("<>")

    if std_offset := m.group("stdoff"):
        try:
            std_offset = _parse_tz_delta(std_offset)
        except ValueError as e:
            raise ValueError(f"Invalid STD offset in {tz_str}") from e
    else:
        std_offset = 0

    if dst_abbr is not None:
        if dst_offset := m.group("dstoff"):
            try:
                dst_offset = _parse_tz_delta(dst_offset)
            except ValueError as e:
                raise ValueError(f"Invalid DST offset in {tz_str}") from e
        else:
            dst_offset = std_offset + 3600

        if not start_end_str:
            raise ValueError(f"Missing transition rules: {tz_str}")

        start_end_strs = start_end_str[0].split(",", 1)
        try:
            start, end = (_parse_dst_start_end(x) for x in start_end_strs)
        except ValueError as e:
            raise ValueError(f"Invalid TZ string: {tz_str}") from e

        return _TZStr(std_abbr, std_offset, dst_abbr, dst_offset, start, end)
    elif start_end_str:
        raise ValueError(f"Transition rule present without DST: {tz_str}")
    else:
        # This is a static ttinfo, don't return _TZStr
        return _ttinfo(
            _load_timedelta(std_offset), _load_timedelta(0), std_abbr
        )
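
# Illustrative sketch: a POSIX TZ string such as
# "AEST-10AEDT,M10.1.0,M4.1.0/3" parses to a _TZStr whose standard offset is
# UTC+10 (note the inverted sign convention handled in _parse_tz_delta),
# whose DST offset is UTC+11, and whose transitions fall on the first Sunday
# of October at 02:00 and the first Sunday of April at 03:00 local time.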


def _parse_dst_start_end(dststr):
    date, *time = dststr.split("/")
    if date[0] == "M":
        n_is_julian = False
        m = re.match(r"M(\d{1,2})\.(\d)\.(\d)$", date)
        if m is None:
            raise ValueError(f"Invalid dst start/end date: {dststr}")
        date_offset = tuple(map(int, m.groups()))
        offset = _CalendarOffset(*date_offset)
    else:
        if date[0] == "J":
            n_is_julian = True
            date = date[1:]
        else:
            n_is_julian = False

        doy = int(date)
        offset = _DayOffset(doy, n_is_julian)

    if time:
        time_components = list(map(int, time[0].split(":")))
        n_components = len(time_components)
        if n_components < 3:
            time_components.extend([0] * (3 - n_components))
        offset.hour, offset.minute, offset.second = time_components

    return offset
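
# Illustrative sketch: "M3.2.0/2" yields a _CalendarOffset for the second
# Sunday of March at 02:00, "J60" yields a _DayOffset for Julian day 60
# (February 29 is never counted, so this is always March 1), and a bare "5"
# yields a _DayOffset for the zero-based day-of-year form.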


def _parse_tz_delta(tz_delta):
    match = re.match(
        r"(?P<sign>[+-])?(?P<h>\d{1,2})(:(?P<m>\d{2})(:(?P<s>\d{2}))?)?",
        tz_delta,
    )
    # Anything passed to this function should already have hit an equivalent
    # regular expression to find the section to parse.
    assert match is not None, tz_delta

    h, m, s = (
        int(v) if v is not None else 0
        for v in map(match.group, ("h", "m", "s"))
    )

    total = h * 3600 + m * 60 + s

    if not -86400 < total < 86400:
        raise ValueError(
            f"Offset must be strictly between -24h and +24h: {tz_delta}"
        )

    # Yes, +5 maps to an offset of -5h
    if match.group("sign") != "-":
        total *= -1

    return total
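
# Illustrative worked example: _parse_tz_delta("+5") returns -18000 and
# _parse_tz_delta("-5:30") returns 19800, because a POSIX TZ offset is the
# amount that must be added to local time to reach UTC, the opposite of the
# usual UTC-offset sign convention.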