warnings.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595
  1. """Python part of the warnings subsystem."""
  2. import sys
  3. __all__ = ["warn", "warn_explicit", "showwarning",
  4. "formatwarning", "filterwarnings", "simplefilter",
  5. "resetwarnings", "catch_warnings"]
  6. def showwarning(message, category, filename, lineno, file=None, line=None):
  7. """Hook to write a warning to a file; replace if you like."""
  8. msg = WarningMessage(message, category, filename, lineno, file, line)
  9. _showwarnmsg_impl(msg)
  10. def formatwarning(message, category, filename, lineno, line=None):
  11. """Function to format a warning the standard way."""
  12. msg = WarningMessage(message, category, filename, lineno, None, line)
  13. return _formatwarnmsg_impl(msg)
  14. def _showwarnmsg_impl(msg):
  15. file = msg.file
  16. if file is None:
  17. file = sys.stderr
  18. if file is None:
  19. # sys.stderr is None when run with pythonw.exe:
  20. # warnings get lost
  21. return
  22. text = _formatwarnmsg(msg)
  23. try:
  24. file.write(text)
  25. except OSError:
  26. # the file (probably stderr) is invalid - this warning gets lost.
  27. pass
  28. def _formatwarnmsg_impl(msg):
  29. category = msg.category.__name__
  30. s = f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n"
  31. if msg.line is None:
  32. try:
  33. import linecache
  34. line = linecache.getline(msg.filename, msg.lineno)
  35. except Exception:
  36. # When a warning is logged during Python shutdown, linecache
  37. # and the import machinery don't work anymore
  38. line = None
  39. linecache = None
  40. else:
  41. line = msg.line
  42. if line:
  43. line = line.strip()
  44. s += " %s\n" % line
  45. if msg.source is not None:
  46. try:
  47. import tracemalloc
  48. # Logging a warning should not raise a new exception:
  49. # catch Exception, not only ImportError and RecursionError.
  50. except Exception:
  51. # don't suggest to enable tracemalloc if it's not available
  52. tracing = True
  53. tb = None
  54. else:
  55. tracing = tracemalloc.is_tracing()
  56. try:
  57. tb = tracemalloc.get_object_traceback(msg.source)
  58. except Exception:
  59. # When a warning is logged during Python shutdown, tracemalloc
  60. # and the import machinery don't work anymore
  61. tb = None
  62. if tb is not None:
  63. s += 'Object allocated at (most recent call last):\n'
  64. for frame in tb:
  65. s += (' File "%s", lineno %s\n'
  66. % (frame.filename, frame.lineno))
  67. try:
  68. if linecache is not None:
  69. line = linecache.getline(frame.filename, frame.lineno)
  70. else:
  71. line = None
  72. except Exception:
  73. line = None
  74. if line:
  75. line = line.strip()
  76. s += ' %s\n' % line
  77. elif not tracing:
  78. s += (f'{category}: Enable tracemalloc to get the object '
  79. f'allocation traceback\n')
  80. return s
  81. # Keep a reference to check if the function was replaced
  82. _showwarning_orig = showwarning
  83. def _showwarnmsg(msg):
  84. """Hook to write a warning to a file; replace if you like."""
  85. try:
  86. sw = showwarning
  87. except NameError:
  88. pass
  89. else:
  90. if sw is not _showwarning_orig:
  91. # warnings.showwarning() was replaced
  92. if not callable(sw):
  93. raise TypeError("warnings.showwarning() must be set to a "
  94. "function or method")
  95. sw(msg.message, msg.category, msg.filename, msg.lineno,
  96. msg.file, msg.line)
  97. return
  98. _showwarnmsg_impl(msg)
  99. # Keep a reference to check if the function was replaced
  100. _formatwarning_orig = formatwarning
  101. def _formatwarnmsg(msg):
  102. """Function to format a warning the standard way."""
  103. try:
  104. fw = formatwarning
  105. except NameError:
  106. pass
  107. else:
  108. if fw is not _formatwarning_orig:
  109. # warnings.formatwarning() was replaced
  110. return fw(msg.message, msg.category,
  111. msg.filename, msg.lineno, msg.line)
  112. return _formatwarnmsg_impl(msg)
  113. def filterwarnings(action, message="", category=Warning, module="", lineno=0,
  114. append=False):
  115. """Insert an entry into the list of warnings filters (at the front).
  116. 'action' -- one of "error", "ignore", "always", "default", "module",
  117. or "once"
  118. 'message' -- a regex that the warning message must match
  119. 'category' -- a class that the warning must be a subclass of
  120. 'module' -- a regex that the module name must match
  121. 'lineno' -- an integer line number, 0 matches all warnings
  122. 'append' -- if true, append to the list of filters
  123. """
  124. assert action in ("error", "ignore", "always", "default", "module",
  125. "once"), "invalid action: %r" % (action,)
  126. assert isinstance(message, str), "message must be a string"
  127. assert isinstance(category, type), "category must be a class"
  128. assert issubclass(category, Warning), "category must be a Warning subclass"
  129. assert isinstance(module, str), "module must be a string"
  130. assert isinstance(lineno, int) and lineno >= 0, \
  131. "lineno must be an int >= 0"
  132. if message or module:
  133. import re
  134. if message:
  135. message = re.compile(message, re.I)
  136. else:
  137. message = None
  138. if module:
  139. module = re.compile(module)
  140. else:
  141. module = None
  142. _add_filter(action, message, category, module, lineno, append=append)
  143. def simplefilter(action, category=Warning, lineno=0, append=False):
  144. """Insert a simple entry into the list of warnings filters (at the front).
  145. A simple filter matches all modules and messages.
  146. 'action' -- one of "error", "ignore", "always", "default", "module",
  147. or "once"
  148. 'category' -- a class that the warning must be a subclass of
  149. 'lineno' -- an integer line number, 0 matches all warnings
  150. 'append' -- if true, append to the list of filters
  151. """
  152. assert action in ("error", "ignore", "always", "default", "module",
  153. "once"), "invalid action: %r" % (action,)
  154. assert isinstance(lineno, int) and lineno >= 0, \
  155. "lineno must be an int >= 0"
  156. _add_filter(action, None, category, None, lineno, append=append)
  157. def _add_filter(*item, append):
  158. # Remove possible duplicate filters, so new one will be placed
  159. # in correct place. If append=True and duplicate exists, do nothing.
  160. if not append:
  161. try:
  162. filters.remove(item)
  163. except ValueError:
  164. pass
  165. filters.insert(0, item)
  166. else:
  167. if item not in filters:
  168. filters.append(item)
  169. _filters_mutated()
  170. def resetwarnings():
  171. """Clear the list of warning filters, so that no filters are active."""
  172. filters[:] = []
  173. _filters_mutated()
  174. class _OptionError(Exception):
  175. """Exception used by option processing helpers."""
  176. pass
  177. # Helper to process -W options passed via sys.warnoptions
  178. def _processoptions(args):
  179. for arg in args:
  180. try:
  181. _setoption(arg)
  182. except _OptionError as msg:
  183. print("Invalid -W option ignored:", msg, file=sys.stderr)
  184. # Helper for _processoptions()
  185. def _setoption(arg):
  186. parts = arg.split(':')
  187. if len(parts) > 5:
  188. raise _OptionError("too many fields (max 5): %r" % (arg,))
  189. while len(parts) < 5:
  190. parts.append('')
  191. action, message, category, module, lineno = [s.strip()
  192. for s in parts]
  193. action = _getaction(action)
  194. category = _getcategory(category)
  195. if message or module:
  196. import re
  197. if message:
  198. message = re.escape(message)
  199. if module:
  200. module = re.escape(module) + r'\Z'
  201. if lineno:
  202. try:
  203. lineno = int(lineno)
  204. if lineno < 0:
  205. raise ValueError
  206. except (ValueError, OverflowError):
  207. raise _OptionError("invalid lineno %r" % (lineno,)) from None
  208. else:
  209. lineno = 0
  210. filterwarnings(action, message, category, module, lineno)
  211. # Helper for _setoption()
  212. def _getaction(action):
  213. if not action:
  214. return "default"
  215. if action == "all": return "always" # Alias
  216. for a in ('default', 'always', 'ignore', 'module', 'once', 'error'):
  217. if a.startswith(action):
  218. return a
  219. raise _OptionError("invalid action: %r" % (action,))
  220. # Helper for _setoption()
  221. def _getcategory(category):
  222. if not category:
  223. return Warning
  224. if '.' not in category:
  225. import builtins as m
  226. klass = category
  227. else:
  228. module, _, klass = category.rpartition('.')
  229. try:
  230. m = __import__(module, None, None, [klass])
  231. except ImportError:
  232. raise _OptionError("invalid module name: %r" % (module,)) from None
  233. try:
  234. cat = getattr(m, klass)
  235. except AttributeError:
  236. raise _OptionError("unknown warning category: %r" % (category,)) from None
  237. if not issubclass(cat, Warning):
  238. raise _OptionError("invalid warning category: %r" % (category,))
  239. return cat
  240. def _is_internal_filename(filename):
  241. return 'importlib' in filename and '_bootstrap' in filename
  242. def _is_filename_to_skip(filename, skip_file_prefixes):
  243. return any(filename.startswith(prefix) for prefix in skip_file_prefixes)
  244. def _is_internal_frame(frame):
  245. """Signal whether the frame is an internal CPython implementation detail."""
  246. return _is_internal_filename(frame.f_code.co_filename)
  247. def _next_external_frame(frame, skip_file_prefixes):
  248. """Find the next frame that doesn't involve Python or user internals."""
  249. frame = frame.f_back
  250. while frame is not None and (
  251. _is_internal_filename(filename := frame.f_code.co_filename) or
  252. _is_filename_to_skip(filename, skip_file_prefixes)):
  253. frame = frame.f_back
  254. return frame
  255. # Code typically replaced by _warnings
  256. def warn(message, category=None, stacklevel=1, source=None,
  257. *, skip_file_prefixes=()):
  258. """Issue a warning, or maybe ignore it or raise an exception."""
  259. # Check if message is already a Warning object
  260. if isinstance(message, Warning):
  261. category = message.__class__
  262. # Check category argument
  263. if category is None:
  264. category = UserWarning
  265. if not (isinstance(category, type) and issubclass(category, Warning)):
  266. raise TypeError("category must be a Warning subclass, "
  267. "not '{:s}'".format(type(category).__name__))
  268. if not isinstance(skip_file_prefixes, tuple):
  269. # The C version demands a tuple for implementation performance.
  270. raise TypeError('skip_file_prefixes must be a tuple of strs.')
  271. if skip_file_prefixes:
  272. stacklevel = max(2, stacklevel)
  273. # Get context information
  274. try:
  275. if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)):
  276. # If frame is too small to care or if the warning originated in
  277. # internal code, then do not try to hide any frames.
  278. frame = sys._getframe(stacklevel)
  279. else:
  280. frame = sys._getframe(1)
  281. # Look for one frame less since the above line starts us off.
  282. for x in range(stacklevel-1):
  283. frame = _next_external_frame(frame, skip_file_prefixes)
  284. if frame is None:
  285. raise ValueError
  286. except ValueError:
  287. globals = sys.__dict__
  288. filename = "sys"
  289. lineno = 1
  290. else:
  291. globals = frame.f_globals
  292. filename = frame.f_code.co_filename
  293. lineno = frame.f_lineno
  294. if '__name__' in globals:
  295. module = globals['__name__']
  296. else:
  297. module = "<string>"
  298. registry = globals.setdefault("__warningregistry__", {})
  299. warn_explicit(message, category, filename, lineno, module, registry,
  300. globals, source)
  301. def warn_explicit(message, category, filename, lineno,
  302. module=None, registry=None, module_globals=None,
  303. source=None):
  304. lineno = int(lineno)
  305. if module is None:
  306. module = filename or "<unknown>"
  307. if module[-3:].lower() == ".py":
  308. module = module[:-3] # XXX What about leading pathname?
  309. if registry is None:
  310. registry = {}
  311. if registry.get('version', 0) != _filters_version:
  312. registry.clear()
  313. registry['version'] = _filters_version
  314. if isinstance(message, Warning):
  315. text = str(message)
  316. category = message.__class__
  317. else:
  318. text = message
  319. message = category(message)
  320. key = (text, category, lineno)
  321. # Quick test for common case
  322. if registry.get(key):
  323. return
  324. # Search the filters
  325. for item in filters:
  326. action, msg, cat, mod, ln = item
  327. if ((msg is None or msg.match(text)) and
  328. issubclass(category, cat) and
  329. (mod is None or mod.match(module)) and
  330. (ln == 0 or lineno == ln)):
  331. break
  332. else:
  333. action = defaultaction
  334. # Early exit actions
  335. if action == "ignore":
  336. return
  337. # Prime the linecache for formatting, in case the
  338. # "file" is actually in a zipfile or something.
  339. import linecache
  340. linecache.getlines(filename, module_globals)
  341. if action == "error":
  342. raise message
  343. # Other actions
  344. if action == "once":
  345. registry[key] = 1
  346. oncekey = (text, category)
  347. if onceregistry.get(oncekey):
  348. return
  349. onceregistry[oncekey] = 1
  350. elif action == "always":
  351. pass
  352. elif action == "module":
  353. registry[key] = 1
  354. altkey = (text, category, 0)
  355. if registry.get(altkey):
  356. return
  357. registry[altkey] = 1
  358. elif action == "default":
  359. registry[key] = 1
  360. else:
  361. # Unrecognized actions are errors
  362. raise RuntimeError(
  363. "Unrecognized action (%r) in warnings.filters:\n %s" %
  364. (action, item))
  365. # Print message and context
  366. msg = WarningMessage(message, category, filename, lineno, source)
  367. _showwarnmsg(msg)
  368. class WarningMessage(object):
  369. _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
  370. "line", "source")
  371. def __init__(self, message, category, filename, lineno, file=None,
  372. line=None, source=None):
  373. self.message = message
  374. self.category = category
  375. self.filename = filename
  376. self.lineno = lineno
  377. self.file = file
  378. self.line = line
  379. self.source = source
  380. self._category_name = category.__name__ if category else None
  381. def __str__(self):
  382. return ("{message : %r, category : %r, filename : %r, lineno : %s, "
  383. "line : %r}" % (self.message, self._category_name,
  384. self.filename, self.lineno, self.line))
  385. class catch_warnings(object):
  386. """A context manager that copies and restores the warnings filter upon
  387. exiting the context.
  388. The 'record' argument specifies whether warnings should be captured by a
  389. custom implementation of warnings.showwarning() and be appended to a list
  390. returned by the context manager. Otherwise None is returned by the context
  391. manager. The objects appended to the list are arguments whose attributes
  392. mirror the arguments to showwarning().
  393. The 'module' argument is to specify an alternative module to the module
  394. named 'warnings' and imported under that name. This argument is only useful
  395. when testing the warnings module itself.
  396. If the 'action' argument is not None, the remaining arguments are passed
  397. to warnings.simplefilter() as if it were called immediately on entering the
  398. context.
  399. """
  400. def __init__(self, *, record=False, module=None,
  401. action=None, category=Warning, lineno=0, append=False):
  402. """Specify whether to record warnings and if an alternative module
  403. should be used other than sys.modules['warnings'].
  404. For compatibility with Python 3.0, please consider all arguments to be
  405. keyword-only.
  406. """
  407. self._record = record
  408. self._module = sys.modules['warnings'] if module is None else module
  409. self._entered = False
  410. if action is None:
  411. self._filter = None
  412. else:
  413. self._filter = (action, category, lineno, append)
  414. def __repr__(self):
  415. args = []
  416. if self._record:
  417. args.append("record=True")
  418. if self._module is not sys.modules['warnings']:
  419. args.append("module=%r" % self._module)
  420. name = type(self).__name__
  421. return "%s(%s)" % (name, ", ".join(args))
  422. def __enter__(self):
  423. if self._entered:
  424. raise RuntimeError("Cannot enter %r twice" % self)
  425. self._entered = True
  426. self._filters = self._module.filters
  427. self._module.filters = self._filters[:]
  428. self._module._filters_mutated()
  429. self._showwarning = self._module.showwarning
  430. self._showwarnmsg_impl = self._module._showwarnmsg_impl
  431. if self._filter is not None:
  432. simplefilter(*self._filter)
  433. if self._record:
  434. log = []
  435. self._module._showwarnmsg_impl = log.append
  436. # Reset showwarning() to the default implementation to make sure
  437. # that _showwarnmsg() calls _showwarnmsg_impl()
  438. self._module.showwarning = self._module._showwarning_orig
  439. return log
  440. else:
  441. return None
  442. def __exit__(self, *exc_info):
  443. if not self._entered:
  444. raise RuntimeError("Cannot exit %r without entering first" % self)
  445. self._module.filters = self._filters
  446. self._module._filters_mutated()
  447. self._module.showwarning = self._showwarning
  448. self._module._showwarnmsg_impl = self._showwarnmsg_impl
  449. _DEPRECATED_MSG = "{name!r} is deprecated and slated for removal in Python {remove}"
  450. def _deprecated(name, message=_DEPRECATED_MSG, *, remove, _version=sys.version_info):
  451. """Warn that *name* is deprecated or should be removed.
  452. RuntimeError is raised if *remove* specifies a major/minor tuple older than
  453. the current Python version or the same version but past the alpha.
  454. The *message* argument is formatted with *name* and *remove* as a Python
  455. version (e.g. "3.11").
  456. """
  457. remove_formatted = f"{remove[0]}.{remove[1]}"
  458. if (_version[:2] > remove) or (_version[:2] == remove and _version[3] != "alpha"):
  459. msg = f"{name!r} was slated for removal after Python {remove_formatted} alpha"
  460. raise RuntimeError(msg)
  461. else:
  462. msg = message.format(name=name, remove=remove_formatted)
  463. warn(msg, DeprecationWarning, stacklevel=3)
  464. # Private utility function called by _PyErr_WarnUnawaitedCoroutine
  465. def _warn_unawaited_coroutine(coro):
  466. msg_lines = [
  467. f"coroutine '{coro.__qualname__}' was never awaited\n"
  468. ]
  469. if coro.cr_origin is not None:
  470. import linecache, traceback
  471. def extract():
  472. for filename, lineno, funcname in reversed(coro.cr_origin):
  473. line = linecache.getline(filename, lineno)
  474. yield (filename, lineno, funcname, line)
  475. msg_lines.append("Coroutine created at (most recent call last)\n")
  476. msg_lines += traceback.format_list(list(extract()))
  477. msg = "".join(msg_lines).rstrip("\n")
  478. # Passing source= here means that if the user happens to have tracemalloc
  479. # enabled and tracking where the coroutine was created, the warning will
  480. # contain that traceback. This does mean that if they have *both*
  481. # coroutine origin tracking *and* tracemalloc enabled, they'll get two
  482. # partially-redundant tracebacks. If we wanted to be clever we could
  483. # probably detect this case and avoid it, but for now we don't bother.
  484. warn(msg, category=RuntimeWarning, stacklevel=2, source=coro)
