123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275 |
- """HTTP Client for asyncio."""
- import asyncio
- import base64
- import hashlib
- import json
- import os
- import sys
- import traceback
- import warnings
- from types import SimpleNamespace, TracebackType
- from typing import (
- Any,
- Awaitable,
- Callable,
- Coroutine,
- FrozenSet,
- Generator,
- Generic,
- Iterable,
- List,
- Mapping,
- Optional,
- Set,
- Tuple,
- Type,
- TypeVar,
- Union,
- )
- import attr
- from multidict import CIMultiDict, MultiDict, MultiDictProxy, istr
- from yarl import URL
- from . import hdrs, http, payload
- from .abc import AbstractCookieJar
- from .client_exceptions import (
- ClientConnectionError as ClientConnectionError,
- ClientConnectorCertificateError as ClientConnectorCertificateError,
- ClientConnectorError as ClientConnectorError,
- ClientConnectorSSLError as ClientConnectorSSLError,
- ClientError as ClientError,
- ClientHttpProxyError as ClientHttpProxyError,
- ClientOSError as ClientOSError,
- ClientPayloadError as ClientPayloadError,
- ClientProxyConnectionError as ClientProxyConnectionError,
- ClientResponseError as ClientResponseError,
- ClientSSLError as ClientSSLError,
- ContentTypeError as ContentTypeError,
- InvalidURL as InvalidURL,
- ServerConnectionError as ServerConnectionError,
- ServerDisconnectedError as ServerDisconnectedError,
- ServerFingerprintMismatch as ServerFingerprintMismatch,
- ServerTimeoutError as ServerTimeoutError,
- TooManyRedirects as TooManyRedirects,
- WSServerHandshakeError as WSServerHandshakeError,
- )
- from .client_reqrep import (
- ClientRequest as ClientRequest,
- ClientResponse as ClientResponse,
- Fingerprint as Fingerprint,
- RequestInfo as RequestInfo,
- _merge_ssl_params,
- )
- from .client_ws import ClientWebSocketResponse as ClientWebSocketResponse
- from .connector import (
- BaseConnector as BaseConnector,
- NamedPipeConnector as NamedPipeConnector,
- TCPConnector as TCPConnector,
- UnixConnector as UnixConnector,
- )
- from .cookiejar import CookieJar
- from .helpers import (
- DEBUG,
- PY_36,
- BasicAuth,
- CeilTimeout,
- TimeoutHandle,
- get_running_loop,
- proxies_from_env,
- sentinel,
- strip_auth_from_url,
- )
- from .http import WS_KEY, HttpVersion, WebSocketReader, WebSocketWriter
- from .http_websocket import WSHandshakeError, WSMessage, ws_ext_gen, ws_ext_parse
- from .streams import FlowControlDataQueue
- from .tracing import Trace, TraceConfig
- from .typedefs import JSONEncoder, LooseCookies, LooseHeaders, StrOrURL
- __all__ = (
- # client_exceptions
- "ClientConnectionError",
- "ClientConnectorCertificateError",
- "ClientConnectorError",
- "ClientConnectorSSLError",
- "ClientError",
- "ClientHttpProxyError",
- "ClientOSError",
- "ClientPayloadError",
- "ClientProxyConnectionError",
- "ClientResponseError",
- "ClientSSLError",
- "ContentTypeError",
- "InvalidURL",
- "ServerConnectionError",
- "ServerDisconnectedError",
- "ServerFingerprintMismatch",
- "ServerTimeoutError",
- "TooManyRedirects",
- "WSServerHandshakeError",
- # client_reqrep
- "ClientRequest",
- "ClientResponse",
- "Fingerprint",
- "RequestInfo",
- # connector
- "BaseConnector",
- "TCPConnector",
- "UnixConnector",
- "NamedPipeConnector",
- # client_ws
- "ClientWebSocketResponse",
- # client
- "ClientSession",
- "ClientTimeout",
- "request",
- )
- try:
- from ssl import SSLContext
- except ImportError: # pragma: no cover
- SSLContext = object # type: ignore
- @attr.s(auto_attribs=True, frozen=True, slots=True)
- class ClientTimeout:
- total: Optional[float] = None
- connect: Optional[float] = None
- sock_read: Optional[float] = None
- sock_connect: Optional[float] = None
- # pool_queue_timeout: Optional[float] = None
- # dns_resolution_timeout: Optional[float] = None
- # socket_connect_timeout: Optional[float] = None
- # connection_acquiring_timeout: Optional[float] = None
- # new_connection_timeout: Optional[float] = None
- # http_header_timeout: Optional[float] = None
- # response_body_timeout: Optional[float] = None
- # to create a timeout specific for a single request, either
- # - create a completely new one to overwrite the default
- # - or use http://www.attrs.org/en/stable/api.html#attr.evolve
- # to overwrite the defaults
- # 5 Minute default read timeout
- DEFAULT_TIMEOUT = ClientTimeout(total=5 * 60)
- _RetType = TypeVar("_RetType")
- class ClientSession:
- """First-class interface for making HTTP requests."""
- ATTRS = frozenset(
- [
- "_source_traceback",
- "_connector",
- "requote_redirect_url",
- "_loop",
- "_cookie_jar",
- "_connector_owner",
- "_default_auth",
- "_version",
- "_json_serialize",
- "_requote_redirect_url",
- "_timeout",
- "_raise_for_status",
- "_auto_decompress",
- "_trust_env",
- "_default_headers",
- "_skip_auto_headers",
- "_request_class",
- "_response_class",
- "_ws_response_class",
- "_trace_configs",
- "_read_bufsize",
- ]
- )
- _source_traceback = None
- def __init__(
- self,
- *,
- connector: Optional[BaseConnector] = None,
- loop: Optional[asyncio.AbstractEventLoop] = None,
- cookies: Optional[LooseCookies] = None,
- headers: Optional[LooseHeaders] = None,
- skip_auto_headers: Optional[Iterable[str]] = None,
- auth: Optional[BasicAuth] = None,
- json_serialize: JSONEncoder = json.dumps,
- request_class: Type[ClientRequest] = ClientRequest,
- response_class: Type[ClientResponse] = ClientResponse,
- ws_response_class: Type[ClientWebSocketResponse] = ClientWebSocketResponse,
- version: HttpVersion = http.HttpVersion11,
- cookie_jar: Optional[AbstractCookieJar] = None,
- connector_owner: bool = True,
- raise_for_status: bool = False,
- read_timeout: Union[float, object] = sentinel,
- conn_timeout: Optional[float] = None,
- timeout: Union[object, ClientTimeout] = sentinel,
- auto_decompress: bool = True,
- trust_env: bool = False,
- requote_redirect_url: bool = True,
- trace_configs: Optional[List[TraceConfig]] = None,
- read_bufsize: int = 2 ** 16,
- ) -> None:
- if loop is None:
- if connector is not None:
- loop = connector._loop
- loop = get_running_loop(loop)
- if connector is None:
- connector = TCPConnector(loop=loop)
- if connector._loop is not loop:
- raise RuntimeError("Session and connector has to use same event loop")
- self._loop = loop
- if loop.get_debug():
- self._source_traceback = traceback.extract_stack(sys._getframe(1))
- if cookie_jar is None:
- cookie_jar = CookieJar(loop=loop)
- self._cookie_jar = cookie_jar
- if cookies is not None:
- self._cookie_jar.update_cookies(cookies)
- self._connector = connector # type: Optional[BaseConnector]
- self._connector_owner = connector_owner
- self._default_auth = auth
- self._version = version
- self._json_serialize = json_serialize
- if timeout is sentinel:
- self._timeout = DEFAULT_TIMEOUT
- if read_timeout is not sentinel:
- warnings.warn(
- "read_timeout is deprecated, " "use timeout argument instead",
- DeprecationWarning,
- stacklevel=2,
- )
- self._timeout = attr.evolve(self._timeout, total=read_timeout)
- if conn_timeout is not None:
- self._timeout = attr.evolve(self._timeout, connect=conn_timeout)
- warnings.warn(
- "conn_timeout is deprecated, " "use timeout argument instead",
- DeprecationWarning,
- stacklevel=2,
- )
- else:
- self._timeout = timeout # type: ignore
- if read_timeout is not sentinel:
- raise ValueError(
- "read_timeout and timeout parameters "
- "conflict, please setup "
- "timeout.read"
- )
- if conn_timeout is not None:
- raise ValueError(
- "conn_timeout and timeout parameters "
- "conflict, please setup "
- "timeout.connect"
- )
- self._raise_for_status = raise_for_status
- self._auto_decompress = auto_decompress
- self._trust_env = trust_env
- self._requote_redirect_url = requote_redirect_url
- self._read_bufsize = read_bufsize
- # Convert to list of tuples
- if headers:
- real_headers = CIMultiDict(headers) # type: CIMultiDict[str]
- else:
- real_headers = CIMultiDict()
- self._default_headers = real_headers # type: CIMultiDict[str]
- if skip_auto_headers is not None:
- self._skip_auto_headers = frozenset([istr(i) for i in skip_auto_headers])
- else:
- self._skip_auto_headers = frozenset()
- self._request_class = request_class
- self._response_class = response_class
- self._ws_response_class = ws_response_class
- self._trace_configs = trace_configs or []
- for trace_config in self._trace_configs:
- trace_config.freeze()
- def __init_subclass__(cls: Type["ClientSession"]) -> None:
- warnings.warn(
- "Inheritance class {} from ClientSession "
- "is discouraged".format(cls.__name__),
- DeprecationWarning,
- stacklevel=2,
- )
- if DEBUG:
- def __setattr__(self, name: str, val: Any) -> None:
- if name not in self.ATTRS:
- warnings.warn(
- "Setting custom ClientSession.{} attribute "
- "is discouraged".format(name),
- DeprecationWarning,
- stacklevel=2,
- )
- super().__setattr__(name, val)
- def __del__(self, _warnings: Any = warnings) -> None:
- if not self.closed:
- if PY_36:
- kwargs = {"source": self}
- else:
- kwargs = {}
- _warnings.warn(
- f"Unclosed client session {self!r}", ResourceWarning, **kwargs
- )
- context = {"client_session": self, "message": "Unclosed client session"}
- if self._source_traceback is not None:
- context["source_traceback"] = self._source_traceback
- self._loop.call_exception_handler(context)
- def request(
- self, method: str, url: StrOrURL, **kwargs: Any
- ) -> "_RequestContextManager":
- """Perform HTTP request."""
- return _RequestContextManager(self._request(method, url, **kwargs))
- async def _request(
- self,
- method: str,
- str_or_url: StrOrURL,
- *,
- params: Optional[Mapping[str, str]] = None,
- data: Any = None,
- json: Any = None,
- cookies: Optional[LooseCookies] = None,
- headers: Optional[LooseHeaders] = None,
- skip_auto_headers: Optional[Iterable[str]] = None,
- auth: Optional[BasicAuth] = None,
- allow_redirects: bool = True,
- max_redirects: int = 10,
- compress: Optional[str] = None,
- chunked: Optional[bool] = None,
- expect100: bool = False,
- raise_for_status: Optional[bool] = None,
- read_until_eof: bool = True,
- proxy: Optional[StrOrURL] = None,
- proxy_auth: Optional[BasicAuth] = None,
- timeout: Union[ClientTimeout, object] = sentinel,
- verify_ssl: Optional[bool] = None,
- fingerprint: Optional[bytes] = None,
- ssl_context: Optional[SSLContext] = None,
- ssl: Optional[Union[SSLContext, bool, Fingerprint]] = None,
- proxy_headers: Optional[LooseHeaders] = None,
- trace_request_ctx: Optional[SimpleNamespace] = None,
- read_bufsize: Optional[int] = None,
- ) -> ClientResponse:
- # NOTE: timeout clamps existing connect and read timeouts. We cannot
- # set the default to None because we need to detect if the user wants
- # to use the existing timeouts by setting timeout to None.
- if self.closed:
- raise RuntimeError("Session is closed")
- ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)
- if data is not None and json is not None:
- raise ValueError(
- "data and json parameters can not be used at the same time"
- )
- elif json is not None:
- data = payload.JsonPayload(json, dumps=self._json_serialize)
- if not isinstance(chunked, bool) and chunked is not None:
- warnings.warn("Chunk size is deprecated #1615", DeprecationWarning)
- redirects = 0
- history = []
- version = self._version
- # Merge with default headers and transform to CIMultiDict
- headers = self._prepare_headers(headers)
- proxy_headers = self._prepare_headers(proxy_headers)
- try:
- url = URL(str_or_url)
- except ValueError as e:
- raise InvalidURL(str_or_url) from e
- skip_headers = set(self._skip_auto_headers)
- if skip_auto_headers is not None:
- for i in skip_auto_headers:
- skip_headers.add(istr(i))
- if proxy is not None:
- try:
- proxy = URL(proxy)
- except ValueError as e:
- raise InvalidURL(proxy) from e
- if timeout is sentinel:
- real_timeout = self._timeout # type: ClientTimeout
- else:
- if not isinstance(timeout, ClientTimeout):
- real_timeout = ClientTimeout(total=timeout) # type: ignore
- else:
- real_timeout = timeout
- # timeout is cumulative for all request operations
- # (request, redirects, responses, data consuming)
- tm = TimeoutHandle(self._loop, real_timeout.total)
- handle = tm.start()
- if read_bufsize is None:
- read_bufsize = self._read_bufsize
- traces = [
- Trace(
- self,
- trace_config,
- trace_config.trace_config_ctx(trace_request_ctx=trace_request_ctx),
- )
- for trace_config in self._trace_configs
- ]
- for trace in traces:
- await trace.send_request_start(method, url, headers)
- timer = tm.timer()
- try:
- with timer:
- while True:
- url, auth_from_url = strip_auth_from_url(url)
- if auth and auth_from_url:
- raise ValueError(
- "Cannot combine AUTH argument with "
- "credentials encoded in URL"
- )
- if auth is None:
- auth = auth_from_url
- if auth is None:
- auth = self._default_auth
- # It would be confusing if we support explicit
- # Authorization header with auth argument
- if (
- headers is not None
- and auth is not None
- and hdrs.AUTHORIZATION in headers
- ):
- raise ValueError(
- "Cannot combine AUTHORIZATION header "
- "with AUTH argument or credentials "
- "encoded in URL"
- )
- all_cookies = self._cookie_jar.filter_cookies(url)
- if cookies is not None:
- tmp_cookie_jar = CookieJar()
- tmp_cookie_jar.update_cookies(cookies)
- req_cookies = tmp_cookie_jar.filter_cookies(url)
- if req_cookies:
- all_cookies.load(req_cookies)
- if proxy is not None:
- proxy = URL(proxy)
- elif self._trust_env:
- for scheme, proxy_info in proxies_from_env().items():
- if scheme == url.scheme:
- proxy = proxy_info.proxy
- proxy_auth = proxy_info.proxy_auth
- break
- req = self._request_class(
- method,
- url,
- params=params,
- headers=headers,
- skip_auto_headers=skip_headers,
- data=data,
- cookies=all_cookies,
- auth=auth,
- version=version,
- compress=compress,
- chunked=chunked,
- expect100=expect100,
- loop=self._loop,
- response_class=self._response_class,
- proxy=proxy,
- proxy_auth=proxy_auth,
- timer=timer,
- session=self,
- ssl=ssl,
- proxy_headers=proxy_headers,
- traces=traces,
- )
- # connection timeout
- try:
- with CeilTimeout(real_timeout.connect, loop=self._loop):
- assert self._connector is not None
- conn = await self._connector.connect(
- req, traces=traces, timeout=real_timeout
- )
- except asyncio.TimeoutError as exc:
- raise ServerTimeoutError(
- "Connection timeout " "to host {}".format(url)
- ) from exc
- assert conn.transport is not None
- assert conn.protocol is not None
- conn.protocol.set_response_params(
- timer=timer,
- skip_payload=method.upper() == "HEAD",
- read_until_eof=read_until_eof,
- auto_decompress=self._auto_decompress,
- read_timeout=real_timeout.sock_read,
- read_bufsize=read_bufsize,
- )
- try:
- try:
- resp = await req.send(conn)
- try:
- await resp.start(conn)
- except BaseException:
- resp.close()
- raise
- except BaseException:
- conn.close()
- raise
- except ClientError:
- raise
- except OSError as exc:
- raise ClientOSError(*exc.args) from exc
- self._cookie_jar.update_cookies(resp.cookies, resp.url)
- # redirects
- if resp.status in (301, 302, 303, 307, 308) and allow_redirects:
- for trace in traces:
- await trace.send_request_redirect(
- method, url, headers, resp
- )
- redirects += 1
- history.append(resp)
- if max_redirects and redirects >= max_redirects:
- resp.close()
- raise TooManyRedirects(
- history[0].request_info, tuple(history)
- )
- # For 301 and 302, mimic IE, now changed in RFC
- # https://github.com/kennethreitz/requests/pull/269
- if (resp.status == 303 and resp.method != hdrs.METH_HEAD) or (
- resp.status in (301, 302) and resp.method == hdrs.METH_POST
- ):
- method = hdrs.METH_GET
- data = None
- if headers.get(hdrs.CONTENT_LENGTH):
- headers.pop(hdrs.CONTENT_LENGTH)
- r_url = resp.headers.get(hdrs.LOCATION) or resp.headers.get(
- hdrs.URI
- )
- if r_url is None:
- # see github.com/aio-libs/aiohttp/issues/2022
- break
- else:
- # reading from correct redirection
- # response is forbidden
- resp.release()
- try:
- parsed_url = URL(
- r_url, encoded=not self._requote_redirect_url
- )
- except ValueError as e:
- raise InvalidURL(r_url) from e
- scheme = parsed_url.scheme
- if scheme not in ("http", "https", ""):
- resp.close()
- raise ValueError("Can redirect only to http or https")
- elif not scheme:
- parsed_url = url.join(parsed_url)
- if url.origin() != parsed_url.origin():
- auth = None
- headers.pop(hdrs.AUTHORIZATION, None)
- url = parsed_url
- params = None
- resp.release()
- continue
- break
- # check response status
- if raise_for_status is None:
- raise_for_status = self._raise_for_status
- if raise_for_status:
- resp.raise_for_status()
- # register connection
- if handle is not None:
- if resp.connection is not None:
- resp.connection.add_callback(handle.cancel)
- else:
- handle.cancel()
- resp._history = tuple(history)
- for trace in traces:
- await trace.send_request_end(method, url, headers, resp)
- return resp
- except BaseException as e:
- # cleanup timer
- tm.close()
- if handle:
- handle.cancel()
- handle = None
- for trace in traces:
- await trace.send_request_exception(method, url, headers, e)
- raise
- def ws_connect(
- self,
- url: StrOrURL,
- *,
- method: str = hdrs.METH_GET,
- protocols: Iterable[str] = (),
- timeout: float = 10.0,
- receive_timeout: Optional[float] = None,
- autoclose: bool = True,
- autoping: bool = True,
- heartbeat: Optional[float] = None,
- auth: Optional[BasicAuth] = None,
- origin: Optional[str] = None,
- headers: Optional[LooseHeaders] = None,
- proxy: Optional[StrOrURL] = None,
- proxy_auth: Optional[BasicAuth] = None,
- ssl: Union[SSLContext, bool, None, Fingerprint] = None,
- verify_ssl: Optional[bool] = None,
- fingerprint: Optional[bytes] = None,
- ssl_context: Optional[SSLContext] = None,
- proxy_headers: Optional[LooseHeaders] = None,
- compress: int = 0,
- max_msg_size: int = 4 * 1024 * 1024,
- ) -> "_WSRequestContextManager":
- """Initiate websocket connection."""
- return _WSRequestContextManager(
- self._ws_connect(
- url,
- method=method,
- protocols=protocols,
- timeout=timeout,
- receive_timeout=receive_timeout,
- autoclose=autoclose,
- autoping=autoping,
- heartbeat=heartbeat,
- auth=auth,
- origin=origin,
- headers=headers,
- proxy=proxy,
- proxy_auth=proxy_auth,
- ssl=ssl,
- verify_ssl=verify_ssl,
- fingerprint=fingerprint,
- ssl_context=ssl_context,
- proxy_headers=proxy_headers,
- compress=compress,
- max_msg_size=max_msg_size,
- )
- )
- async def _ws_connect(
- self,
- url: StrOrURL,
- *,
- method: str = hdrs.METH_GET,
- protocols: Iterable[str] = (),
- timeout: float = 10.0,
- receive_timeout: Optional[float] = None,
- autoclose: bool = True,
- autoping: bool = True,
- heartbeat: Optional[float] = None,
- auth: Optional[BasicAuth] = None,
- origin: Optional[str] = None,
- headers: Optional[LooseHeaders] = None,
- proxy: Optional[StrOrURL] = None,
- proxy_auth: Optional[BasicAuth] = None,
- ssl: Union[SSLContext, bool, None, Fingerprint] = None,
- verify_ssl: Optional[bool] = None,
- fingerprint: Optional[bytes] = None,
- ssl_context: Optional[SSLContext] = None,
- proxy_headers: Optional[LooseHeaders] = None,
- compress: int = 0,
- max_msg_size: int = 4 * 1024 * 1024,
- ) -> ClientWebSocketResponse:
- if headers is None:
- real_headers = CIMultiDict() # type: CIMultiDict[str]
- else:
- real_headers = CIMultiDict(headers)
- default_headers = {
- hdrs.UPGRADE: "websocket",
- hdrs.CONNECTION: "upgrade",
- hdrs.SEC_WEBSOCKET_VERSION: "13",
- }
- for key, value in default_headers.items():
- real_headers.setdefault(key, value)
- sec_key = base64.b64encode(os.urandom(16))
- real_headers[hdrs.SEC_WEBSOCKET_KEY] = sec_key.decode()
- if protocols:
- real_headers[hdrs.SEC_WEBSOCKET_PROTOCOL] = ",".join(protocols)
- if origin is not None:
- real_headers[hdrs.ORIGIN] = origin
- if compress:
- extstr = ws_ext_gen(compress=compress)
- real_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = extstr
- ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)
- # send request
- resp = await self.request(
- method,
- url,
- headers=real_headers,
- read_until_eof=False,
- auth=auth,
- proxy=proxy,
- proxy_auth=proxy_auth,
- ssl=ssl,
- proxy_headers=proxy_headers,
- )
- try:
- # check handshake
- if resp.status != 101:
- raise WSServerHandshakeError(
- resp.request_info,
- resp.history,
- message="Invalid response status",
- status=resp.status,
- headers=resp.headers,
- )
- if resp.headers.get(hdrs.UPGRADE, "").lower() != "websocket":
- raise WSServerHandshakeError(
- resp.request_info,
- resp.history,
- message="Invalid upgrade header",
- status=resp.status,
- headers=resp.headers,
- )
- if resp.headers.get(hdrs.CONNECTION, "").lower() != "upgrade":
- raise WSServerHandshakeError(
- resp.request_info,
- resp.history,
- message="Invalid connection header",
- status=resp.status,
- headers=resp.headers,
- )
- # key calculation
- r_key = resp.headers.get(hdrs.SEC_WEBSOCKET_ACCEPT, "")
- match = base64.b64encode(hashlib.sha1(sec_key + WS_KEY).digest()).decode()
- if r_key != match:
- raise WSServerHandshakeError(
- resp.request_info,
- resp.history,
- message="Invalid challenge response",
- status=resp.status,
- headers=resp.headers,
- )
- # websocket protocol
- protocol = None
- if protocols and hdrs.SEC_WEBSOCKET_PROTOCOL in resp.headers:
- resp_protocols = [
- proto.strip()
- for proto in resp.headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",")
- ]
- for proto in resp_protocols:
- if proto in protocols:
- protocol = proto
- break
- # websocket compress
- notakeover = False
- if compress:
- compress_hdrs = resp.headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS)
- if compress_hdrs:
- try:
- compress, notakeover = ws_ext_parse(compress_hdrs)
- except WSHandshakeError as exc:
- raise WSServerHandshakeError(
- resp.request_info,
- resp.history,
- message=exc.args[0],
- status=resp.status,
- headers=resp.headers,
- ) from exc
- else:
- compress = 0
- notakeover = False
- conn = resp.connection
- assert conn is not None
- conn_proto = conn.protocol
- assert conn_proto is not None
- transport = conn.transport
- assert transport is not None
- reader = FlowControlDataQueue(
- conn_proto, 2 ** 16, loop=self._loop
- ) # type: FlowControlDataQueue[WSMessage]
- conn_proto.set_parser(WebSocketReader(reader, max_msg_size), reader)
- writer = WebSocketWriter(
- conn_proto,
- transport,
- use_mask=True,
- compress=compress,
- notakeover=notakeover,
- )
- except BaseException:
- resp.close()
- raise
- else:
- return self._ws_response_class(
- reader,
- writer,
- protocol,
- resp,
- timeout,
- autoclose,
- autoping,
- self._loop,
- receive_timeout=receive_timeout,
- heartbeat=heartbeat,
- compress=compress,
- client_notakeover=notakeover,
- )
- def _prepare_headers(self, headers: Optional[LooseHeaders]) -> "CIMultiDict[str]":
- """Add default headers and transform it to CIMultiDict"""
- # Convert headers to MultiDict
- result = CIMultiDict(self._default_headers)
- if headers:
- if not isinstance(headers, (MultiDictProxy, MultiDict)):
- headers = CIMultiDict(headers)
- added_names = set() # type: Set[str]
- for key, value in headers.items():
- if key in added_names:
- result.add(key, value)
- else:
- result[key] = value
- added_names.add(key)
- return result
- def get(
- self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any
- ) -> "_RequestContextManager":
- """Perform HTTP GET request."""
- return _RequestContextManager(
- self._request(hdrs.METH_GET, url, allow_redirects=allow_redirects, **kwargs)
- )
- def options(
- self, url: StrOrURL, *, allow_redirects: bool = True, **kwargs: Any
- ) -> "_RequestContextManager":
- """Perform HTTP OPTIONS request."""
- return _RequestContextManager(
- self._request(
- hdrs.METH_OPTIONS, url, allow_redirects=allow_redirects, **kwargs
- )
- )
- def head(
- self, url: StrOrURL, *, allow_redirects: bool = False, **kwargs: Any
- ) -> "_RequestContextManager":
- """Perform HTTP HEAD request."""
- return _RequestContextManager(
- self._request(
- hdrs.METH_HEAD, url, allow_redirects=allow_redirects, **kwargs
- )
- )
- def post(
- self, url: StrOrURL, *, data: Any = None, **kwargs: Any
- ) -> "_RequestContextManager":
- """Perform HTTP POST request."""
- return _RequestContextManager(
- self._request(hdrs.METH_POST, url, data=data, **kwargs)
- )
- def put(
- self, url: StrOrURL, *, data: Any = None, **kwargs: Any
- ) -> "_RequestContextManager":
- """Perform HTTP PUT request."""
- return _RequestContextManager(
- self._request(hdrs.METH_PUT, url, data=data, **kwargs)
- )
- def patch(
- self, url: StrOrURL, *, data: Any = None, **kwargs: Any
- ) -> "_RequestContextManager":
- """Perform HTTP PATCH request."""
- return _RequestContextManager(
- self._request(hdrs.METH_PATCH, url, data=data, **kwargs)
- )
- def delete(self, url: StrOrURL, **kwargs: Any) -> "_RequestContextManager":
- """Perform HTTP DELETE request."""
- return _RequestContextManager(self._request(hdrs.METH_DELETE, url, **kwargs))
- async def close(self) -> None:
- """Close underlying connector.
- Release all acquired resources.
- """
- if not self.closed:
- if self._connector is not None and self._connector_owner:
- await self._connector.close()
- self._connector = None
- @property
- def closed(self) -> bool:
- """Is client session closed.
- A readonly property.
- """
- return self._connector is None or self._connector.closed
- @property
- def connector(self) -> Optional[BaseConnector]:
- """Connector instance used for the session."""
- return self._connector
- @property
- def cookie_jar(self) -> AbstractCookieJar:
- """The session cookies."""
- return self._cookie_jar
- @property
- def version(self) -> Tuple[int, int]:
- """The session HTTP protocol version."""
- return self._version
- @property
- def requote_redirect_url(self) -> bool:
- """Do URL requoting on redirection handling."""
- return self._requote_redirect_url
- @requote_redirect_url.setter
- def requote_redirect_url(self, val: bool) -> None:
- """Do URL requoting on redirection handling."""
- warnings.warn(
- "session.requote_redirect_url modification " "is deprecated #2778",
- DeprecationWarning,
- stacklevel=2,
- )
- self._requote_redirect_url = val
- @property
- def loop(self) -> asyncio.AbstractEventLoop:
- """Session's loop."""
- warnings.warn(
- "client.loop property is deprecated", DeprecationWarning, stacklevel=2
- )
- return self._loop
- @property
- def timeout(self) -> Union[object, ClientTimeout]:
- """Timeout for the session."""
- return self._timeout
- @property
- def headers(self) -> "CIMultiDict[str]":
- """The default headers of the client session."""
- return self._default_headers
- @property
- def skip_auto_headers(self) -> FrozenSet[istr]:
- """Headers for which autogeneration should be skipped"""
- return self._skip_auto_headers
- @property
- def auth(self) -> Optional[BasicAuth]:
- """An object that represents HTTP Basic Authorization"""
- return self._default_auth
- @property
- def json_serialize(self) -> JSONEncoder:
- """Json serializer callable"""
- return self._json_serialize
- @property
- def connector_owner(self) -> bool:
- """Should connector be closed on session closing"""
- return self._connector_owner
- @property
- def raise_for_status(
- self,
- ) -> Union[bool, Callable[[ClientResponse], Awaitable[None]]]:
- """
- Should `ClientResponse.raise_for_status()`
- be called for each response
- """
- return self._raise_for_status
- @property
- def auto_decompress(self) -> bool:
- """Should the body response be automatically decompressed"""
- return self._auto_decompress
- @property
- def trust_env(self) -> bool:
- """
- Should get proxies information
- from HTTP_PROXY / HTTPS_PROXY environment variables
- or ~/.netrc file if present
- """
- return self._trust_env
- @property
- def trace_configs(self) -> List[TraceConfig]:
- """A list of TraceConfig instances used for client tracing"""
- return self._trace_configs
- def detach(self) -> None:
- """Detach connector from session without closing the former.
- Session is switched to closed state anyway.
- """
- self._connector = None
- def __enter__(self) -> None:
- raise TypeError("Use async with instead")
- def __exit__(
- self,
- exc_type: Optional[Type[BaseException]],
- exc_val: Optional[BaseException],
- exc_tb: Optional[TracebackType],
- ) -> None:
- # __exit__ should exist in pair with __enter__ but never executed
- pass # pragma: no cover
- async def __aenter__(self) -> "ClientSession":
- return self
- async def __aexit__(
- self,
- exc_type: Optional[Type[BaseException]],
- exc_val: Optional[BaseException],
- exc_tb: Optional[TracebackType],
- ) -> None:
- await self.close()
- class _BaseRequestContextManager(Coroutine[Any, Any, _RetType], Generic[_RetType]):
- __slots__ = ("_coro", "_resp")
- def __init__(self, coro: Coroutine["asyncio.Future[Any]", None, _RetType]) -> None:
- self._coro = coro
- def send(self, arg: None) -> "asyncio.Future[Any]":
- return self._coro.send(arg)
- def throw(self, arg: BaseException) -> None: # type: ignore
- self._coro.throw(arg)
- def close(self) -> None:
- return self._coro.close()
- def __await__(self) -> Generator[Any, None, _RetType]:
- ret = self._coro.__await__()
- return ret
- def __iter__(self) -> Generator[Any, None, _RetType]:
- return self.__await__()
- async def __aenter__(self) -> _RetType:
- self._resp = await self._coro
- return self._resp
- class _RequestContextManager(_BaseRequestContextManager[ClientResponse]):
- async def __aexit__(
- self,
- exc_type: Optional[Type[BaseException]],
- exc: Optional[BaseException],
- tb: Optional[TracebackType],
- ) -> None:
- # We're basing behavior on the exception as it can be caused by
- # user code unrelated to the status of the connection. If you
- # would like to close a connection you must do that
- # explicitly. Otherwise connection error handling should kick in
- # and close/recycle the connection as required.
- self._resp.release()
- class _WSRequestContextManager(_BaseRequestContextManager[ClientWebSocketResponse]):
- async def __aexit__(
- self,
- exc_type: Optional[Type[BaseException]],
- exc: Optional[BaseException],
- tb: Optional[TracebackType],
- ) -> None:
- await self._resp.close()
- class _SessionRequestContextManager:
- __slots__ = ("_coro", "_resp", "_session")
- def __init__(
- self,
- coro: Coroutine["asyncio.Future[Any]", None, ClientResponse],
- session: ClientSession,
- ) -> None:
- self._coro = coro
- self._resp = None # type: Optional[ClientResponse]
- self._session = session
- async def __aenter__(self) -> ClientResponse:
- try:
- self._resp = await self._coro
- except BaseException:
- await self._session.close()
- raise
- else:
- return self._resp
- async def __aexit__(
- self,
- exc_type: Optional[Type[BaseException]],
- exc: Optional[BaseException],
- tb: Optional[TracebackType],
- ) -> None:
- assert self._resp is not None
- self._resp.close()
- await self._session.close()
- def request(
- method: str,
- url: StrOrURL,
- *,
- params: Optional[Mapping[str, str]] = None,
- data: Any = None,
- json: Any = None,
- headers: Optional[LooseHeaders] = None,
- skip_auto_headers: Optional[Iterable[str]] = None,
- auth: Optional[BasicAuth] = None,
- allow_redirects: bool = True,
- max_redirects: int = 10,
- compress: Optional[str] = None,
- chunked: Optional[bool] = None,
- expect100: bool = False,
- raise_for_status: Optional[bool] = None,
- read_until_eof: bool = True,
- proxy: Optional[StrOrURL] = None,
- proxy_auth: Optional[BasicAuth] = None,
- timeout: Union[ClientTimeout, object] = sentinel,
- cookies: Optional[LooseCookies] = None,
- version: HttpVersion = http.HttpVersion11,
- connector: Optional[BaseConnector] = None,
- read_bufsize: Optional[int] = None,
- loop: Optional[asyncio.AbstractEventLoop] = None,
- ) -> _SessionRequestContextManager:
- """Constructs and sends a request. Returns response object.
- method - HTTP method
- url - request url
- params - (optional) Dictionary or bytes to be sent in the query
- string of the new request
- data - (optional) Dictionary, bytes, or file-like object to
- send in the body of the request
- json - (optional) Any json compatible python object
- headers - (optional) Dictionary of HTTP Headers to send with
- the request
- cookies - (optional) Dict object to send with the request
- auth - (optional) BasicAuth named tuple represent HTTP Basic Auth
- auth - aiohttp.helpers.BasicAuth
- allow_redirects - (optional) If set to False, do not follow
- redirects
- version - Request HTTP version.
- compress - Set to True if request has to be compressed
- with deflate encoding.
- chunked - Set to chunk size for chunked transfer encoding.
- expect100 - Expect 100-continue response from server.
- connector - BaseConnector sub-class instance to support
- connection pooling.
- read_until_eof - Read response until eof if response
- does not have Content-Length header.
- loop - Optional event loop.
- timeout - Optional ClientTimeout settings structure, 5min
- total timeout by default.
- Usage::
- >>> import aiohttp
- >>> resp = await aiohttp.request('GET', 'http://python.org/')
- >>> resp
- <ClientResponse(python.org/) [200]>
- >>> data = await resp.read()
- """
- connector_owner = False
- if connector is None:
- connector_owner = True
- connector = TCPConnector(loop=loop, force_close=True)
- session = ClientSession(
- loop=loop,
- cookies=cookies,
- version=version,
- timeout=timeout,
- connector=connector,
- connector_owner=connector_owner,
- )
- return _SessionRequestContextManager(
- session._request(
- method,
- url,
- params=params,
- data=data,
- json=json,
- headers=headers,
- skip_auto_headers=skip_auto_headers,
- auth=auth,
- allow_redirects=allow_redirects,
- max_redirects=max_redirects,
- compress=compress,
- chunked=chunked,
- expect100=expect100,
- raise_for_status=raise_for_status,
- read_until_eof=read_until_eof,
- proxy=proxy,
- proxy_auth=proxy_auth,
- read_bufsize=read_bufsize,
- ),
- session,
- )
|