"""
open/DurusWorks/qp/hub/dispatcher.py

For systems that support passfd, this provides a pre-forking server that
uses file descriptor passing to off-load requests to child worker
processes.
"""
try:
    from qp.hub import passfd
    from qp.hub.passfd import recvfd, sendfd, socketpair
except ImportError:
    # If passfd is not installed, we can still run in non-forking mode.
    passfd = None
from durus.utils import as_bytes
from qp.lib.util import as_str
from select import select, error as select_error
from errno import EPIPE, EWOULDBLOCK, EINTR
from fcntl import fcntl, F_GETFL, F_SETFL
from socket import SOCK_STREAM, AF_UNIX, SOL_SOCKET, SO_REUSEADDR, AF_INET
from socket import socket, fromfd, MSG_PEEK, error as socket_error
from stat import ST_MTIME
from os import close, getpid, fork, O_NONBLOCK, read, write, waitpid, stat
from os import environ, WNOHANG, execve, kill, _exit
import errno
import re
import signal
import sys


def log(s):
    """Write s to stdout, prefixed with this process's pid, and flush."""
    sys.stdout.write("hub[%s] %s\n" % (getpid(), s))
    sys.stdout.flush()


def _error_number(exc):
    """(exc:Exception) -> int | None

    Return the error number carried by an OS, socket, or select exception.
    Prefers the ``errno`` attribute and falls back to the first positional
    argument (Python 2's select.error has no ``errno`` attribute).
    Indexing the exception itself (``exc[0]``) only works on Python 2.
    """
    number = getattr(exc, 'errno', None)
    if number is None and exc.args:
        number = exc.args[0]
    return number


def select_ready(fds):
    """(fds:[object]) -> [object]

    Block until at least one of fds is readable and return the readable
    ones. Returns [] if the wait was interrupted by a signal (EINTR).
    """
    try:
        return select(fds, [], [], None)[0]
    except select_error:
        e = sys.exc_info()[1]
        if _error_number(e) == EINTR:
            # got a signal, try again
            return []
        raise


class Child (object):
    """
    This represents a child process that is running a service loop.
    """

    __slots__ = ['pid', 'fd', 'client_address', 'client_cookie',
                 'ready_for_connection', 'close_has_been_called']

    def __init__(self, pid, child_fd):
        self.pid = pid
        # The parent's end of the socketpair shared with this child.
        self.fd = child_fd
        self.client_address = None
        self.client_cookie = None
        self.ready_for_connection = False
        self.close_has_been_called = False

    def fileno(self):
        return self.fd

    def close(self):
        """
        Close the connection to the child process.

        Only the first call closes the descriptor. Closing it again could
        close an unrelated descriptor that has since been given the same
        number (for example a newly spawned child's socketpair).
        """
        if self.close_has_been_called:
            return
        self.close_has_been_called = True
        try:
            close(self.fd)
        except OSError:
            log("close() failed for %s" % self.pid)

    def wait(self):
        """
        Wait for the child process to terminate.
        """
        log("Wait for %s" % self.pid)
        waitpid(self.pid, 0)
        log("%s is done." % self.pid)

    def kill(self):
        """
        Send SIGKILL to the child process and close the connection to it.
        """
        try:
            kill(self.pid, signal.SIGKILL)
            log("kill() %s succeeded" % self.pid)
        except OSError:
            log("kill() %s failed" % self.pid)
        self.close()

    def is_ready_for_connection(self):
        return self.ready_for_connection

    def update_ready_for_connection(self):
        """
        Read the ready-byte that the child writes when it is idle.

        A b'1' marks the child as ready for a new connection. An empty
        read (EOF) or a read error means the child is gone, so the
        connection to it is closed. If the non-blocking read would block,
        nothing has arrived yet and the child is left as it is.
        """
        if self.ready_for_connection:
            return
        ready_byte = None
        try:
            ready_byte = read(self.fd, 1)
        except (OSError, IOError, socket_error):
            exc = sys.exc_info()[1]
            if _error_number(exc) in (EWOULDBLOCK, errno.EAGAIN):
                # Spurious wakeup: try again after the next select().
                return
        if not ready_byte:
            self.close()
        else:
            self.ready_for_connection = (ready_byte == as_bytes('1'))

    def send_connection(self, conn, connection_address, connection_cookie):
        """
        Pass conn's descriptor to this child if it is ready.

        On success, remember the client's address and cookie, mark the
        child busy, close the parent's copy of conn, and return True.
        Return False if the child is not ready or its end is gone (EPIPE).
        """
        if not self.is_ready_for_connection():
            return False
        try:
            sendfd(self.fd, conn.fileno())
        except IOError:
            exc = sys.exc_info()[1]
            if _error_number(exc) == EPIPE:
                return False
            raise
        self.client_address = connection_address
        self.client_cookie = connection_cookie
        self.ready_for_connection = False
        conn.close()
        return True


