123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381 |
- import asyncio
- import signal
- import socket
- from abc import ABC, abstractmethod
- from typing import Any, List, Optional, Set
- from yarl import URL
- from .web_app import Application
- from .web_server import Server
- try:
- from ssl import SSLContext
- except ImportError:
- SSLContext = object # type: ignore
- __all__ = (
- "BaseSite",
- "TCPSite",
- "UnixSite",
- "NamedPipeSite",
- "SockSite",
- "BaseRunner",
- "AppRunner",
- "ServerRunner",
- "GracefulExit",
- )
- class GracefulExit(SystemExit):
- code = 1
- def _raise_graceful_exit() -> None:
- raise GracefulExit()
- class BaseSite(ABC):
- __slots__ = ("_runner", "_shutdown_timeout", "_ssl_context", "_backlog", "_server")
- def __init__(
- self,
- runner: "BaseRunner",
- *,
- shutdown_timeout: float = 60.0,
- ssl_context: Optional[SSLContext] = None,
- backlog: int = 128,
- ) -> None:
- if runner.server is None:
- raise RuntimeError("Call runner.setup() before making a site")
- self._runner = runner
- self._shutdown_timeout = shutdown_timeout
- self._ssl_context = ssl_context
- self._backlog = backlog
- self._server = None # type: Optional[asyncio.AbstractServer]
- @property
- @abstractmethod
- def name(self) -> str:
- pass # pragma: no cover
- @abstractmethod
- async def start(self) -> None:
- self._runner._reg_site(self)
- async def stop(self) -> None:
- self._runner._check_site(self)
- if self._server is None:
- self._runner._unreg_site(self)
- return # not started yet
- self._server.close()
- # named pipes do not have wait_closed property
- if hasattr(self._server, "wait_closed"):
- await self._server.wait_closed()
- await self._runner.shutdown()
- assert self._runner.server
- await self._runner.server.shutdown(self._shutdown_timeout)
- self._runner._unreg_site(self)
- class TCPSite(BaseSite):
- __slots__ = ("_host", "_port", "_reuse_address", "_reuse_port")
- def __init__(
- self,
- runner: "BaseRunner",
- host: Optional[str] = None,
- port: Optional[int] = None,
- *,
- shutdown_timeout: float = 60.0,
- ssl_context: Optional[SSLContext] = None,
- backlog: int = 128,
- reuse_address: Optional[bool] = None,
- reuse_port: Optional[bool] = None,
- ) -> None:
- super().__init__(
- runner,
- shutdown_timeout=shutdown_timeout,
- ssl_context=ssl_context,
- backlog=backlog,
- )
- self._host = host
- if port is None:
- port = 8443 if self._ssl_context else 8080
- self._port = port
- self._reuse_address = reuse_address
- self._reuse_port = reuse_port
- @property
- def name(self) -> str:
- scheme = "https" if self._ssl_context else "http"
- host = "0.0.0.0" if self._host is None else self._host
- return str(URL.build(scheme=scheme, host=host, port=self._port))
- async def start(self) -> None:
- await super().start()
- loop = asyncio.get_event_loop()
- server = self._runner.server
- assert server is not None
- self._server = await loop.create_server(
- server,
- self._host,
- self._port,
- ssl=self._ssl_context,
- backlog=self._backlog,
- reuse_address=self._reuse_address,
- reuse_port=self._reuse_port,
- )
- class UnixSite(BaseSite):
- __slots__ = ("_path",)
- def __init__(
- self,
- runner: "BaseRunner",
- path: str,
- *,
- shutdown_timeout: float = 60.0,
- ssl_context: Optional[SSLContext] = None,
- backlog: int = 128,
- ) -> None:
- super().__init__(
- runner,
- shutdown_timeout=shutdown_timeout,
- ssl_context=ssl_context,
- backlog=backlog,
- )
- self._path = path
- @property
- def name(self) -> str:
- scheme = "https" if self._ssl_context else "http"
- return f"{scheme}://unix:{self._path}:"
- async def start(self) -> None:
- await super().start()
- loop = asyncio.get_event_loop()
- server = self._runner.server
- assert server is not None
- self._server = await loop.create_unix_server(
- server, self._path, ssl=self._ssl_context, backlog=self._backlog
- )
- class NamedPipeSite(BaseSite):
- __slots__ = ("_path",)
- def __init__(
- self, runner: "BaseRunner", path: str, *, shutdown_timeout: float = 60.0
- ) -> None:
- loop = asyncio.get_event_loop()
- if not isinstance(loop, asyncio.ProactorEventLoop): # type: ignore
- raise RuntimeError(
- "Named Pipes only available in proactor" "loop under windows"
- )
- super().__init__(runner, shutdown_timeout=shutdown_timeout)
- self._path = path
- @property
- def name(self) -> str:
- return self._path
- async def start(self) -> None:
- await super().start()
- loop = asyncio.get_event_loop()
- server = self._runner.server
- assert server is not None
- _server = await loop.start_serving_pipe(server, self._path) # type: ignore
- self._server = _server[0]
- class SockSite(BaseSite):
- __slots__ = ("_sock", "_name")
- def __init__(
- self,
- runner: "BaseRunner",
- sock: socket.socket,
- *,
- shutdown_timeout: float = 60.0,
- ssl_context: Optional[SSLContext] = None,
- backlog: int = 128,
- ) -> None:
- super().__init__(
- runner,
- shutdown_timeout=shutdown_timeout,
- ssl_context=ssl_context,
- backlog=backlog,
- )
- self._sock = sock
- scheme = "https" if self._ssl_context else "http"
- if hasattr(socket, "AF_UNIX") and sock.family == socket.AF_UNIX:
- name = f"{scheme}://unix:{sock.getsockname()}:"
- else:
- host, port = sock.getsockname()[:2]
- name = str(URL.build(scheme=scheme, host=host, port=port))
- self._name = name
- @property
- def name(self) -> str:
- return self._name
- async def start(self) -> None:
- await super().start()
- loop = asyncio.get_event_loop()
- server = self._runner.server
- assert server is not None
- self._server = await loop.create_server(
- server, sock=self._sock, ssl=self._ssl_context, backlog=self._backlog
- )
- class BaseRunner(ABC):
- __slots__ = ("_handle_signals", "_kwargs", "_server", "_sites")
- def __init__(self, *, handle_signals: bool = False, **kwargs: Any) -> None:
- self._handle_signals = handle_signals
- self._kwargs = kwargs
- self._server = None # type: Optional[Server]
- self._sites = [] # type: List[BaseSite]
- @property
- def server(self) -> Optional[Server]:
- return self._server
- @property
- def addresses(self) -> List[Any]:
- ret = [] # type: List[Any]
- for site in self._sites:
- server = site._server
- if server is not None:
- sockets = server.sockets
- if sockets is not None:
- for sock in sockets:
- ret.append(sock.getsockname())
- return ret
- @property
- def sites(self) -> Set[BaseSite]:
- return set(self._sites)
- async def setup(self) -> None:
- loop = asyncio.get_event_loop()
- if self._handle_signals:
- try:
- loop.add_signal_handler(signal.SIGINT, _raise_graceful_exit)
- loop.add_signal_handler(signal.SIGTERM, _raise_graceful_exit)
- except NotImplementedError: # pragma: no cover
- # add_signal_handler is not implemented on Windows
- pass
- self._server = await self._make_server()
- @abstractmethod
- async def shutdown(self) -> None:
- pass # pragma: no cover
- async def cleanup(self) -> None:
- loop = asyncio.get_event_loop()
- if self._server is None:
- # no started yet, do nothing
- return
- # The loop over sites is intentional, an exception on gather()
- # leaves self._sites in unpredictable state.
- # The loop guaranties that a site is either deleted on success or
- # still present on failure
- for site in list(self._sites):
- await site.stop()
- await self._cleanup_server()
- self._server = None
- if self._handle_signals:
- try:
- loop.remove_signal_handler(signal.SIGINT)
- loop.remove_signal_handler(signal.SIGTERM)
- except NotImplementedError: # pragma: no cover
- # remove_signal_handler is not implemented on Windows
- pass
- @abstractmethod
- async def _make_server(self) -> Server:
- pass # pragma: no cover
- @abstractmethod
- async def _cleanup_server(self) -> None:
- pass # pragma: no cover
- def _reg_site(self, site: BaseSite) -> None:
- if site in self._sites:
- raise RuntimeError(f"Site {site} is already registered in runner {self}")
- self._sites.append(site)
- def _check_site(self, site: BaseSite) -> None:
- if site not in self._sites:
- raise RuntimeError(f"Site {site} is not registered in runner {self}")
- def _unreg_site(self, site: BaseSite) -> None:
- if site not in self._sites:
- raise RuntimeError(f"Site {site} is not registered in runner {self}")
- self._sites.remove(site)
- class ServerRunner(BaseRunner):
- """Low-level web server runner"""
- __slots__ = ("_web_server",)
- def __init__(
- self, web_server: Server, *, handle_signals: bool = False, **kwargs: Any
- ) -> None:
- super().__init__(handle_signals=handle_signals, **kwargs)
- self._web_server = web_server
- async def shutdown(self) -> None:
- pass
- async def _make_server(self) -> Server:
- return self._web_server
- async def _cleanup_server(self) -> None:
- pass
- class AppRunner(BaseRunner):
- """Web Application runner"""
- __slots__ = ("_app",)
- def __init__(
- self, app: Application, *, handle_signals: bool = False, **kwargs: Any
- ) -> None:
- super().__init__(handle_signals=handle_signals, **kwargs)
- if not isinstance(app, Application):
- raise TypeError(
- "The first argument should be web.Application "
- "instance, got {!r}".format(app)
- )
- self._app = app
- @property
- def app(self) -> Application:
- return self._app
- async def shutdown(self) -> None:
- await self._app.shutdown()
- async def _make_server(self) -> Server:
- loop = asyncio.get_event_loop()
- self._app._set_loop(loop)
- self._app.on_startup.freeze()
- await self._app.startup()
- self._app.freeze()
- return self._app._make_handler(loop=loop, **self._kwargs)
- async def _cleanup_server(self) -> None:
- await self._app.cleanup()
|