format_helpers.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. import functools
  2. import inspect
  3. import reprlib
  4. import sys
  5. import traceback
  6. from . import constants
  7. def _get_function_source(func):
  8. func = inspect.unwrap(func)
  9. if inspect.isfunction(func):
  10. code = func.__code__
  11. return (code.co_filename, code.co_firstlineno)
  12. if isinstance(func, functools.partial):
  13. return _get_function_source(func.func)
  14. if isinstance(func, functools.partialmethod):
  15. return _get_function_source(func.func)
  16. return None
  17. def _format_callback_source(func, args):
  18. func_repr = _format_callback(func, args, None)
  19. source = _get_function_source(func)
  20. if source:
  21. func_repr += f' at {source[0]}:{source[1]}'
  22. return func_repr
  23. def _format_args_and_kwargs(args, kwargs):
  24. """Format function arguments and keyword arguments.
  25. Special case for a single parameter: ('hello',) is formatted as ('hello').
  26. """
  27. # use reprlib to limit the length of the output
  28. items = []
  29. if args:
  30. items.extend(reprlib.repr(arg) for arg in args)
  31. if kwargs:
  32. items.extend(f'{k}={reprlib.repr(v)}' for k, v in kwargs.items())
  33. return '({})'.format(', '.join(items))
  34. def _format_callback(func, args, kwargs, suffix=''):
  35. if isinstance(func, functools.partial):
  36. suffix = _format_args_and_kwargs(args, kwargs) + suffix
  37. return _format_callback(func.func, func.args, func.keywords, suffix)
  38. if hasattr(func, '__qualname__') and func.__qualname__:
  39. func_repr = func.__qualname__
  40. elif hasattr(func, '__name__') and func.__name__:
  41. func_repr = func.__name__
  42. else:
  43. func_repr = repr(func)
  44. func_repr += _format_args_and_kwargs(args, kwargs)
  45. if suffix:
  46. func_repr += suffix
  47. return func_repr
  48. def extract_stack(f=None, limit=None):
  49. """Replacement for traceback.extract_stack() that only does the
  50. necessary work for asyncio debug mode.
  51. """
  52. if f is None:
  53. f = sys._getframe().f_back
  54. if limit is None:
  55. # Limit the amount of work to a reasonable amount, as extract_stack()
  56. # can be called for each coroutine and future in debug mode.
  57. limit = constants.DEBUG_STACK_DEPTH
  58. stack = traceback.StackSummary.extract(traceback.walk_stack(f),
  59. limit=limit,
  60. lookup_lines=False)
  61. stack.reverse()
  62. return stack