def get_peername_address(connection):
    """
    Return the IP address of connection's peer, or None if it cannot be
    determined.
    """
    try:
        address = connection.getpeername()[0]
    except socket_error:
        log("get_peername_address: %s" % sys.exc_info()[1])
        address = None
    return address


class HubServer (object):

    banned_msg = (
        'HTTP/1.0 403 Forbidden\r\n'
        'Content-Type: text/plain\r\n\r\n'
        'Your IP address has been banned from accessing this site.\n'
        'Please email the webmaster here if you need the ban removed.')

    def __init__(self, site_name, max_children=5, banned=None, busy_limit=1,
                 dispatcher_command_prefix=None):
        """(site_name:string, max_children:int=5, banned:str|None=None,
            busy_limit:int=1, dispatcher_command_prefix:str|None=None)

        max_children is the maximum number of child processes.
        If the value of max_children is 0, the server runs in non-forking
        mode.

        banned is the path to a file, if any, containing IP addresses from
        which connections are not permitted.

        busy_limit is the maximum number of processes that may be kept
        busy by connections from any single IP address.

        dispatcher_command_prefix is a uri prefix for dispatcher commands,
        if dispatcher commands are enabled.
        """
        self.site_name = site_name
        if passfd:
            self.max_children = max_children
        else:
            # We can't pass file descriptors.
            # Fall back to non-forking mode.
            log("No passfd installed. Using non-forking mode.")
            self.max_children = 0
        self.children = []
        self.banned = banned
        self.banned_time = -1
        self.banned_addresses = set()
        self.connection_queue = []
        self.busy_limit = busy_limit
        self.dispatcher_command_prefix = dispatcher_command_prefix
        # Replaced by listen(); defined here so spawn_child() and run()
        # never hit an AttributeError.
        self.listening_sockets = []

    def spawn_child(self):
        """
        Unless we have reached the maximum number, start up a new child
        process.
        """
        if len(self.children) >= self.max_children:
            return
        parent_fd, child_fd = socketpair(AF_UNIX, SOCK_STREAM)
        # make child fd non-blocking
        flags = fcntl(child_fd, F_GETFL, 0)
        fcntl(child_fd, F_SETFL, flags | O_NONBLOCK)
        pid = fork()
        if pid == 0:
            # child
            try:
                close(child_fd)
                for s in self.listening_sockets + self.connection_queue:
                    s.close()
                # Drop our copies of the parent's ends of the sibling
                # socketpairs, so each sibling sees EOF/EPIPE as soon as
                # the parent goes away instead of when this child does.
                for sibling in self.children:
                    if not sibling.close_has_been_called:
                        try:
                            close(sibling.fd)
                        except OSError:
                            pass
                import qp.hub.web
                args = [sys.executable, qp.hub.web.__file__, self.site_name,
                        str(parent_fd)]
                execve(sys.executable, args, environ)
            except Exception:
                log("Failed to start child: %s" % sys.exc_info()[1])
            finally:
                # execve() only returns by raising. Never let the forked
                # copy fall back into the parent's service loop.
                _exit(1)
        close(parent_fd)
        self.children.append(Child(pid, child_fd))
        log("Child [%s] %s of %s started." % (
            pid, len(self.children), self.max_children))

    def reap_children(self):
        """
        Check to see if any of the child processes have died, and if so,
        remove them from the list.
        """
        while self.children:
            try:
                (pid, status) = waitpid(-1, WNOHANG)
            except OSError:
                if _error_number(sys.exc_info()[1]) == errno.ECHILD:
                    # There are no child processes left to wait for.
                    break
                raise
            if pid <= 0:
                break
            for child in self.children:
                if child.pid == pid:
                    log("reap %s" % pid)
                    child.close()
                    self.children.remove(child)
                    break

    def do_stop(self):
        """
        This may be called from a signal handler to stop the child
        processes.
        """
        log("stopping children")
        for child in self.children:
            child.close()
        for child in self.children:
            child.wait()
        self.children = []
        log("children stopped")

    remote_address_re = re.compile(r'.*REMOTE_ADDR.([.\d]+)')
    x_forwarded_for_re = re.compile(r'.*X_FORWARDED_FOR: ([.\d]+)')

    def get_connection_headers(self, connection):
        """
        Peek (without consuming) at up to 2048 bytes of the request and
        return them as a str, or '' if nothing could be read.

        The peek waits at most 0.1 seconds; afterwards the connection's
        timeout is set to 3 seconds, whether or not the peek succeeded.
        """
        headers = ''
        try:
            connection.settimeout(0.1)
            try:
                headers = connection.recv(2048, MSG_PEEK)
            finally:
                connection.settimeout(3)
        except socket_error:
            exc_value = sys.exc_info()[1]
            if str(exc_value) != 'timed out':
                log("in get_connection_headers: %s" % exc_value)
            headers = ''
        return as_str(headers)

    def get_connection_address(self, connection, headers):
        """
        Return the client address for connection.

        SSL connections use the socket peer address. Otherwise a
        REMOTE_ADDR or X_FORWARDED_FOR value in the peeked headers takes
        precedence over the socket peer address.
        """
        if hasattr(connection, 'ssl_version'):
            return get_peername_address(connection)
        address_match = (self.remote_address_re.match(headers) or
                         self.x_forwarded_for_re.match(headers))
        if address_match:
            return address_match.group(1)
        return get_peername_address(connection)

    cookie_re = re.compile(
        '.*[Cc][Oo][Oo][Kk][Ii][Ee]:?[\x00| ]([^\r\n\x00]+)')

    def get_connection_cookie(self, headers):
        """
        Return the cookie value found in headers, or None.
        """
        cookie_search = self.cookie_re.search(headers)
        if cookie_search:
            cookie = cookie_search.group(1)
        else:
            cookie = None
        return cookie

    def delegated(self, connection):
        """
        Try to find a suitable child process to handle this connection,
        and pass it to the child. If successful, return True. Otherwise,
        return False and wait for another chance.

        Connections from banned addresses are answered with banned_msg,
        closed, and reported as handled (True).
        """
        headers = self.get_connection_headers(connection)
        connection_address = self.get_connection_address(connection, headers)
        if connection_address in self.banned_addresses:
            # Shut up. Go away.
            try:
                connection.sendall(as_bytes(self.banned_msg))
            except socket_error:
                # The client may already be gone; that must not take down
                # the hub's service loop.
                pass
            connection.close()
            return True
        connection_cookie = self.get_connection_cookie(headers)
        children_with_this_address_and_cookie = []
        if (connection_cookie is None and
                connection_address in (None, '127.0.0.1')):
            # Cookieless local or unknown-address traffic: any ready
            # child will do.
            for child in self.children:
                if child.send_connection(connection, None, connection_cookie):
                    return True
        else:
            # Prefer children already serving this address and cookie.
            children_with_this_address_and_cookie = [
                child for child in self.children
                if (child.client_address == connection_address and
                    child.client_cookie == connection_cookie)]
            for child in children_with_this_address_and_cookie:
                if child.send_connection(
                        connection, connection_address, connection_cookie):
                    return True
            # Only take over another child while this client is below
            # busy_limit.
            if len(children_with_this_address_and_cookie) < self.busy_limit:
                for child in self.children:
                    if ((child.client_address != connection_address or
                         child.client_cookie != connection_cookie) and
                            child.send_connection(
                                connection, connection_address,
                                connection_cookie)):
                        return True
        # Maybe a child process died?
        self.reap_children()
        if len(children_with_this_address_and_cookie) < self.busy_limit:
            # Spawn a new child if we haven't reached the max_children limit.
            self.spawn_child()
        return False

    def listen(self, *addresses):
        """
        Set self.listening_sockets to a list of sockets, bound to the
        given addresses and listening for new connections.

        To allow quick restarts without re-binding sockets, this method
        will, instead of doing the usual thing, use an environment
        variable to identify file descriptors for sockets that are
        presumed to already be bound to the given set of addresses.
        """
        name = 'QP_HUBSERVER_FILE_DESCRIPTORS'
        sockets = []
        addresses = [a for a in addresses if a]
        if environ.get(name):
            fds = [int(c) for c in environ.get(name).split(',')]
            assert len(fds) == len(addresses)
            sockets = [fromfd(fd, AF_INET, SOCK_STREAM) for fd in fds]
        if not sockets:
            def get_socket_bound_to_address(address):
                s = socket(AF_INET, SOCK_STREAM)
                s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
                s.bind(address[:2])
                s.listen(128)
                log("LISTEN %s" % repr(address))
                return s
            sockets = [get_socket_bound_to_address(address)
                       for address in addresses]
            environ[name] = ','.join([str(s.fileno()) for s in sockets])
        self.listening_sockets = sockets

    def update_banned_addresses(self):
        """
        Consult the file named by self.banned, if any, and update
        self.banned_addresses to reflect the current content of the file.
        The file is re-read only when its mtime is newer than the last
        read.
        """
        if not self.banned:
            return
        try:
            mtime = stat(self.banned)[ST_MTIME]
        except OSError:
            return
        if mtime <= self.banned_time:
            return
        self.banned_time = mtime
        f = open(self.banned)
        try:
            content = f.read()
        finally:
            f.close()
        new_banned_addresses = set([a.strip() for a in content.split()])
        for address in (self.banned_addresses - new_banned_addresses):
            log('UNBANNED: %s' % address)
        for address in (new_banned_addresses - self.banned_addresses):
            log('BANNED: %s' % address)
        self.banned_addresses = new_banned_addresses

    def remove_hogs(self):
        """
        Find the address with the most queued connections, close all of
        its queued connections, and kill any child serving that address.

        Addresses are determined exactly as in delegated(), which means
        peeking at each queued connection's headers.
        """
        if not self.connection_queue:
            return
        connections_by_ip_address = {}
        for c in self.connection_queue:
            headers = self.get_connection_headers(c)
            address = self.get_connection_address(c, headers)
            connections_by_ip_address.setdefault(address, []).append(c)

        def value_length(x):
            return len(x[1])
        hog_address, hogs = max(
            connections_by_ip_address.items(), key=value_length)
        log("REMOVE %s HOGS FROM %s" % (len(hogs), hog_address))
        for c in hogs:
            try:
                c.close()
            except Exception:
                pass  # Don't let the server die for this.
        for child in list(self.children):
            if child.client_address == hog_address:
                child.kill()
                self.children.remove(child)
        self.connection_queue = [
            c for c in self.connection_queue if c not in hogs]

    def run(self, handle_connection=None):
        """
        The main service loop of the parent.

        In non-forking mode (max_children == 0 and a handle_connection
        callable given), queued connections are handled in this process.
        Otherwise they are passed to child processes via delegated().
        """
        while True:
            if not handle_connection and not self.children:
                self.spawn_child()
            if len(self.connection_queue) > 50:
                self.remove_hogs()
            self.children = [
                child for child in self.children
                if not child.close_has_been_called]
            # Accept any new connections.
            to_check = self.children + self.listening_sockets
            self.update_banned_addresses()
            if len(self.connection_queue) > 0:
                log("connections in queue: %s." % len(self.connection_queue))
            # Wait until there is something to read.
            ready = select_ready(to_check)
            for s in ready:
                if s in self.listening_sockets:
                    try:
                        conn, addr = s.accept()
                    except socket_error:
                        e = sys.exc_info()[1]
                        if _error_number(e) != EINTR:  # signal
                            raise
                        # Interrupted: nothing was accepted, so there is
                        # no new connection to queue.
                        continue
                    self.connection_queue.append(conn)
            # Go ahead and read the ready-byte from children that
            # appear to have sent it.
            for child in self.children:
                if child in ready:
                    child.update_ready_for_connection()
            # Now try to actually handle connections in the queue.
            if handle_connection and self.max_children == 0:
                # We are in non-forking mode. Handle waiting connections now.
                for c in self.connection_queue:
                    handle_connection(c)
                    c.close()
                self.connection_queue = []
            elif self.connection_queue:
                self.connection_queue = [
                    c for c in self.connection_queue if not self.delegated(c)]


def worker(handle_connection, parent_fd):
    """
    Service loop of a child process.

    Repeatedly tell the parent (over parent_fd) that this worker is ready,
    receive a connection descriptor, and call handle_connection on it.
    Exits with SystemExit when the parent can no longer be reached.
    """
    parent_fd = int(parent_fd)
    while True:
        try:
            # Tell the parent that we are ready.
            write(parent_fd, as_bytes("1"))
            # Receive the descriptor for the connection
            fd = recvfd(parent_fd)
        except (IOError, OSError):
            # The parent probably exited
            # (EPIPE comes thru as OSError).
            log("Terminating %s." % getpid())
            raise SystemExit
        # Make a blocking socket from the file descriptor.
        conn = fromfd(fd, AF_INET, SOCK_STREAM)
        conn.setblocking(1)
        # fromfd() duplicated the descriptor; release the original.
        close(fd)
        try:
            handle_connection(conn)
        finally:
            conn.close()