warnings.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
  1. """Python part of the warnings subsystem."""
  2. import sys
  3. __all__ = ["warn", "warn_explicit", "showwarning",
  4. "formatwarning", "filterwarnings", "simplefilter",
  5. "resetwarnings", "catch_warnings"]
  6. def showwarning(message, category, filename, lineno, file=None, line=None):
  7. """Hook to write a warning to a file; replace if you like."""
  8. msg = WarningMessage(message, category, filename, lineno, file, line)
  9. _showwarnmsg_impl(msg)
  10. def formatwarning(message, category, filename, lineno, line=None):
  11. """Function to format a warning the standard way."""
  12. msg = WarningMessage(message, category, filename, lineno, None, line)
  13. return _formatwarnmsg_impl(msg)
  14. def _showwarnmsg_impl(msg):
  15. file = msg.file
  16. if file is None:
  17. file = sys.stderr
  18. if file is None:
  19. # sys.stderr is None when run with pythonw.exe:
  20. # warnings get lost
  21. return
  22. text = _formatwarnmsg(msg)
  23. try:
  24. file.write(text)
  25. except OSError:
  26. # the file (probably stderr) is invalid - this warning gets lost.
  27. pass
  28. def _formatwarnmsg_impl(msg):
  29. category = msg.category.__name__
  30. s = f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n"
  31. if msg.line is None:
  32. try:
  33. import linecache
  34. line = linecache.getline(msg.filename, msg.lineno)
  35. except Exception:
  36. # When a warning is logged during Python shutdown, linecache
  37. # and the import machinery don't work anymore
  38. line = None
  39. linecache = None
  40. else:
  41. line = msg.line
  42. if line:
  43. line = line.strip()
  44. s += " %s\n" % line
  45. if msg.source is not None:
  46. try:
  47. import tracemalloc
  48. # Logging a warning should not raise a new exception:
  49. # catch Exception, not only ImportError and RecursionError.
  50. except Exception:
  51. # don't suggest to enable tracemalloc if it's not available
  52. tracing = True
  53. tb = None
  54. else:
  55. tracing = tracemalloc.is_tracing()
  56. try:
  57. tb = tracemalloc.get_object_traceback(msg.source)
  58. except Exception:
  59. # When a warning is logged during Python shutdown, tracemalloc
  60. # and the import machinery don't work anymore
  61. tb = None
  62. if tb is not None:
  63. s += 'Object allocated at (most recent call last):\n'
  64. for frame in tb:
  65. s += (' File "%s", lineno %s\n'
  66. % (frame.filename, frame.lineno))
  67. try:
  68. if linecache is not None:
  69. line = linecache.getline(frame.filename, frame.lineno)
  70. else:
  71. line = None
  72. except Exception:
  73. line = None
  74. if line:
  75. line = line.strip()
  76. s += ' %s\n' % line
  77. elif not tracing:
  78. s += (f'{category}: Enable tracemalloc to get the object '
  79. f'allocation traceback\n')
  80. return s
  81. # Keep a reference to check if the function was replaced
  82. _showwarning_orig = showwarning
  83. def _showwarnmsg(msg):
  84. """Hook to write a warning to a file; replace if you like."""
  85. try:
  86. sw = showwarning
  87. except NameError:
  88. pass
  89. else:
  90. if sw is not _showwarning_orig:
  91. # warnings.showwarning() was replaced
  92. if not callable(sw):
  93. raise TypeError("warnings.showwarning() must be set to a "
  94. "function or method")
  95. sw(msg.message, msg.category, msg.filename, msg.lineno,
  96. msg.file, msg.line)
  97. return
  98. _showwarnmsg_impl(msg)
  99. # Keep a reference to check if the function was replaced
  100. _formatwarning_orig = formatwarning
  101. def _formatwarnmsg(msg):
  102. """Function to format a warning the standard way."""
  103. try:
  104. fw = formatwarning
  105. except NameError:
  106. pass
  107. else:
  108. if fw is not _formatwarning_orig:
  109. # warnings.formatwarning() was replaced
  110. return fw(msg.message, msg.category,
  111. msg.filename, msg.lineno, msg.line)
  112. return _formatwarnmsg_impl(msg)
  113. def filterwarnings(action, message="", category=Warning, module="", lineno=0,
  114. append=False):
  115. """Insert an entry into the list of warnings filters (at the front).
  116. 'action' -- one of "error", "ignore", "always", "default", "module",
  117. or "once"
  118. 'message' -- a regex that the warning message must match
  119. 'category' -- a class that the warning must be a subclass of
  120. 'module' -- a regex that the module name must match
  121. 'lineno' -- an integer line number, 0 matches all warnings
  122. 'append' -- if true, append to the list of filters
  123. """
  124. assert action in ("error", "ignore", "always", "default", "module",
  125. "once"), "invalid action: %r" % (action,)
  126. assert isinstance(message, str), "message must be a string"
  127. assert isinstance(category, type), "category must be a class"
  128. assert issubclass(category, Warning), "category must be a Warning subclass"
  129. assert isinstance(module, str), "module must be a string"
  130. assert isinstance(lineno, int) and lineno >= 0, \
  131. "lineno must be an int >= 0"
  132. if message or module:
  133. import re
  134. if message:
  135. message = re.compile(message, re.I)
  136. else:
  137. message = None
  138. if module:
  139. module = re.compile(module)
  140. else:
  141. module = None
  142. _add_filter(action, message, category, module, lineno, append=append)
  143. def simplefilter(action, category=Warning, lineno=0, append=False):
  144. """Insert a simple entry into the list of warnings filters (at the front).
  145. A simple filter matches all modules and messages.
  146. 'action' -- one of "error", "ignore", "always", "default", "module",
  147. or "once"
  148. 'category' -- a class that the warning must be a subclass of
  149. 'lineno' -- an integer line number, 0 matches all warnings
  150. 'append' -- if true, append to the list of filters
  151. """
  152. assert action in ("error", "ignore", "always", "default", "module",
  153. "once"), "invalid action: %r" % (action,)
  154. assert isinstance(lineno, int) and lineno >= 0, \
  155. "lineno must be an int >= 0"
  156. _add_filter(action, None, category, None, lineno, append=append)
  157. def _add_filter(*item, append):
  158. # Remove possible duplicate filters, so new one will be placed
  159. # in correct place. If append=True and duplicate exists, do nothing.
  160. if not append:
  161. try:
  162. filters.remove(item)
  163. except ValueError:
  164. pass
  165. filters.insert(0, item)
  166. else:
  167. if item not in filters:
  168. filters.append(item)
  169. _filters_mutated()
  170. def resetwarnings():
  171. """Clear the list of warning filters, so that no filters are active."""
  172. filters[:] = []
  173. _filters_mutated()
  174. class _OptionError(Exception):
  175. """Exception used by option processing helpers."""
  176. pass
  177. # Helper to process -W options passed via sys.warnoptions
  178. def _processoptions(args):
  179. for arg in args:
  180. try:
  181. _setoption(arg)
  182. except _OptionError as msg:
  183. print("Invalid -W option ignored:", msg, file=sys.stderr)
  184. # Helper for _processoptions()
  185. def _setoption(arg):
  186. parts = arg.split(':')
  187. if len(parts) > 5:
  188. raise _OptionError("too many fields (max 5): %r" % (arg,))
  189. while len(parts) < 5:
  190. parts.append('')
  191. action, message, category, module, lineno = [s.strip()
  192. for s in parts]
  193. action = _getaction(action)
  194. category = _getcategory(category)
  195. if message or module:
  196. import re
  197. if message:
  198. message = re.escape(message)
  199. if module:
  200. module = re.escape(module) + r'\Z'
  201. if lineno:
  202. try:
  203. lineno = int(lineno)
  204. if lineno < 0:
  205. raise ValueError
  206. except (ValueError, OverflowError):
  207. raise _OptionError("invalid lineno %r" % (lineno,)) from None
  208. else:
  209. lineno = 0
  210. filterwarnings(action, message, category, module, lineno)
  211. # Helper for _setoption()
  212. def _getaction(action):
  213. if not action:
  214. return "default"
  215. if action == "all": return "always" # Alias
  216. for a in ('default', 'always', 'ignore', 'module', 'once', 'error'):
  217. if a.startswith(action):
  218. return a
  219. raise _OptionError("invalid action: %r" % (action,))
  220. # Helper for _setoption()
  221. def _getcategory(category):
  222. if not category:
  223. return Warning
  224. if '.' not in category:
  225. import builtins as m
  226. klass = category
  227. else:
  228. module, _, klass = category.rpartition('.')
  229. try:
  230. m = __import__(module, None, None, [klass])
  231. except ImportError:
  232. raise _OptionError("invalid module name: %r" % (module,)) from None
  233. try:
  234. cat = getattr(m, klass)
  235. except AttributeError:
  236. raise _OptionError("unknown warning category: %r" % (category,)) from None
  237. if not issubclass(cat, Warning):
  238. raise _OptionError("invalid warning category: %r" % (category,))
  239. return cat
  240. def _is_internal_frame(frame):
  241. """Signal whether the frame is an internal CPython implementation detail."""
  242. filename = frame.f_code.co_filename
  243. return 'importlib' in filename and '_bootstrap' in filename
  244. def _next_external_frame(frame):
  245. """Find the next frame that doesn't involve CPython internals."""
  246. frame = frame.f_back
  247. while frame is not None and _is_internal_frame(frame):
  248. frame = frame.f_back
  249. return frame
  250. # Code typically replaced by _warnings
  251. def warn(message, category=None, stacklevel=1, source=None):
  252. """Issue a warning, or maybe ignore it or raise an exception."""
  253. # Check if message is already a Warning object
  254. if isinstance(message, Warning):
  255. category = message.__class__
  256. # Check category argument
  257. if category is None:
  258. category = UserWarning
  259. if not (isinstance(category, type) and issubclass(category, Warning)):
  260. raise TypeError("category must be a Warning subclass, "
  261. "not '{:s}'".format(type(category).__name__))
  262. # Get context information
  263. try:
  264. if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)):
  265. # If frame is too small to care or if the warning originated in
  266. # internal code, then do not try to hide any frames.
  267. frame = sys._getframe(stacklevel)
  268. else:
  269. frame = sys._getframe(1)
  270. # Look for one frame less since the above line starts us off.
  271. for x in range(stacklevel-1):
  272. frame = _next_external_frame(frame)
  273. if frame is None:
  274. raise ValueError
  275. except ValueError:
  276. globals = sys.__dict__
  277. filename = "sys"
  278. lineno = 1
  279. else:
  280. globals = frame.f_globals
  281. filename = frame.f_code.co_filename
  282. lineno = frame.f_lineno
  283. if '__name__' in globals:
  284. module = globals['__name__']
  285. else:
  286. module = "<string>"
  287. registry = globals.setdefault("__warningregistry__", {})
  288. warn_explicit(message, category, filename, lineno, module, registry,
  289. globals, source)
  290. def warn_explicit(message, category, filename, lineno,
  291. module=None, registry=None, module_globals=None,
  292. source=None):
  293. lineno = int(lineno)
  294. if module is None:
  295. module = filename or "<unknown>"
  296. if module[-3:].lower() == ".py":
  297. module = module[:-3] # XXX What about leading pathname?
  298. if registry is None:
  299. registry = {}
  300. if registry.get('version', 0) != _filters_version:
  301. registry.clear()
  302. registry['version'] = _filters_version
  303. if isinstance(message, Warning):
  304. text = str(message)
  305. category = message.__class__
  306. else:
  307. text = message
  308. message = category(message)
  309. key = (text, category, lineno)
  310. # Quick test for common case
  311. if registry.get(key):
  312. return
  313. # Search the filters
  314. for item in filters:
  315. action, msg, cat, mod, ln = item
  316. if ((msg is None or msg.match(text)) and
  317. issubclass(category, cat) and
  318. (mod is None or mod.match(module)) and
  319. (ln == 0 or lineno == ln)):
  320. break
  321. else:
  322. action = defaultaction
  323. # Early exit actions
  324. if action == "ignore":
  325. return
  326. # Prime the linecache for formatting, in case the
  327. # "file" is actually in a zipfile or something.
  328. import linecache
  329. linecache.getlines(filename, module_globals)
  330. if action == "error":
  331. raise message
  332. # Other actions
  333. if action == "once":
  334. registry[key] = 1
  335. oncekey = (text, category)
  336. if onceregistry.get(oncekey):
  337. return
  338. onceregistry[oncekey] = 1
  339. elif action == "always":
  340. pass
  341. elif action == "module":
  342. registry[key] = 1
  343. altkey = (text, category, 0)
  344. if registry.get(altkey):
  345. return
  346. registry[altkey] = 1
  347. elif action == "default":
  348. registry[key] = 1
  349. else:
  350. # Unrecognized actions are errors
  351. raise RuntimeError(
  352. "Unrecognized action (%r) in warnings.filters:\n %s" %
  353. (action, item))
  354. # Print message and context
  355. msg = WarningMessage(message, category, filename, lineno, source)
  356. _showwarnmsg(msg)
  357. class WarningMessage(object):
  358. _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
  359. "line", "source")
  360. def __init__(self, message, category, filename, lineno, file=None,
  361. line=None, source=None):
  362. self.message = message
  363. self.category = category
  364. self.filename = filename
  365. self.lineno = lineno
  366. self.file = file
  367. self.line = line
  368. self.source = source
  369. self._category_name = category.__name__ if category else None
  370. def __str__(self):
  371. return ("{message : %r, category : %r, filename : %r, lineno : %s, "
  372. "line : %r}" % (self.message, self._category_name,
  373. self.filename, self.lineno, self.line))
  374. class catch_warnings(object):
  375. """A context manager that copies and restores the warnings filter upon
  376. exiting the context.
  377. The 'record' argument specifies whether warnings should be captured by a
  378. custom implementation of warnings.showwarning() and be appended to a list
  379. returned by the context manager. Otherwise None is returned by the context
  380. manager. The objects appended to the list are arguments whose attributes
  381. mirror the arguments to showwarning().
  382. The 'module' argument is to specify an alternative module to the module
  383. named 'warnings' and imported under that name. This argument is only useful
  384. when testing the warnings module itself.
  385. """
  386. def __init__(self, *, record=False, module=None):
  387. """Specify whether to record warnings and if an alternative module
  388. should be used other than sys.modules['warnings'].
  389. For compatibility with Python 3.0, please consider all arguments to be
  390. keyword-only.
  391. """
  392. self._record = record
  393. self._module = sys.modules['warnings'] if module is None else module
  394. self._entered = False
  395. def __repr__(self):
  396. args = []
  397. if self._record:
  398. args.append("record=True")
  399. if self._module is not sys.modules['warnings']:
  400. args.append("module=%r" % self._module)
  401. name = type(self).__name__
  402. return "%s(%s)" % (name, ", ".join(args))
  403. def __enter__(self):
  404. if self._entered:
  405. raise RuntimeError("Cannot enter %r twice" % self)
  406. self._entered = True
  407. self._filters = self._module.filters
  408. self._module.filters = self._filters[:]
  409. self._module._filters_mutated()
  410. self._showwarning = self._module.showwarning
  411. self._showwarnmsg_impl = self._module._showwarnmsg_impl
  412. if self._record:
  413. log = []
  414. self._module._showwarnmsg_impl = log.append
  415. # Reset showwarning() to the default implementation to make sure
  416. # that _showwarnmsg() calls _showwarnmsg_impl()
  417. self._module.showwarning = self._module._showwarning_orig
  418. return log
  419. else:
  420. return None
  421. def __exit__(self, *exc_info):
  422. if not self._entered:
  423. raise RuntimeError("Cannot exit %r without entering first" % self)
  424. self._module.filters = self._filters
  425. self._module._filters_mutated()
  426. self._module.showwarning = self._showwarning
  427. self._module._showwarnmsg_impl = self._showwarnmsg_impl
  428. # Private utility function called by _PyErr_WarnUnawaitedCoroutine
  429. def _warn_unawaited_coroutine(coro):
  430. msg_lines = [
  431. f"coroutine '{coro.__qualname__}' was never awaited\n"
  432. ]
  433. if coro.cr_origin is not None:
  434. import linecache, traceback
  435. def extract():
  436. for filename, lineno, funcname in reversed(coro.cr_origin):
  437. line = linecache.getline(filename, lineno)
  438. yield (filename, lineno, funcname, line)
  439. msg_lines.append("Coroutine created at (most recent call last)\n")
  440. msg_lines += traceback.format_list(list(extract()))
  441. msg = "".join(msg_lines).rstrip("\n")
  442. # Passing source= here means that if the user happens to have tracemalloc
  443. # enabled and tracking where the coroutine was created, the warning will
  444. # contain that traceback. This does mean that if they have *both*
  445. # coroutine origin tracking *and* tracemalloc enabled, they'll get two
  446. # partially-redundant tracebacks. If we wanted to be clever we could
  447. # probably detect this case and avoid it, but for now we don't bother.
  448. warn(msg, category=RuntimeWarning, stacklevel=2, source=coro)
  449. # filters contains a sequence of filter 5-tuples
  450. # The components of the 5-tuple are:
  451. # - an action: error, ignore, always, default, module, or once
  452. # - a compiled regex that must match the warning message
  453. # - a class representing the warning category
  454. # - a compiled regex that must match the module that is being warned
  455. # - a line number for the line being warning, or 0 to mean any line
  456. # If either if the compiled regexs are None, match anything.
  457. try:
  458. from _warnings import (filters, _defaultaction, _onceregistry,
  459. warn, warn_explicit, _filters_mutated)
  460. defaultaction = _defaultaction
  461. onceregistry = _onceregistry
  462. _warnings_defaults = True
  463. except ImportError:
  464. filters = []
  465. defaultaction = "default"
  466. onceregistry = {}
  467. _filters_version = 1
  468. def _filters_mutated():
  469. global _filters_version
  470. _filters_version += 1
  471. _warnings_defaults = False
  472. # Module initialization
  473. _processoptions(sys.warnoptions)
  474. if not _warnings_defaults:
  475. # Several warning categories are ignored by default in regular builds
  476. if not hasattr(sys, 'gettotalrefcount'):
  477. filterwarnings("default", category=DeprecationWarning,
  478. module="__main__", append=1)
  479. simplefilter("ignore", category=DeprecationWarning, append=1)
  480. simplefilter("ignore", category=PendingDeprecationWarning, append=1)
  481. simplefilter("ignore", category=ImportWarning, append=1)
  482. simplefilter("ignore", category=ResourceWarning, append=1)
  483. del _warnings_defaults