123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844 |
- """Generic socket server classes.
- This module tries to capture the various aspects of defining a server:
- For socket-based servers:
- - address family:
- - AF_INET{,6}: IP (Internet Protocol) sockets (default)
- - AF_UNIX: Unix domain sockets
- - others, e.g. AF_DECNET are conceivable (see <socket.h>
- - socket type:
- - SOCK_STREAM (reliable stream, e.g. TCP)
- - SOCK_DGRAM (datagrams, e.g. UDP)
- For request-based servers (including socket-based):
- - client address verification before further looking at the request
- (This is actually a hook for any processing that needs to look
- at the request before anything else, e.g. logging)
- - how to handle multiple requests:
- - synchronous (one request is handled at a time)
- - forking (each request is handled by a new process)
- - threading (each request is handled by a new thread)
- The classes in this module favor the server type that is simplest to
- write: a synchronous TCP/IP server. This is bad class design, but
- saves some typing. (There's also the issue that a deep class hierarchy
- slows down method lookups.)
- There are five classes in an inheritance diagram, four of which represent
- synchronous servers of four types:
- +------------+
- | BaseServer |
- +------------+
- |
- v
- +-----------+ +------------------+
- | TCPServer |------->| UnixStreamServer |
- +-----------+ +------------------+
- |
- v
- +-----------+ +--------------------+
- | UDPServer |------->| UnixDatagramServer |
- +-----------+ +--------------------+
- Note that UnixDatagramServer derives from UDPServer, not from
- UnixStreamServer -- the only difference between an IP and a Unix
- stream server is the address family, which is simply repeated in both
- unix server classes.
- Forking and threading versions of each type of server can be created
- using the ForkingMixIn and ThreadingMixIn mix-in classes. For
- instance, a threading UDP server class is created as follows:
- class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
- The Mix-in class must come first, since it overrides a method defined
- in UDPServer! Setting the various member variables also changes
- the behavior of the underlying server mechanism.
- To implement a service, you must derive a class from
- BaseRequestHandler and redefine its handle() method. You can then run
- various versions of the service by combining one of the server classes
- with your request handler class.
- The request handler class must be different for datagram or stream
- services. This can be hidden by using the request handler
- subclasses StreamRequestHandler or DatagramRequestHandler.
- Of course, you still have to use your head!
- For instance, it makes no sense to use a forking server if the service
- contains state in memory that can be modified by requests (since the
- modifications in the child process would never reach the initial state
- kept in the parent process and passed to each child). In this case,
- you can use a threading server, but you will probably have to use
- locks to avoid two requests that come in nearly simultaneous to apply
- conflicting changes to the server state.
- On the other hand, if you are building e.g. an HTTP server, where all
- data is stored externally (e.g. in the file system), a synchronous
- class will essentially render the service "deaf" while one request is
- being handled -- which may be for a very long time if a client is slow
- to read all the data it has requested. Here a threading or forking
- server is appropriate.
- In some cases, it may be appropriate to process part of a request
- synchronously, but to finish processing in a forked child depending on
- the request data. This can be implemented by using a synchronous
- server and doing an explicit fork in the request handler class
- handle() method.
- Another approach to handling multiple simultaneous requests in an
- environment that supports neither threads nor fork (or where these are
- too expensive or inappropriate for the service) is to maintain an
- explicit table of partially finished requests and to use a selector to
- decide which request to work on next (or whether to handle a new
- incoming request). This is particularly important for stream services
- where each client can potentially be connected for a long time (if
- threads or subprocesses cannot be used).
- Future work:
- - Standard classes for Sun RPC (which uses either UDP or TCP)
- - Standard mix-in classes to implement various authentication
- and encryption schemes
- XXX Open problems:
- - What to do with out-of-band data?
- BaseServer:
- - split generic "request" functionality out into BaseServer class.
- Copyright (C) 2000 Luke Kenneth Casson Leighton <lkcl@samba.org>
- example: read entries from a SQL database (requires overriding
- get_request() to return a table entry from the database).
- entry is processed by a RequestHandlerClass.
- """
- # Author of the BaseServer patch: Luke Kenneth Casson Leighton
- __version__ = "0.4"
- import socket
- import selectors
- import os
- import sys
- import threading
- from io import BufferedIOBase
- from time import monotonic as time
- __all__ = ["BaseServer", "TCPServer", "UDPServer",
- "ThreadingUDPServer", "ThreadingTCPServer",
- "BaseRequestHandler", "StreamRequestHandler",
- "DatagramRequestHandler", "ThreadingMixIn"]
- if hasattr(os, "fork"):
- __all__.extend(["ForkingUDPServer","ForkingTCPServer", "ForkingMixIn"])
- if hasattr(socket, "AF_UNIX"):
- __all__.extend(["UnixStreamServer","UnixDatagramServer",
- "ThreadingUnixStreamServer",
- "ThreadingUnixDatagramServer"])
- # poll/select have the advantage of not requiring any extra file descriptor,
- # contrarily to epoll/kqueue (also, they require a single syscall).
- if hasattr(selectors, 'PollSelector'):
- _ServerSelector = selectors.PollSelector
- else:
- _ServerSelector = selectors.SelectSelector
- class BaseServer:
- """Base class for server classes.
- Methods for the caller:
- - __init__(server_address, RequestHandlerClass)
- - serve_forever(poll_interval=0.5)
- - shutdown()
- - handle_request() # if you do not use serve_forever()
- - fileno() -> int # for selector
- Methods that may be overridden:
- - server_bind()
- - server_activate()
- - get_request() -> request, client_address
- - handle_timeout()
- - verify_request(request, client_address)
- - server_close()
- - process_request(request, client_address)
- - shutdown_request(request)
- - close_request(request)
- - service_actions()
- - handle_error()
- Methods for derived classes:
- - finish_request(request, client_address)
- Class variables that may be overridden by derived classes or
- instances:
- - timeout
- - address_family
- - socket_type
- - allow_reuse_address
- Instance variables:
- - RequestHandlerClass
- - socket
- """
- timeout = None
- def __init__(self, server_address, RequestHandlerClass):
- """Constructor. May be extended, do not override."""
- self.server_address = server_address
- self.RequestHandlerClass = RequestHandlerClass
- self.__is_shut_down = threading.Event()
- self.__shutdown_request = False
- def server_activate(self):
- """Called by constructor to activate the server.
- May be overridden.
- """
- pass
- def serve_forever(self, poll_interval=0.5):
- """Handle one request at a time until shutdown.
- Polls for shutdown every poll_interval seconds. Ignores
- self.timeout. If you need to do periodic tasks, do them in
- another thread.
- """
- self.__is_shut_down.clear()
- try:
- # XXX: Consider using another file descriptor or connecting to the
- # socket to wake this up instead of polling. Polling reduces our
- # responsiveness to a shutdown request and wastes cpu at all other
- # times.
- with _ServerSelector() as selector:
- selector.register(self, selectors.EVENT_READ)
- while not self.__shutdown_request:
- ready = selector.select(poll_interval)
- # bpo-35017: shutdown() called during select(), exit immediately.
- if self.__shutdown_request:
- break
- if ready:
- self._handle_request_noblock()
- self.service_actions()
- finally:
- self.__shutdown_request = False
- self.__is_shut_down.set()
- def shutdown(self):
- """Stops the serve_forever loop.
- Blocks until the loop has finished. This must be called while
- serve_forever() is running in another thread, or it will
- deadlock.
- """
- self.__shutdown_request = True
- self.__is_shut_down.wait()
- def service_actions(self):
- """Called by the serve_forever() loop.
- May be overridden by a subclass / Mixin to implement any code that
- needs to be run during the loop.
- """
- pass
- # The distinction between handling, getting, processing and finishing a
- # request is fairly arbitrary. Remember:
- #
- # - handle_request() is the top-level call. It calls selector.select(),
- # get_request(), verify_request() and process_request()
- # - get_request() is different for stream or datagram sockets
- # - process_request() is the place that may fork a new process or create a
- # new thread to finish the request
- # - finish_request() instantiates the request handler class; this
- # constructor will handle the request all by itself
- def handle_request(self):
- """Handle one request, possibly blocking.
- Respects self.timeout.
- """
- # Support people who used socket.settimeout() to escape
- # handle_request before self.timeout was available.
- timeout = self.socket.gettimeout()
- if timeout is None:
- timeout = self.timeout
- elif self.timeout is not None:
- timeout = min(timeout, self.timeout)
- if timeout is not None:
- deadline = time() + timeout
- # Wait until a request arrives or the timeout expires - the loop is
- # necessary to accommodate early wakeups due to EINTR.
- with _ServerSelector() as selector:
- selector.register(self, selectors.EVENT_READ)
- while True:
- ready = selector.select(timeout)
- if ready:
- return self._handle_request_noblock()
- else:
- if timeout is not None:
- timeout = deadline - time()
- if timeout < 0:
- return self.handle_timeout()
- def _handle_request_noblock(self):
- """Handle one request, without blocking.
- I assume that selector.select() has returned that the socket is
- readable before this function was called, so there should be no risk of
- blocking in get_request().
- """
- try:
- request, client_address = self.get_request()
- except OSError:
- return
- if self.verify_request(request, client_address):
- try:
- self.process_request(request, client_address)
- except Exception:
- self.handle_error(request, client_address)
- self.shutdown_request(request)
- except:
- self.shutdown_request(request)
- raise
- else:
- self.shutdown_request(request)
- def handle_timeout(self):
- """Called if no new request arrives within self.timeout.
- Overridden by ForkingMixIn.
- """
- pass
- def verify_request(self, request, client_address):
- """Verify the request. May be overridden.
- Return True if we should proceed with this request.
- """
- return True
- def process_request(self, request, client_address):
- """Call finish_request.
- Overridden by ForkingMixIn and ThreadingMixIn.
- """
- self.finish_request(request, client_address)
- self.shutdown_request(request)
- def server_close(self):
- """Called to clean-up the server.
- May be overridden.
- """
- pass
- def finish_request(self, request, client_address):
- """Finish one request by instantiating RequestHandlerClass."""
- self.RequestHandlerClass(request, client_address, self)
- def shutdown_request(self, request):
- """Called to shutdown and close an individual request."""
- self.close_request(request)
- def close_request(self, request):
- """Called to clean up an individual request."""
- pass
- def handle_error(self, request, client_address):
- """Handle an error gracefully. May be overridden.
- The default is to print a traceback and continue.
- """
- print('-'*40, file=sys.stderr)
- print('Exception occurred during processing of request from',
- client_address, file=sys.stderr)
- import traceback
- traceback.print_exc()
- print('-'*40, file=sys.stderr)
- def __enter__(self):
- return self
- def __exit__(self, *args):
- self.server_close()
- class TCPServer(BaseServer):
- """Base class for various socket-based server classes.
- Defaults to synchronous IP stream (i.e., TCP).
- Methods for the caller:
- - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
- - serve_forever(poll_interval=0.5)
- - shutdown()
- - handle_request() # if you don't use serve_forever()
- - fileno() -> int # for selector
- Methods that may be overridden:
- - server_bind()
- - server_activate()
- - get_request() -> request, client_address
- - handle_timeout()
- - verify_request(request, client_address)
- - process_request(request, client_address)
- - shutdown_request(request)
- - close_request(request)
- - handle_error()
- Methods for derived classes:
- - finish_request(request, client_address)
- Class variables that may be overridden by derived classes or
- instances:
- - timeout
- - address_family
- - socket_type
- - request_queue_size (only for stream sockets)
- - allow_reuse_address
- Instance variables:
- - server_address
- - RequestHandlerClass
- - socket
- """
- address_family = socket.AF_INET
- socket_type = socket.SOCK_STREAM
- request_queue_size = 5
- allow_reuse_address = False
- def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
- """Constructor. May be extended, do not override."""
- BaseServer.__init__(self, server_address, RequestHandlerClass)
- self.socket = socket.socket(self.address_family,
- self.socket_type)
- if bind_and_activate:
- try:
- self.server_bind()
- self.server_activate()
- except:
- self.server_close()
- raise
- def server_bind(self):
- """Called by constructor to bind the socket.
- May be overridden.
- """
- if self.allow_reuse_address:
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.socket.bind(self.server_address)
- self.server_address = self.socket.getsockname()
- def server_activate(self):
- """Called by constructor to activate the server.
- May be overridden.
- """
- self.socket.listen(self.request_queue_size)
- def server_close(self):
- """Called to clean-up the server.
- May be overridden.
- """
- self.socket.close()
- def fileno(self):
- """Return socket file number.
- Interface required by selector.
- """
- return self.socket.fileno()
- def get_request(self):
- """Get the request and client address from the socket.
- May be overridden.
- """
- return self.socket.accept()
- def shutdown_request(self, request):
- """Called to shutdown and close an individual request."""
- try:
- #explicitly shutdown. socket.close() merely releases
- #the socket and waits for GC to perform the actual close.
- request.shutdown(socket.SHUT_WR)
- except OSError:
- pass #some platforms may raise ENOTCONN here
- self.close_request(request)
- def close_request(self, request):
- """Called to clean up an individual request."""
- request.close()
- class UDPServer(TCPServer):
- """UDP server class."""
- allow_reuse_address = False
- socket_type = socket.SOCK_DGRAM
- max_packet_size = 8192
- def get_request(self):
- data, client_addr = self.socket.recvfrom(self.max_packet_size)
- return (data, self.socket), client_addr
- def server_activate(self):
- # No need to call listen() for UDP.
- pass
- def shutdown_request(self, request):
- # No need to shutdown anything.
- self.close_request(request)
- def close_request(self, request):
- # No need to close anything.
- pass
- if hasattr(os, "fork"):
- class ForkingMixIn:
- """Mix-in class to handle each request in a new process."""
- timeout = 300
- active_children = None
- max_children = 40
- # If true, server_close() waits until all child processes complete.
- block_on_close = True
- def collect_children(self, *, blocking=False):
- """Internal routine to wait for children that have exited."""
- if self.active_children is None:
- return
- # If we're above the max number of children, wait and reap them until
- # we go back below threshold. Note that we use waitpid(-1) below to be
- # able to collect children in size(<defunct children>) syscalls instead
- # of size(<children>): the downside is that this might reap children
- # which we didn't spawn, which is why we only resort to this when we're
- # above max_children.
- while len(self.active_children) >= self.max_children:
- try:
- pid, _ = os.waitpid(-1, 0)
- self.active_children.discard(pid)
- except ChildProcessError:
- # we don't have any children, we're done
- self.active_children.clear()
- except OSError:
- break
- # Now reap all defunct children.
- for pid in self.active_children.copy():
- try:
- flags = 0 if blocking else os.WNOHANG
- pid, _ = os.waitpid(pid, flags)
- # if the child hasn't exited yet, pid will be 0 and ignored by
- # discard() below
- self.active_children.discard(pid)
- except ChildProcessError:
- # someone else reaped it
- self.active_children.discard(pid)
- except OSError:
- pass
- def handle_timeout(self):
- """Wait for zombies after self.timeout seconds of inactivity.
- May be extended, do not override.
- """
- self.collect_children()
- def service_actions(self):
- """Collect the zombie child processes regularly in the ForkingMixIn.
- service_actions is called in the BaseServer's serve_forever loop.
- """
- self.collect_children()
- def process_request(self, request, client_address):
- """Fork a new subprocess to process the request."""
- pid = os.fork()
- if pid:
- # Parent process
- if self.active_children is None:
- self.active_children = set()
- self.active_children.add(pid)
- self.close_request(request)
- return
- else:
- # Child process.
- # This must never return, hence os._exit()!
- status = 1
- try:
- self.finish_request(request, client_address)
- status = 0
- except Exception:
- self.handle_error(request, client_address)
- finally:
- try:
- self.shutdown_request(request)
- finally:
- os._exit(status)
- def server_close(self):
- super().server_close()
- self.collect_children(blocking=self.block_on_close)
- class _Threads(list):
- """
- Joinable list of all non-daemon threads.
- """
- def append(self, thread):
- self.reap()
- if thread.daemon:
- return
- super().append(thread)
- def pop_all(self):
- self[:], result = [], self[:]
- return result
- def join(self):
- for thread in self.pop_all():
- thread.join()
- def reap(self):
- self[:] = (thread for thread in self if thread.is_alive())
- class _NoThreads:
- """
- Degenerate version of _Threads.
- """
- def append(self, thread):
- pass
- def join(self):
- pass
- class ThreadingMixIn:
- """Mix-in class to handle each request in a new thread."""
- # Decides how threads will act upon termination of the
- # main process
- daemon_threads = False
- # If true, server_close() waits until all non-daemonic threads terminate.
- block_on_close = True
- # Threads object
- # used by server_close() to wait for all threads completion.
- _threads = _NoThreads()
- def process_request_thread(self, request, client_address):
- """Same as in BaseServer but as a thread.
- In addition, exception handling is done here.
- """
- try:
- self.finish_request(request, client_address)
- except Exception:
- self.handle_error(request, client_address)
- finally:
- self.shutdown_request(request)
- def process_request(self, request, client_address):
- """Start a new thread to process the request."""
- if self.block_on_close:
- vars(self).setdefault('_threads', _Threads())
- t = threading.Thread(target = self.process_request_thread,
- args = (request, client_address))
- t.daemon = self.daemon_threads
- self._threads.append(t)
- t.start()
- def server_close(self):
- super().server_close()
- self._threads.join()
- if hasattr(os, "fork"):
- class ForkingUDPServer(ForkingMixIn, UDPServer): pass
- class ForkingTCPServer(ForkingMixIn, TCPServer): pass
- class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
- class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
- if hasattr(socket, 'AF_UNIX'):
- class UnixStreamServer(TCPServer):
- address_family = socket.AF_UNIX
- class UnixDatagramServer(UDPServer):
- address_family = socket.AF_UNIX
- class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass
- class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass
- class BaseRequestHandler:
- """Base class for request handler classes.
- This class is instantiated for each request to be handled. The
- constructor sets the instance variables request, client_address
- and server, and then calls the handle() method. To implement a
- specific service, all you need to do is to derive a class which
- defines a handle() method.
- The handle() method can find the request as self.request, the
- client address as self.client_address, and the server (in case it
- needs access to per-server information) as self.server. Since a
- separate instance is created for each request, the handle() method
- can define other arbitrary instance variables.
- """
- def __init__(self, request, client_address, server):
- self.request = request
- self.client_address = client_address
- self.server = server
- self.setup()
- try:
- self.handle()
- finally:
- self.finish()
- def setup(self):
- pass
- def handle(self):
- pass
- def finish(self):
- pass
- # The following two classes make it possible to use the same service
- # class for stream or datagram servers.
- # Each class sets up these instance variables:
- # - rfile: a file object from which receives the request is read
- # - wfile: a file object to which the reply is written
- # When the handle() method returns, wfile is flushed properly
- class StreamRequestHandler(BaseRequestHandler):
- """Define self.rfile and self.wfile for stream sockets."""
- # Default buffer sizes for rfile, wfile.
- # We default rfile to buffered because otherwise it could be
- # really slow for large data (a getc() call per byte); we make
- # wfile unbuffered because (a) often after a write() we want to
- # read and we need to flush the line; (b) big writes to unbuffered
- # files are typically optimized by stdio even when big reads
- # aren't.
- rbufsize = -1
- wbufsize = 0
- # A timeout to apply to the request socket, if not None.
- timeout = None
- # Disable nagle algorithm for this socket, if True.
- # Use only when wbufsize != 0, to avoid small packets.
- disable_nagle_algorithm = False
- def setup(self):
- self.connection = self.request
- if self.timeout is not None:
- self.connection.settimeout(self.timeout)
- if self.disable_nagle_algorithm:
- self.connection.setsockopt(socket.IPPROTO_TCP,
- socket.TCP_NODELAY, True)
- self.rfile = self.connection.makefile('rb', self.rbufsize)
- if self.wbufsize == 0:
- self.wfile = _SocketWriter(self.connection)
- else:
- self.wfile = self.connection.makefile('wb', self.wbufsize)
- def finish(self):
- if not self.wfile.closed:
- try:
- self.wfile.flush()
- except socket.error:
- # A final socket error may have occurred here, such as
- # the local error ECONNABORTED.
- pass
- self.wfile.close()
- self.rfile.close()
- class _SocketWriter(BufferedIOBase):
- """Simple writable BufferedIOBase implementation for a socket
- Does not hold data in a buffer, avoiding any need to call flush()."""
- def __init__(self, sock):
- self._sock = sock
- def writable(self):
- return True
- def write(self, b):
- self._sock.sendall(b)
- with memoryview(b) as view:
- return view.nbytes
- def fileno(self):
- return self._sock.fileno()
- class DatagramRequestHandler(BaseRequestHandler):
- """Define self.rfile and self.wfile for datagram sockets."""
- def setup(self):
- from io import BytesIO
- self.packet, self.socket = self.request
- self.rfile = BytesIO(self.packet)
- self.wfile = BytesIO()
- def finish(self):
- self.socket.sendto(self.wfile.getvalue(), self.client_address)
|