123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- import datetime
- import functools
- import logging
- import os
- import re
- from collections import namedtuple
- from typing import Any, Callable, Dict, Iterable, List, Tuple # noqa
- from .abc import AbstractAccessLogger
- from .web_request import BaseRequest
- from .web_response import StreamResponse
- KeyMethod = namedtuple("KeyMethod", "key method")
- class AccessLogger(AbstractAccessLogger):
- """Helper object to log access.
- Usage:
- log = logging.getLogger("spam")
- log_format = "%a %{User-Agent}i"
- access_logger = AccessLogger(log, log_format)
- access_logger.log(request, response, time)
- Format:
- %% The percent sign
- %a Remote IP-address (IP-address of proxy if using reverse proxy)
- %t Time when the request was started to process
- %P The process ID of the child that serviced the request
- %r First line of request
- %s Response status code
- %b Size of response in bytes, including HTTP headers
- %T Time taken to serve the request, in seconds
- %Tf Time taken to serve the request, in seconds with floating fraction
- in .06f format
- %D Time taken to serve the request, in microseconds
- %{FOO}i request.headers['FOO']
- %{FOO}o response.headers['FOO']
- %{FOO}e os.environ['FOO']
- """
- LOG_FORMAT_MAP = {
- "a": "remote_address",
- "t": "request_start_time",
- "P": "process_id",
- "r": "first_request_line",
- "s": "response_status",
- "b": "response_size",
- "T": "request_time",
- "Tf": "request_time_frac",
- "D": "request_time_micro",
- "i": "request_header",
- "o": "response_header",
- }
- LOG_FORMAT = '%a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
- FORMAT_RE = re.compile(r"%(\{([A-Za-z0-9\-_]+)\}([ioe])|[atPrsbOD]|Tf?)")
- CLEANUP_RE = re.compile(r"(%[^s])")
- _FORMAT_CACHE = {} # type: Dict[str, Tuple[str, List[KeyMethod]]]
- def __init__(self, logger: logging.Logger, log_format: str = LOG_FORMAT) -> None:
- """Initialise the logger.
- logger is a logger object to be used for logging.
- log_format is a string with apache compatible log format description.
- """
- super().__init__(logger, log_format=log_format)
- _compiled_format = AccessLogger._FORMAT_CACHE.get(log_format)
- if not _compiled_format:
- _compiled_format = self.compile_format(log_format)
- AccessLogger._FORMAT_CACHE[log_format] = _compiled_format
- self._log_format, self._methods = _compiled_format
- def compile_format(self, log_format: str) -> Tuple[str, List[KeyMethod]]:
- """Translate log_format into form usable by modulo formatting
- All known atoms will be replaced with %s
- Also methods for formatting of those atoms will be added to
- _methods in appropriate order
- For example we have log_format = "%a %t"
- This format will be translated to "%s %s"
- Also contents of _methods will be
- [self._format_a, self._format_t]
- These method will be called and results will be passed
- to translated string format.
- Each _format_* method receive 'args' which is list of arguments
- given to self.log
- Exceptions are _format_e, _format_i and _format_o methods which
- also receive key name (by functools.partial)
- """
- # list of (key, method) tuples, we don't use an OrderedDict as users
- # can repeat the same key more than once
- methods = list()
- for atom in self.FORMAT_RE.findall(log_format):
- if atom[1] == "":
- format_key1 = self.LOG_FORMAT_MAP[atom[0]]
- m = getattr(AccessLogger, "_format_%s" % atom[0])
- key_method = KeyMethod(format_key1, m)
- else:
- format_key2 = (self.LOG_FORMAT_MAP[atom[2]], atom[1])
- m = getattr(AccessLogger, "_format_%s" % atom[2])
- key_method = KeyMethod(format_key2, functools.partial(m, atom[1]))
- methods.append(key_method)
- log_format = self.FORMAT_RE.sub(r"%s", log_format)
- log_format = self.CLEANUP_RE.sub(r"%\1", log_format)
- return log_format, methods
- @staticmethod
- def _format_i(
- key: str, request: BaseRequest, response: StreamResponse, time: float
- ) -> str:
- if request is None:
- return "(no headers)"
- # suboptimal, make istr(key) once
- return request.headers.get(key, "-")
- @staticmethod
- def _format_o(
- key: str, request: BaseRequest, response: StreamResponse, time: float
- ) -> str:
- # suboptimal, make istr(key) once
- return response.headers.get(key, "-")
- @staticmethod
- def _format_a(request: BaseRequest, response: StreamResponse, time: float) -> str:
- if request is None:
- return "-"
- ip = request.remote
- return ip if ip is not None else "-"
- @staticmethod
- def _format_t(request: BaseRequest, response: StreamResponse, time: float) -> str:
- now = datetime.datetime.utcnow()
- start_time = now - datetime.timedelta(seconds=time)
- return start_time.strftime("[%d/%b/%Y:%H:%M:%S +0000]")
- @staticmethod
- def _format_P(request: BaseRequest, response: StreamResponse, time: float) -> str:
- return "<%s>" % os.getpid()
- @staticmethod
- def _format_r(request: BaseRequest, response: StreamResponse, time: float) -> str:
- if request is None:
- return "-"
- return "{} {} HTTP/{}.{}".format(
- request.method,
- request.path_qs,
- request.version.major,
- request.version.minor,
- )
- @staticmethod
- def _format_s(request: BaseRequest, response: StreamResponse, time: float) -> int:
- return response.status
- @staticmethod
- def _format_b(request: BaseRequest, response: StreamResponse, time: float) -> int:
- return response.body_length
- @staticmethod
- def _format_T(request: BaseRequest, response: StreamResponse, time: float) -> str:
- return str(round(time))
- @staticmethod
- def _format_Tf(request: BaseRequest, response: StreamResponse, time: float) -> str:
- return "%06f" % time
- @staticmethod
- def _format_D(request: BaseRequest, response: StreamResponse, time: float) -> str:
- return str(round(time * 1000000))
- def _format_line(
- self, request: BaseRequest, response: StreamResponse, time: float
- ) -> Iterable[Tuple[str, Callable[[BaseRequest, StreamResponse, float], str]]]:
- return [(key, method(request, response, time)) for key, method in self._methods]
- def log(self, request: BaseRequest, response: StreamResponse, time: float) -> None:
- try:
- fmt_info = self._format_line(request, response, time)
- values = list()
- extra = dict()
- for key, value in fmt_info:
- values.append(value)
- if key.__class__ is str:
- extra[key] = value
- else:
- k1, k2 = key # type: ignore
- dct = extra.get(k1, {}) # type: ignore
- dct[k2] = value # type: ignore
- extra[k1] = dct # type: ignore
- self.logger.info(self._log_format % tuple(values), extra=extra)
- except Exception:
- self.logger.exception("Error in logging")
|