12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- import functools
- import inspect
- import reprlib
- import sys
- import traceback
- from . import constants
def _get_function_source(func):
    """Return ``(filename, first_line_number)`` for *func*, or None.

    Decorator wrappers are followed with inspect.unwrap(), and
    functools.partial / functools.partialmethod objects are peeled off
    through their ``func`` attribute until a plain Python function is
    reached.  Anything else yields None.
    """
    target = func
    while True:
        # Unwrap again after every peel, as the wrapped callable may
        # itself be decorated.
        target = inspect.unwrap(target)
        if inspect.isfunction(target):
            code = target.__code__
            return (code.co_filename, code.co_firstlineno)
        if not isinstance(target,
                          (functools.partial, functools.partialmethod)):
            return None
        target = target.func
def _format_callback_source(func, args):
    """Return the formatted call of *func* with *args*, plus its location.

    The text produced by _format_callback() is returned as is when
    _get_function_source() cannot find a source location; otherwise
    ' at <filename>:<lineno>' is appended to it.
    """
    description = _format_callback(func, args, None)
    location = _get_function_source(func)
    if not location:
        return description
    return f'{description} at {location[0]}:{location[1]}'
def _format_args_and_kwargs(args, kwargs):
    """Render positional and keyword arguments as a call-like string.

    The result is a parenthesised, comma-separated list.  A single
    positional parameter gets no trailing comma: ('hello',) is rendered
    as ('hello').  Empty or None *args* / *kwargs* contribute nothing.
    """
    # reprlib keeps the representation of each value short
    parts = [reprlib.repr(value) for value in args] if args else []
    if kwargs:
        parts += [f'{key}={reprlib.repr(value)}'
                  for key, value in kwargs.items()]
    return '(' + ', '.join(parts) + ')'
def _format_callback(func, args, kwargs, suffix=''):
    """Return a readable ``name(args)`` string for a callback.

    For a functools.partial, the arguments supplied at call time are
    moved into the suffix and the partial's own function, args and
    keywords are formatted instead, so the output reads like
    ``name(partial_args)(call_args)``.  The name is the callable's
    __qualname__, falling back to __name__ and then to repr().
    """
    # Peel partial layers, collecting the outer call arguments on the
    # right-hand side.
    while isinstance(func, functools.partial):
        suffix = _format_args_and_kwargs(args, kwargs) + suffix
        func, args, kwargs = func.func, func.args, func.keywords

    name = (getattr(func, '__qualname__', None)
            or getattr(func, '__name__', None)
            or repr(func))

    text = name + _format_args_and_kwargs(args, kwargs)
    if suffix:
        text += suffix
    return text
def extract_stack(f=None, limit=None):
    """Replacement for traceback.extract_stack() that only does the
    necessary work for asyncio debug mode.

    *f* is the frame to start from and defaults to the caller's frame.
    *limit* caps the number of frames collected and defaults to
    constants.DEBUG_STACK_DEPTH.  Source lines are not looked up.  The
    returned StackSummary is reversed, so the starting frame comes last.
    """
    frame = sys._getframe().f_back if f is None else f
    # Keep the amount of work bounded: extract_stack() can be called for
    # each coroutine and future in debug mode.
    depth = constants.DEBUG_STACK_DEPTH if limit is None else limit
    summary = traceback.StackSummary.extract(
        traceback.walk_stack(frame),
        limit=depth,
        lookup_lines=False,
    )
    summary.reverse()
    return summary
|