""" open/DurusWorks/qp/fill/static.qpy The original versions of StaticFile and StaticDirectory were contributed to Quixote by Hamish Lawson. """ from qp.fill.directory import Directory from qp.fill.html import href from qp.http.response import Stream, formatdate from qp.pub.common import get_request, get_response, get_path, not_found from qp.pub.common import header, footer import mimetypes import os import sys class FileStream (Stream): CHUNK_SIZE = 20000 def __iter__(self): if self.range is None: while 1: yield self.next() else: for chunk in self.iter_range(): yield chunk def iter_range(self): first_byte, last_byte = self.range length = self.length fp = self.iterable if (first_byte is None or last_byte is None) and length is None: raise ValueError("cannot satisfy open-ended range request with file stream that has no length") if first_byte is None: first_byte = length - last_byte last_byte = length - 1 if first_byte is None or last_byte is None or (length is not None and last_byte >= length): last_byte = length - 1 fp.seek(first_byte) while True: if self.CHUNK_SIZE < last_byte - first_byte + 1: chunk_size = self.CHUNK_SIZE else: chunk_size = last_byte - first_byte + 1 yield fp.read(chunk_size) if chunk_size < self.CHUNK_SIZE: break first_byte += self.CHUNK_SIZE def next(self): chunk = self.iterable.read(self.CHUNK_SIZE) if not chunk: raise StopIteration else: return chunk class StaticFile (object): """ Wrapper for a static file on the filesystem. """ def __init__(self, path, mime_type=None, encoding=None, cache_time=None, cache_control=None, charset='iso-8859-1'): """ If omitted, the MIME type will be guessed, defaulting to text/plain. If a non-None cache_time value is provided, it will be used to set an Expires header in the response. """ if not os.path.isabs(path): raise ValueError("Path %r is not absolute" % path) self.path = path guess_mime, guess_enc = mimetypes.guess_type( os.path.basename(path), strict=False) self.mime_type = mime_type or guess_mime or 'text/plain' self.encoding = encoding or guess_enc or None self.cache_time = cache_time self.cache_control = cache_control self.charset = charset def __call__(self): try: stat = os.stat(self.path) except OSError: print(sys.exc_info()[1]) not_found() gz_path = self.path + '.gz' try: gz_stat = os.stat(gz_path) if gz_stat.st_mtime < stat.st_mtime: print("\nRecompress %s" % self.path) gz_stat = None except OSError: gz_stat = None if self.cache_time is None: get_response().set_expires(None) # don't set the Expires header else: # explicitly allow client to cache page by setting the Expires # header, this is even more efficient than the using # Last-Modified/If-Modified-Since since the browser does not need # to contact the server get_response().set_expires(seconds=self.cache_time) if self.cache_control is not None: get_response().set_header("Cache-Control", self.cache_control) last_modified = formatdate(stat.st_mtime) if last_modified == get_request().get_header('If-Modified-Since'): # handle exact match of If-Modified-Since header get_response().set_status(304) return '' get_response().set_header('Last-Modified', last_modified) get_response().set_content_type(self.mime_type, self.charset) file_path = self.path size = stat.st_size if self.encoding: get_response().set_header("Content-Encoding", self.encoding) elif gz_stat: get_response().set_header("Content-Encoding", "gzip") get_response().set_header("Vary", "Accept-Encoding") file_path = gz_path size = gz_stat.st_size return FileStream(open(file_path, 'rb'), size) class StaticDirectory(Directory): """ Wrap a filesystem directory containing static files. """ file_class = StaticFile def __init__(self, path, list_directory=False, cache_time=None, cache_control=None, index_filenames=None, follow_links=False, charset='iso-8859-1'): """ Initialize instance with the absolute path to the file. If 'list_directory' is true, users can request a directory listing. Optional parameter cache_time allows setting of Expires header in response object (see note for StaticFile for more detail). Optional parameter 'index_filenames' specifies a list of filenames to be used as index files in the directory. First file found searching left to right is returned. """ if not os.path.isabs(path): raise ValueError("Path %r is not absolute" % path) self.path = path self.list_directory = list_directory self.cache_time = cache_time self.cache_control = cache_control self.index_filenames = index_filenames self.follow_links = follow_links self.charset = charset def generate_visible_names(self): files = sorted(os.listdir(self.path)) for name in files: if name == '.svn': continue if name == '.htaccess': continue if name.endswith('~'): continue file_path = os.path.join(self.path, name) if not self.follow_links and os.path.islink(file_path): continue yield name def get_exports(self): if self.list_directory: yield ('', '_q_index', os.path.basename(self.path), None) for name in self.generate_visible_names(): if os.path.isdir(os.path.join(self.path, name)): yield (name, None, name + '/', None) else: yield (name, None, name, None) def get_static_index(self): for name in self.index_filenames or []: obj = self._q_lookup(name) if (not isinstance(obj, StaticDirectory) and hasattr(obj, '__call__')): return obj() return None def _q_index:xml(self): index = self.get_static_index() if index: return index header(get_path()) '

%s

' % get_path() br = '
' if len(get_path()) > 1: href("..", "..") + br for export, name, crumb, title in self.get_exports(): if not export: continue href(export, crumb) br footer() def _q_lookup(self, name): """ Get a file from the filesystem directory and return the StaticFile or StaticDirectory wrapper of it; use caching if that is in use. """ if name in ('.', '..'): print("Attempt to use %r" % name) return None item_filepath = os.path.join(self.path, name) if not self.follow_links and os.path.islink(item_filepath): return None if os.path.isdir(item_filepath): return self.__class__( item_filepath, list_directory=self.list_directory, cache_time=self.cache_time, cache_control=self.cache_control, index_filenames=self.index_filenames, follow_links=self.follow_links, charset=self.charset) elif os.path.isfile(item_filepath): return self.file_class( item_filepath, cache_time=self.cache_time, charset=self.charset, cache_control=self.cache_control) return None