123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- """An ISAPI extension base class implemented using a thread-pool."""
- # $Id$
- import sys
- import time
- from isapi import isapicon, ExtensionError
- import isapi.simple
- from win32file import GetQueuedCompletionStatus, CreateIoCompletionPort, \
- PostQueuedCompletionStatus, CloseHandle
- from win32security import SetThreadToken
- from win32event import INFINITE
- from pywintypes import OVERLAPPED
- import threading
- import traceback
- ISAPI_REQUEST = 1
- ISAPI_SHUTDOWN = 2
- class WorkerThread(threading.Thread):
- def __init__(self, extension, io_req_port):
- self.running = False
- self.io_req_port = io_req_port
- self.extension = extension
- threading.Thread.__init__(self)
- # We wait 15 seconds for a thread to terminate, but if it fails to,
- # we don't want the process to hang at exit waiting for it...
- self.setDaemon(True)
- def run(self):
- self.running = True
- while self.running:
- errCode, bytes, key, overlapped = \
- GetQueuedCompletionStatus(self.io_req_port, INFINITE)
- if key == ISAPI_SHUTDOWN and overlapped is None:
- break
- # Let the parent extension handle the command.
- dispatcher = self.extension.dispatch_map.get(key)
- if dispatcher is None:
- raise RuntimeError("Bad request '%s'" % (key,))
-
- dispatcher(errCode, bytes, key, overlapped)
- def call_handler(self, cblock):
- self.extension.Dispatch(cblock)
- # A generic thread-pool based extension, using IO Completion Ports.
- # Sub-classes can override one method to implement a simple extension, or
- # may leverage the CompletionPort to queue their own requests, and implement a
- # fully asynch extension.
- class ThreadPoolExtension(isapi.simple.SimpleExtension):
- "Base class for an ISAPI extension based around a thread-pool"
- max_workers = 20
- worker_shutdown_wait = 15000 # 15 seconds for workers to quit...
- def __init__(self):
- self.workers = []
- # extensible dispatch map, for sub-classes that need to post their
- # own requests to the completion port.
- # Each of these functions is called with the result of
- # GetQueuedCompletionStatus for our port.
- self.dispatch_map = {
- ISAPI_REQUEST: self.DispatchConnection,
- }
- def GetExtensionVersion(self, vi):
- isapi.simple.SimpleExtension.GetExtensionVersion(self, vi)
- # As per Q192800, the CompletionPort should be created with the number
- # of processors, even if the number of worker threads is much larger.
- # Passing 0 means the system picks the number.
- self.io_req_port = CreateIoCompletionPort(-1, None, 0, 0)
- # start up the workers
- self.workers = []
- for i in range(self.max_workers):
- worker = WorkerThread(self, self.io_req_port)
- worker.start()
- self.workers.append(worker)
- def HttpExtensionProc(self, control_block):
- overlapped = OVERLAPPED()
- overlapped.object = control_block
- PostQueuedCompletionStatus(self.io_req_port, 0, ISAPI_REQUEST, overlapped)
- return isapicon.HSE_STATUS_PENDING
- def TerminateExtension(self, status):
- for worker in self.workers:
- worker.running = False
- for worker in self.workers:
- PostQueuedCompletionStatus(self.io_req_port, 0, ISAPI_SHUTDOWN, None)
- # wait for them to terminate - pity we aren't using 'native' threads
- # as then we could do a smart wait - but now we need to poll....
- end_time = time.time() + self.worker_shutdown_wait/1000
- alive = self.workers
- while alive:
- if time.time() > end_time:
- # xxx - might be nice to log something here.
- break
- time.sleep(0.2)
- alive = [w for w in alive if w.is_alive()]
- self.dispatch_map = {} # break circles
- CloseHandle(self.io_req_port)
- # This is the one operation the base class supports - a simple
- # Connection request. We setup the thread-token, and dispatch to the
- # sub-class's 'Dispatch' method.
- def DispatchConnection(self, errCode, bytes, key, overlapped):
- control_block = overlapped.object
- # setup the correct user for this request
- hRequestToken = control_block.GetImpersonationToken()
- SetThreadToken(None, hRequestToken)
- try:
- try:
- self.Dispatch(control_block)
- except:
- self.HandleDispatchError(control_block)
- finally:
- # reset the security context
- SetThreadToken(None, None)
- def Dispatch(self, ecb):
- """Overridden by the sub-class to handle connection requests.
-
- This class creates a thread-pool using a Windows completion port,
- and dispatches requests via this port. Sub-classes can generally
- implement each connection request using blocking reads and writes, and
- the thread-pool will still provide decent response to the end user.
-
- The sub-class can set a max_workers attribute (default is 20). Note
- that this generally does *not* mean 20 threads will all be concurrently
- running, via the magic of Windows completion ports.
-
- There is no default implementation - sub-classes must implement this.
- """
- raise NotImplementedError("sub-classes should override Dispatch")
- def HandleDispatchError(self, ecb):
- """Handles errors in the Dispatch method.
-
- When a Dispatch method call fails, this method is called to handle
- the exception. The default implementation formats the traceback
- in the browser.
- """
- ecb.HttpStatusCode = isapicon.HSE_STATUS_ERROR
- #control_block.LogData = "we failed!"
- exc_typ, exc_val, exc_tb = sys.exc_info()
- limit = None
- try:
- try:
- import cgi
- ecb.SendResponseHeaders("200 OK", "Content-type: text/html\r\n\r\n",
- False)
- print(file=ecb)
- print("<H3>Traceback (most recent call last):</H3>", file=ecb)
- list = traceback.format_tb(exc_tb, limit) + \
- traceback.format_exception_only(exc_typ, exc_val)
- print("<PRE>%s<B>%s</B></PRE>" % (
- cgi.escape("".join(list[:-1])), cgi.escape(list[-1]),), file=ecb)
- except ExtensionError:
- # The client disconnected without reading the error body -
- # its probably not a real browser at the other end, ignore it.
- pass
- except:
- print("FAILED to render the error message!")
- traceback.print_exc()
- print("ORIGINAL extension error:")
- traceback.print_exception(exc_typ, exc_val, exc_tb)
- finally:
- # holding tracebacks in a local of a frame that may itself be
- # part of a traceback used to be evil and cause leaks!
- exc_tb = None
- ecb.DoneWithSession()
|