util.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. # util.py
  2. import inspect
  3. import warnings
  4. import types
  5. import collections
  6. import itertools
  7. from functools import lru_cache, wraps
  8. from typing import Callable, List, Union, Iterable, TypeVar, cast
# The backslash character (code point 92), produced with chr() rather than
# written as a string literal.
_bslash = chr(92)
# Type variable bound to Callable; the synonym helpers in this module use it
# to annotate that they hand back the same callable type they were given.
C = TypeVar("C", bound=Callable)
  11. class __config_flags:
  12. """Internal class for defining compatibility and debugging flags"""
  13. _all_names: List[str] = []
  14. _fixed_names: List[str] = []
  15. _type_desc = "configuration"
  16. @classmethod
  17. def _set(cls, dname, value):
  18. if dname in cls._fixed_names:
  19. warnings.warn(
  20. f"{cls.__name__}.{dname} {cls._type_desc} is {str(getattr(cls, dname)).upper()}"
  21. f" and cannot be overridden",
  22. stacklevel=3,
  23. )
  24. return
  25. if dname in cls._all_names:
  26. setattr(cls, dname, value)
  27. else:
  28. raise ValueError(f"no such {cls._type_desc} {dname!r}")
  29. enable = classmethod(lambda cls, name: cls._set(name, True))
  30. disable = classmethod(lambda cls, name: cls._set(name, False))
  31. @lru_cache(maxsize=128)
  32. def col(loc: int, strg: str) -> int:
  33. """
  34. Returns current column within a string, counting newlines as line separators.
  35. The first column is number 1.
  36. Note: the default parsing behavior is to expand tabs in the input string
  37. before starting the parsing process. See
  38. :class:`ParserElement.parse_string` for more
  39. information on parsing strings containing ``<TAB>`` s, and suggested
  40. methods to maintain a consistent view of the parsed string, the parse
  41. location, and line and column positions within the parsed string.
  42. """
  43. s = strg
  44. return 1 if 0 < loc < len(s) and s[loc - 1] == "\n" else loc - s.rfind("\n", 0, loc)
  45. @lru_cache(maxsize=128)
  46. def lineno(loc: int, strg: str) -> int:
  47. """Returns current line number within a string, counting newlines as line separators.
  48. The first line is number 1.
  49. Note - the default parsing behavior is to expand tabs in the input string
  50. before starting the parsing process. See :class:`ParserElement.parse_string`
  51. for more information on parsing strings containing ``<TAB>`` s, and
  52. suggested methods to maintain a consistent view of the parsed string, the
  53. parse location, and line and column positions within the parsed string.
  54. """
  55. return strg.count("\n", 0, loc) + 1
  56. @lru_cache(maxsize=128)
  57. def line(loc: int, strg: str) -> str:
  58. """
  59. Returns the line of text containing loc within a string, counting newlines as line separators.
  60. """
  61. last_cr = strg.rfind("\n", 0, loc)
  62. next_cr = strg.find("\n", loc)
  63. return strg[last_cr + 1 : next_cr] if next_cr >= 0 else strg[last_cr + 1 :]
  64. class _UnboundedCache:
  65. def __init__(self):
  66. cache = {}
  67. cache_get = cache.get
  68. self.not_in_cache = not_in_cache = object()
  69. def get(_, key):
  70. return cache_get(key, not_in_cache)
  71. def set_(_, key, value):
  72. cache[key] = value
  73. def clear(_):
  74. cache.clear()
  75. self.size = None
  76. self.get = types.MethodType(get, self)
  77. self.set = types.MethodType(set_, self)
  78. self.clear = types.MethodType(clear, self)
  79. class _FifoCache:
  80. def __init__(self, size):
  81. self.not_in_cache = not_in_cache = object()
  82. cache = {}
  83. keyring = [object()] * size
  84. cache_get = cache.get
  85. cache_pop = cache.pop
  86. keyiter = itertools.cycle(range(size))
  87. def get(_, key):
  88. return cache_get(key, not_in_cache)
  89. def set_(_, key, value):
  90. cache[key] = value
  91. i = next(keyiter)
  92. cache_pop(keyring[i], None)
  93. keyring[i] = key
  94. def clear(_):
  95. cache.clear()
  96. keyring[:] = [object()] * size
  97. self.size = size
  98. self.get = types.MethodType(get, self)
  99. self.set = types.MethodType(set_, self)
  100. self.clear = types.MethodType(clear, self)
  101. class LRUMemo:
  102. """
  103. A memoizing mapping that retains `capacity` deleted items
  104. The memo tracks retained items by their access order; once `capacity` items
  105. are retained, the least recently used item is discarded.
  106. """
  107. def __init__(self, capacity):
  108. self._capacity = capacity
  109. self._active = {}
  110. self._memory = collections.OrderedDict()
  111. def __getitem__(self, key):
  112. try:
  113. return self._active[key]
  114. except KeyError:
  115. self._memory.move_to_end(key)
  116. return self._memory[key]
  117. def __setitem__(self, key, value):
  118. self._memory.pop(key, None)
  119. self._active[key] = value
  120. def __delitem__(self, key):
  121. try:
  122. value = self._active.pop(key)
  123. except KeyError:
  124. pass
  125. else:
  126. while len(self._memory) >= self._capacity:
  127. self._memory.popitem(last=False)
  128. self._memory[key] = value
  129. def clear(self):
  130. self._active.clear()
  131. self._memory.clear()
  132. class UnboundedMemo(dict):
  133. """
  134. A memoizing mapping that retains all deleted items
  135. """
  136. def __delitem__(self, key):
  137. pass
  138. def _escape_regex_range_chars(s: str) -> str:
  139. # escape these chars: ^-[]
  140. for c in r"\^-[]":
  141. s = s.replace(c, _bslash + c)
  142. s = s.replace("\n", r"\n")
  143. s = s.replace("\t", r"\t")
  144. return str(s)
  145. def _collapse_string_to_ranges(
  146. s: Union[str, Iterable[str]], re_escape: bool = True
  147. ) -> str:
  148. def is_consecutive(c):
  149. c_int = ord(c)
  150. is_consecutive.prev, prev = c_int, is_consecutive.prev
  151. if c_int - prev > 1:
  152. is_consecutive.value = next(is_consecutive.counter)
  153. return is_consecutive.value
  154. is_consecutive.prev = 0 # type: ignore [attr-defined]
  155. is_consecutive.counter = itertools.count() # type: ignore [attr-defined]
  156. is_consecutive.value = -1 # type: ignore [attr-defined]
  157. def escape_re_range_char(c):
  158. return "\\" + c if c in r"\^-][" else c
  159. def no_escape_re_range_char(c):
  160. return c
  161. if not re_escape:
  162. escape_re_range_char = no_escape_re_range_char
  163. ret = []
  164. s = "".join(sorted(set(s)))
  165. if len(s) > 3:
  166. for _, chars in itertools.groupby(s, key=is_consecutive):
  167. first = last = next(chars)
  168. last = collections.deque(
  169. itertools.chain(iter([last]), chars), maxlen=1
  170. ).pop()
  171. if first == last:
  172. ret.append(escape_re_range_char(first))
  173. else:
  174. sep = "" if ord(last) == ord(first) + 1 else "-"
  175. ret.append(
  176. f"{escape_re_range_char(first)}{sep}{escape_re_range_char(last)}"
  177. )
  178. else:
  179. ret = [escape_re_range_char(c) for c in s]
  180. return "".join(ret)
  181. def _flatten(ll: list) -> list:
  182. ret = []
  183. for i in ll:
  184. if isinstance(i, list):
  185. ret.extend(_flatten(i))
  186. else:
  187. ret.append(i)
  188. return ret
  189. def _make_synonym_function(compat_name: str, fn: C) -> C:
  190. # In a future version, uncomment the code in the internal _inner() functions
  191. # to begin emitting DeprecationWarnings.
  192. # Unwrap staticmethod/classmethod
  193. fn = getattr(fn, "__func__", fn)
  194. # (Presence of 'self' arg in signature is used by explain_exception() methods, so we take
  195. # some extra steps to add it if present in decorated function.)
  196. if "self" == list(inspect.signature(fn).parameters)[0]:
  197. @wraps(fn)
  198. def _inner(self, *args, **kwargs):
  199. # warnings.warn(
  200. # f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=3
  201. # )
  202. return fn(self, *args, **kwargs)
  203. else:
  204. @wraps(fn)
  205. def _inner(*args, **kwargs):
  206. # warnings.warn(
  207. # f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=3
  208. # )
  209. return fn(*args, **kwargs)
  210. _inner.__doc__ = f"""Deprecated - use :class:`{fn.__name__}`"""
  211. _inner.__name__ = compat_name
  212. _inner.__annotations__ = fn.__annotations__
  213. if isinstance(fn, types.FunctionType):
  214. _inner.__kwdefaults__ = fn.__kwdefaults__
  215. elif isinstance(fn, type) and hasattr(fn, "__init__"):
  216. _inner.__kwdefaults__ = fn.__init__.__kwdefaults__
  217. else:
  218. _inner.__kwdefaults__ = None
  219. _inner.__qualname__ = fn.__qualname__
  220. return cast(C, _inner)
  221. def replaced_by_pep8(fn: C) -> Callable[[Callable], C]:
  222. """
  223. Decorator for pre-PEP8 compatibility synonyms, to link them to the new function.
  224. """
  225. return lambda other: _make_synonym_function(other.__name__, fn)