# filters contains a sequence of filter 5-tuples.
# The components of the 5-tuple are:
# - an action: error, ignore, always, default, module, or once
# - a compiled regex that must match the warning message
# - a class representing the warning category
# - a compiled regex that must match the module that is being warned
# - a line number for the line being warned about, or 0 to mean any line
# If either of the compiled regexes is None, it matches anything.
  493. try:
  494. from _warnings import (filters, _defaultaction, _onceregistry,
  495. warn, warn_explicit, _filters_mutated)
  496. defaultaction = _defaultaction
  497. onceregistry = _onceregistry
  498. _warnings_defaults = True
  499. except ImportError:
  500. filters = []
  501. defaultaction = "default"
  502. onceregistry = {}
  503. _filters_version = 1
  504. def _filters_mutated():
  505. global _filters_version
  506. _filters_version += 1
  507. _warnings_defaults = False
  508. # Module initialization
  509. _processoptions(sys.warnoptions)
  510. if not _warnings_defaults:
  511. # Several warning categories are ignored by default in regular builds
  512. if not hasattr(sys, 'gettotalrefcount'):
  513. filterwarnings("default", category=DeprecationWarning,
  514. module="__main__", append=1)
  515. simplefilter("ignore", category=DeprecationWarning, append=1)
  516. simplefilter("ignore", category=PendingDeprecationWarning, append=1)
  517. simplefilter("ignore", category=ImportWarning, append=1)
  518. simplefilter("ignore", category=ResourceWarning, append=1)
  519. del _warnings_defaults