"""
open/DurusWorks/qp/pub/publish.py
"""
from base64 import b64decode
from durus.btree import BTree
from durus.connection import Connection
from durus.error import ConflictError
from durus.file_storage import TempFileStorage, FileStorage
from durus.storage_server import StorageServer
from durus.utils import as_bytes
from os import getpid
from pprint import pformat
from qp.fill.directory import Directory
from qp.fill.form import Form
from qp.hub.web import run_web
from qp.lib.site import Site
from qp.lib.spec import spec, add_getters, specify, match
from qp.lib.tz import UTC
from qp.lib.util import randbytes, urljoin, as_str
from qp.mail.send import Email
from qp.pub.common import get_hit, set_publisher, get_request, get_response
from qp.pub.common import get_user, get_session, site_now
from qp.pub.hit import Hit
from qp.pub.session import Session
from qp.pub.user import User, compute_digest
from qpy import xml
from socket import getfqdn
from sys import exc_info
from traceback import format_exception
import binascii
import qp.lib.site
import sys
class RespondNow(Exception):
    """
    Raised to abandon path traversal and send the response as it stands.

    The Publisher expects the contents of the response to have been set
    before this exception is raised.
    """
class Publisher (object):
    """
    The basic QP Publisher class.  This implements the normal traversal,
    page generation, and error handling pattern used in QP.  Subclasses may
    override fill_response() or other methods to customize this as desired.

    NOTE(review): get_site() and get_hit() are called throughout but are not
    defined in this class body; presumably they are generated from the
    ``*_is`` specs by add_getters() -- confirm in qp.lib.spec.
    """

    site_is = spec(
        Site,
        "the QP Site")
    root_directory_is = spec(
        Directory,
        "The root directory")
    hit_is = spec(
        Hit,
        "The Hit (Request, Response, Session, etc.) currently being "
        "processed, or None.")

    # Name of the Site constructed when no site is passed to __init__().
    test_site_name = "proto"

    def __init__(self, site=None, hub=False, **other):
        """(site:Site=None, hub:bool=False, **other)
        Register this instance as the current publisher, clear the current
        hit, and record the site (a Site named test_site_name if none is
        given).  Extra keyword arguments are accepted and ignored.
        """
        set_publisher(self)
        self.set_hit(None)
        specify(self, site=site or Site(self.test_site_name))
        self.root_directory = None
        self.hub = hub

    @staticmethod
    def run_web(site):
        """
        Launch the web server(s) for this site.
        The default implementation uses the qp web server to
        provide http and scgi service if addresses for those
        services are present in self.configuration.
        """
        # Inside a function body this name resolves to the module-level
        # run_web imported from qp.hub.web, not to this static method.
        run_web(site)

    def __call__(self, wsgi_env, wsgi_start_response):
        """(wsgi_env:dict, wsgi_start_response:callable)
        This makes it possible to use a Publisher instance as a
        wsgi application.  Returns the response's body chunks.
        """
        wsgi_input = wsgi_env[as_bytes('wsgi.input')]
        hit = self.process(wsgi_input, wsgi_env)
        response = hit.get_response()
        status = as_bytes("%03d %s" % response.get_status())
        headers = [(as_bytes(k), as_bytes(v))
                   for k, v in response.generate_headers()]
        wsgi_start_response(status, headers)
        return response.generate_body_chunks()

    def process(self, stdin, env):
        """
        Return a Hit with a complete response.
        Subclasses that want to customize the Hit class may do so by overriding
        this method.  Note that env may be modified in place (see below).
        """
        if env.get('SCRIPT_NAME') and env.get('PATH_INFO') == '':
            # The "script" here is your QP application, so the SCRIPT_NAME
            # should be the part of the url, after the host name, that
            # causes requests to be handled by your QP application.
            # The PATH_INFO is the part of the path after the SCRIPT_NAME.
            # We expect applications running at the top of the URL space
            # to get requests where the SCRIPT_NAME is the empty string
            # and the PATH_INFO starts with a '/'.
            #
            # If we get here, your HTTP server is not providing the expected
            # CGI environment: the full path part of the URL is being put
            # in the SCRIPT_NAME.
            # We'll try to make it work anyway, but we are assuming here
            # that your application is running at the top of your URL space.
            # If you want your application to run somewhere else in your URL
            # space, you must change your server so that it provides the
            # correct environment variables.  Alternatively, your publisher
            # class can override this method and hack the environment as
            # needed to fit QP's expectations.
            env['PATH_INFO'] = env['SCRIPT_NAME']
            env['SCRIPT_NAME'] = ""
        hit = Hit(stdin, env)
        self.process_hit(hit)
        return hit

    def process_hit(self, hit):
        """(hit:Hit)
        This processes the request on the hit and sets the response.
        Subclasses that want to customize error handling may do so by
        overriding this method.
        """
        self.set_hit(hit)
        try:
            hit.init_response()
            self.fill_response()
        except RespondNow:
            # The response was already prepared by whoever raised this.
            pass
        except:
            # Deliberately bare: handle_exception() needs to see SystemExit
            # too, so that it can log it and then re-raise it.
            self.handle_exception()
        self.log_hit()

    def get_fields(self):
        """
        Return the fields of the current request.  If they cannot be
        parsed, print the error, log the hit, and respond immediately
        with a 400 "Bad Request" page (which raises RespondNow).
        """
        try:
            return self.get_hit().get_request().get_fields()
        except (ValueError, UnicodeDecodeError, IOError):
            print(sys.exc_info()[1])
            self.log_hit()
            self.respond("Bad Request",
                         "We could not process your request.", status=400)

    def fill_response(self):
        """
        Set the response.  Subclasses that want to use some other
        template system may do so by overriding this method.
        """
        self.fill_response_using_root_directory()

    def set_hit(self, hit):
        """(hit:Hit)
        Set the current Hit.
        """
        self.hit = hit

    def respond_now(self):
        """
        This breaks out of path traversal.  When this is called, the
        response should have already been prepared for sending.
        """
        raise RespondNow

    def handle_exception(self):
        """
        This is called when there is an error exception while processing
        a request.  It must be called from inside an except block: it
        logs the exception, re-raises it if it is SystemExit, and otherwise
        resets the response to a 500 page that either shows or hides the
        details, depending on display_exceptions().
        """
        self.log_exception()
        if exc_info()[0] is SystemExit:
            raise
        self.get_hit().init_response()
        self.get_hit().get_response().set_status(500)
        if self.display_exceptions():
            self.display_exception()
        else:
            self.hide_exception()

    def secure(self):
        """If the scheme is not https, redirect so that it will be.
        Nothing happens when the site has no https address.
        """
        if (get_request().get_scheme() != 'https' and
            self.get_site().get_https_address()):
            self.redirect(self.complete_url('', secure=True))

    def complete_path(self, path):
        """(path:str) -> str
        Turn path into a complete path, prepending the script_name if
        path starts with a slash.
        """
        s = str(path)
        if s.startswith('/'):
            return get_request().get_script_name() + s
        else:
            return s

    def complete_url(self, path, secure=False):
        """(path:str, secure:bool=False) -> str
        Turn path into a complete url to this publisher, changing the
        scheme to https if secure is True.  The path is returned unchanged
        when there is no current hit, or when it is already an absolute
        url with an acceptable scheme.
        """
        s = str(path)
        if self.get_hit() is None:
            return s
        if not secure and s.startswith('http://'):
            return s
        if s.startswith('https://'):
            return s
        if secure:
            host, port = self.get_site().get_https_address()[:2]
            if not host:
                # Fall back on the host name the request was sent to.
                host = get_request().get_server().split(':')[0]
            if port == 443:
                # The default https port does not need to be spelled out.
                address = str(host)
            else:
                address = "%s:%s" % (host, port)
            base = 'https://%s%s' % (address,
                                     get_request().get_path_query())
        else:
            base = get_request().get_url()
        return urljoin(base, self.complete_path(s))

    def redirect(self, location, permanent=False):
        """(location:str, permanent:boolean=False)
        This prepares a redirect response (301 if permanent, else 302)
        and uses an exception to break out of the path traversal and
        return the response immediately.
        """
        if permanent:
            status = 301
        else:
            status = 302
        response = self.get_hit().get_response()
        response.set_status(status)
        response.set_header('location', self.complete_url(location))
        response.set_content_type('text/plain', 'iso-8859-1')
        response.set_body(
            "Your browser should have redirected you to %s" % location)
        self.respond_now()

    def not_found(self, body=None):
        """(body:str)
        This prepares a 404 response and uses an exception
        to break out of the path traversal and return the response
        immediately.
        """
        self.respond('Not Found', body or 'That page is not here.', status=404)

    def is_live_host(self):
        """() -> boolean
        Is this machine's fully qualified name the site's live host name?
        """
        return self.get_site().get_live_host_name() == getfqdn()

    def is_staging_host(self):
        """() -> boolean
        Is this machine's fully qualified name the site's staging host name?
        """
        return self.get_site().get_staging_host_name() == getfqdn()

    def is_hub(self):
        """() -> boolean
        Was this Publisher created as part of the qp.hub scgi or http server?
        """
        return self.hub

    def log_exception(self):
        """
        This is called to record an error exception.  The report is
        printed, and on the live host it is also passed to
        send_to_administrator().
        """
        report = self.format_exception_report()
        print(report)
        if self.is_live_host():
            self.send_to_administrator(report)

    def display_exception(self):
        """
        This places an exception report in the response.
        """
        get_hit().get_response().set_body(self.format_exception_report())
        get_hit().get_response().set_content_type('text/plain', 'iso-8859-1')

    def format_user_info_for_log(self):
        """() -> str
        Return the user part of the log line.  This base class knows
        nothing about users, so it returns placeholders.
        """
        return '- -'

    def format_log_hit_line(self):
        """() -> str
        Return the space-separated log line for the current hit.
        """
        # The local names below are the keys of the format string at the
        # end of this method (it is filled from locals()): do not rename.
        hit = get_hit()
        code = hit.get_response().get_status_code()
        length = hit.get_response().get_content_length() or 0
        mime_type = hit.get_response().get_mime_type() or '-'
        charset = (hit.get_response().get_charset() or '-').replace(' ', '_')
        duration = str(site_now() - hit.get_time()).lstrip('0:')
        request = hit.get_request()
        path_query = request.get_path_query()
        method = request.get_method()
        remote = request.get_remote_address()
        scheme = request.get_scheme()
        try:
            user = self.format_user_info_for_log()
        except:
            # Best effort: a failure to describe the user must not
            # prevent the hit from being logged.
            user = "! !"
            print("[!%s in format_user_info_for_log()]" % exc_info()[0])
        referrer = request.get_header('http-referer', '-').replace(' ', '_')
        agent = request.get_header('user-agent', '-').replace(' ', '_')
        time = hit.get_time().strftime("%Y-%m-%d %H:%M:%S")
        pid = getpid()
        msg = ('%(time)s %(code)s %(length)s %(mime_type)s %(charset)s %(duration)s %(remote)s %(user)s %(pid)s '
               '%(scheme)s %(method)s %(path_query)s %(referrer)s %(agent)s' %
               locals())
        return msg

    def log_hit(self):
        """
        This logs the processing of a request.
        """
        print(self.format_log_hit_line())

    def format_exception_report(self):
        """() -> str
        This returns a string that reports an error exception: the
        traceback of the exception being handled, followed by the request
        path and query, the hit info, and the request's attributes.
        """
        hit = get_hit()
        request = hit.get_request()
        report = ''.join(format_exception(*exc_info()))
        report += '\npath = %r' % request.get_path()
        report += '\nquery= %r' % request.get_query()
        report += '\n\ninfo:\n' + pformat(hit.get_info())
        report += '\n\nvars(request):\n' + pformat(vars(request))
        report += '\n'
        return report

    def hide_exception(self):
        """
        This sets the response to be a page that can be shown when there
        is an error exception, but you don't want to expose the details
        of the exception.
        """
        get_hit().get_response().set_status(500)
        # NOTE(review): these literals were broken across physical lines
        # (a syntax error), apparently because their markup was lost.  The
        # visible text is preserved here with escaped newlines; restore the
        # intended paragraph markup if it is known.
        get_hit().get_response().set_body(
            self.page('Regrets',
                      xml("\nThis page is temporarily unavailable.\n"
                          "Please try again later.\n")))

    def get_root_directory(self):
        """
        Return the root directory, creating it on first use from the
        site's root directory class.
        """
        if self.root_directory is None:
            self.root_directory = self.site.get_root_directory_class()()
        return self.root_directory

    def fill_response_using_root_directory(self):
        """
        Traverse the components, set the response body, using the
        normal _q_traverse() method.
        """
        path = get_request().get_path_info()
        # Drop the first character (the leading '/') before splitting.
        components = path[1:].split('/')
        body = self.get_root_directory()._q_traverse(components)
        get_response().set_body(body)

    def display_exceptions(self):
        """() -> bool
        Should the details of error exceptions be reported in responses?
        By default they are shown everywhere except on the live host.
        """
        return not self.is_live_host()

    def respond(self, title, *content, **kwargs):
        """(title:str, *content, **kwargs)
        Fill the response using the page() method and the given title,
        content, and **kwargs and return immediately.  A 'status' keyword,
        if given, is used as the response status (default 400) and is not
        passed on to page().
        """
        self.get_hit().init_response()
        status = kwargs.pop('status', 400)
        self.get_hit().get_response().set_status(status)
        self.get_hit().get_response().set_body(
            self.page(title, *content, **kwargs))
        self.respond_now()

    def respond_plain(self, body):
        """(body:str)
        Respond immediately with the given text/plain body.
        """
        self.get_hit().get_response().set_status(200)
        self.get_hit().get_response().set_content_type('text/plain', 'utf-8')
        self.get_hit().get_response().set_body(body)
        self.respond_now()

    # Email-related methods.

    def get_webmaster_address(self):
        """() -> email_address:str
        """
        return 'webmaster@%s' % getfqdn()

    def get_debug_address(self):
        """() -> email_address:str|None
        If this returns a nonempty string, the qp.mail.send.Email
        class will use this as the smtp recipient for all messages it
        sends.  If you want email sent by that class to go to the normal
        recipients, you must override this to return None.
        """
        if self.is_live_host():
            return None
        else:
            return self.get_webmaster_address()

    def is_email_enabled(self):
        """() -> bool
        The qp.mail.send.Email class will never send any email unless this
        returns true.  If you want any email sent (using that class),
        you must override this to return True.
        """
        return False

    def get_smtp_server(self):
        """() -> str
        """
        return 'localhost'

    def send_to_administrator(self, message):
        """(message:str)
        If email is enabled, send message in an Email whose subject is
        the site name, and print a note when the send succeeds.
        """
        if self.is_email_enabled():
            email = Email()
            email.set_subject(self.get_site().get_name())
            email.set_body(message)
            if email.send():
                print('\nemail:sent')

    # Page formatting methods.

    def header(self, title, doctype='xhtml1-strict', style=None, **kwargs):
        """(title, **kwargs) -> xml
        Return the site-standard html header.
        """
        # NOTE(review): every xml('') literal in this method is empty even
        # though several are used as format strings; the markup appears to
        # have been lost from this copy of the file.  The literals are left
        # exactly as found -- restore them from a pristine copy.
        if doctype == 'xhtml1-transitional':
            s = xml(
                '')
        elif doctype == 'xhtml1-strict':
            s = xml(
                '')
        else:
            s = xml()
        s += xml('')
        s += xml('')
        if style is not None:
            s += xml('') % style
        s += xml('%s') % title
        s += xml('') % self.get_agent_class()
        return s

    def footer(self, *args, **kwargs):
        """(**kwargs) -> xml
        Return the site-standard html footer.
        """
        # NOTE(review): empty literal; markup presumably lost (see header()).
        return xml('')

    def page(self, title, *content, **kwargs):
        """(title, *content, **kwargs) -> xml
        Return a page formatted according to the site-standard.
        """
        return (self.header(title, **kwargs) +
                xml("").join(content) +
                self.footer(title=title, **kwargs))

    def get_time_zone(self):
        """
        Return the time zone used by format_date_time().
        """
        return UTC

    def format_date_time(self, date):
        """(date : datetime) -> str
        Format date in the publisher's time zone, or return '' if date
        is false.
        """
        if date:
            tz = self.get_time_zone()
            if tz is not date.tzinfo:
                date = date.astimezone(tz)
            return date.strftime('%Y-%m-%d %H:%M %Z')
        else:
            return ''

    def format_date(self, date):
        """(date : date) -> str
        """
        if date:
            return date.strftime('%Y-%m-%d')
        else:
            return ''

    def format_user(self, user):
        """
        Return the user's id, or '' if user is false.
        """
        if user:
            return user.get_id()
        else:
            return ''

    def get_platform_class(self):
        """() -> str
        Classify the client platform from the user-agent header.
        """
        agent = get_request().get_header('user-agent', '-')
        if 'Windows' in agent:
            return 'Windows'
        if 'Mac OS X' in agent:
            return 'OSX'
        return 'standard-platform'

    def get_agent_class(self):
        """() -> str
        Classify the client browser from the user-agent header.
        """
        agent = get_request().get_header('user-agent', '-')
        if 'MSIE 5.' in agent:
            return 'MacIE5' if 'Mac' in agent else 'IE5'
        # Order matters: the first marker found wins, so the specific
        # Android versions must come before the generic 'Android'.
        for marker, agent_class in (
            ('MSIE 6.', 'IE6'),
            ('MSIE 7.', 'IE7'),
            ('MSIE 8.', 'IE8'),
            ('MSIE 9.', 'IE9'),
            ('iPhone', 'iPhone'),
            ('iPad', 'iPad'),
            ('Android 2.2', 'Android22'),
            ('Android 2.3', 'Android23'),
            ('Android 3.0', 'Android30'),
            ('Android', 'Android'),
            ('Firefox', 'Firefox'),
            ):
            if marker in agent:
                return agent_class
        return 'standard-agent'
# NOTE(review): presumably this generates the get_site(), get_hit(), etc.
# accessors that Publisher's methods call but do not define, from the
# class's ``*_is`` specs -- confirm against qp.lib.spec.add_getters.
add_getters(Publisher)
class DurusPublisher (Publisher):
    """
    This is the standard Publisher class for QP sites that use
    a Durus Database.  The database root holds a 'sessions' mapping and
    a 'users' mapping; each hit is processed in its own transaction.
    """

    connection_is = spec(
        Connection,
        "the Durus Connection")

    test_site_name = "proto"

    def __init__(self, connection=None, **other):
        """(connection:Connection=None, **other)
        Use the given connection, or a new one on a temporary file
        storage, set its cache size from the site, and make sure the
        database has its sessions and users mappings.
        """
        Publisher.__init__(self, **other)
        self.connection = connection or Connection(TempFileStorage())
        durus_cache_size = self.get_site().get_durus_cache_size()
        self.connection.set_cache_size(durus_cache_size)
        self.ensure_database_initialized()

    def ensure_database_initialized(self):
        """
        Make sure that database root has sessions and users mappings.
        If not, make them and commit().
        """
        self.ensure_sessions_initialized()
        self.ensure_users_initialized()

    def ensure_sessions_initialized(self):
        """
        Create and commit an empty sessions mapping if there is none.
        """
        if self.get_sessions() is None:
            self.get_root()['sessions'] = BTree()
            self.commit()

    def ensure_users_initialized(self):
        """
        Create and commit the users mapping if there is none.  The new
        mapping holds one user, whose id is the empty string.
        """
        if self.get_users() is None:
            users = BTree()
            users[''] = self.create_user("")
            self.get_root()['users'] = users
            self.commit()

    @staticmethod
    def run_durus(site):
        """
        Launch the durus server for this site.
        """
        server = StorageServer(
            FileStorage(site.get_durus_file()),
            gcbytes=site.get_gcbytes(),
            address=site.get_durus_address())
        site.ensure_uid_gid_not_root()
        server.serve()

    def create_user(self, user_id):
        """
        Return a new User with the given id.  The user is not added to
        the users mapping here.
        """
        return User(user_id)

    def create_session(self):
        """
        Return a new Session.  It is not added to the sessions mapping here.
        """
        return Session()

    def commit(self):
        """Commit changes to the database."""
        self.get_connection().commit()

    def abort(self):
        """
        Make sure that the connection has no uncommitted object state.
        """
        self.get_connection().abort()

    def get_root(self):
        """() -> PersistentDict
        Return the root object of the database.
        """
        return self.get_connection().get_root()

    def get_sessions(self):
        """() -> BTree
        Return the session mapping.
        """
        return self.get_root().get('sessions')

    def get_users(self):
        """() -> BTree
        Returns the mapping from user ids to user instances.
        """
        return self.get_root().get('users')

    def gen_active_users(self):
        """
        Generate the users.  This implementation yields every user in
        the users mapping.
        """
        for user in self.get_users().itervalues():
            yield user

    def add_user(self, user):
        """(User)
        Add a new user to the application.  The user's id must not
        already be in use.
        """
        users = self.get_users()
        assert user.get_id() not in users
        users[user.get_id()] = user

    def lookup_user(self, id=None, email=None, **kwargs):
        """
        Returns a User or None if no such user is found.
        The id is tried first.  Failing that, if email is given and
        satisfies the email spec of the user whose id is '', the users
        are searched for one whose email matches, ignoring case.
        """
        user = self.get_users().get(id)
        if user is not None:
            return user
        if (email is not None and
            match(email, self.get_users().get('').email_is)):
            # Lower both sides: comparing against the address as given
            # could never match an address containing upper case letters.
            lower_email = email.lower()
            for user in self.get_users().itervalues():
                user_email = user.get_email()
                if (user_email is not None and
                    user_email.lower() == lower_email):
                    return user
        return None

    # Authentication methods

    def sign_out(self, url):
        """(url:str)
        Un-authenticate the current user and redirect to `url`.
        """
        get_session().clear_authentication()
        self.redirect(url)

    def delete_current_session(self):
        """
        Remove the current session from the sessions mapping, if it
        is there.
        """
        current_session = get_session()
        current_key = None
        for key, session in self.get_sessions().items():
            # Identity test against the *current* session; testing the loop
            # variable against itself would select the first entry instead.
            if session is current_session:
                current_key = key
                break
        if current_key is not None:
            del self.get_sessions()[current_key]

    def ensure_signed_in_using_form(self, title='Please Sign In',
                                    realm=None, **kwargs):
        """
        Make sure that the current user is signed in.
        This presents a form to the user.  Because the form transmits the
        password, this redirects to the https address before presenting the
        form.
        The realm, if given, identifies which of the user's passwords
        should be used.  The default realm is the name of the site.
        """
        if not get_user():
            self.secure()
            form = Form(use_tokens=False)
            default_user_id = None
            if get_session().get_owner():
                default_user_id = get_session().get_owner().get_id()
            form.add_string(
                'name', value=default_user_id, title="User Name", required=True)
            form.add_password('password', title="Password", required=True)
            form.add_submit('login', 'Sign in')
            def show_form():
                # respond() does not return: it raises RespondNow.
                self.respond(title, form.render(), status=403, **kwargs)
            if not form.is_submitted() or form.has_errors():
                show_form()
            user = self.get_users().get(form.get('name'))
            if user and user.has_password(form.get('password') or '',
                                          realm=realm):
                get_session().set_authenticated(user)
                # redirect() does not return either.
                self.redirect('')
            # NOTE(review): distinct messages reveal which user names exist.
            if not user:
                form.set_error('name', 'unknown user')
            else:
                form.set_error('password', 'wrong password')
            show_form()

    def ensure_signed_in_using_basic(self, realm=None):
        """
        Make sure that the current user is signed in.
        This uses HTTP Basic Authentication.  Because Basic Authentication
        transmits the password, this implementation redirects if necessary
        to make sure that the challenge only happens when the scheme is
        https.
        The realm, if given, identifies which of the user's passwords
        should be used.  The default realm is the name of the site.
        """
        if not get_user():
            # Look to see if authentication credentials have been delivered.
            authorization = get_request().get_header(
                'HTTP_AUTHORIZATION', '').split()
            if len(authorization) == 2:
                scheme, encoded = authorization
                try:
                    b64decoded = b64decode(as_bytes(encoded))
                except (binascii.Error, TypeError):
                    # Malformed base64 (TypeError is what Python 2 raises):
                    # treat it as if no credentials were sent.
                    pass
                else:
                    # Split on the first colon only: the user name cannot
                    # contain a colon, but the password may.
                    decoded = as_str(b64decoded).split(':', 1)
                    if scheme.lower() == 'basic' and len(decoded) == 2:
                        username, password = decoded
                        user = self.get_users().get(username)
                        if user and user.has_password(password, realm=realm):
                            get_session().set_authenticated(user) # success
        if not get_user():
            # Issue an authentication challenge.
            assert self.get_site().get_https_address(), (
                "If you want basic authentication, use https.")
            self.secure() # This redirects to https.
            if realm is None:
                realm = self.get_site().get_name()
            self.get_hit().init_response()
            self.get_hit().get_response().set_status(401)
            self.get_hit().get_response().set_header(
                'WWW-Authenticate',
                'Basic realm="%s"' % realm)
            self.respond_now()

    def ensure_signed_in_using_digest(self, realm=None):
        """
        Make sure that the current user is signed in.
        This uses HTTP Digest Authentication.  Since this does not transmit
        the password itself, we allow this to work even when https is not
        available.  If https is available, though, we redirect to it first.
        The realm, if given, identifies which of the user's passwords
        should be used.  The default realm is the name of the site.
        """
        if not get_user():
            if realm is None:
                realm = self.get_site().get_name()
            def attempt_digest_authentication():
                # Authenticate the session if the request carries a valid
                # digest response; otherwise just return.
                authorization = get_request().get_header(
                    'HTTP_AUTHORIZATION', '').split()
                if len(authorization) < 5 and len(authorization) > 0:
                    # The parameters may have been sent without spaces
                    # after the commas.
                    authorization[-1:] = authorization[-1].split(',')
                if len(authorization) < 5:
                    return
                if authorization[0].lower() != 'digest':
                    return
                parameters = {}
                for item in authorization[1:]:
                    item_split = item.split('=', 1)
                    if len(item_split) != 2:
                        continue
                    name, value = item_split
                    # endswith() rather than value[-1]: the value may be
                    # empty (as in 'name=').
                    if value.endswith(','):
                        value = value[:-1]
                    if value and value[0]=='"' and value[-1]=='"':
                        value = value[1:-1]
                    parameters[name.lower()] = value
                username = parameters.get('username', None)
                if not username:
                    return
                user = self.get_users().get(username)
                if not user:
                    return
                nonce = parameters.get('nonce')
                # The nonce must be one issued in a challenge below, and
                # each nonce is good for one attempt only.
                if nonce not in get_user().get_tokens():
                    return
                get_user().get_tokens().remove(nonce)
                cnonce = parameters.get('cnonce')
                if not cnonce:
                    return
                nc = parameters.get('nc')
                if not nc:
                    return
                qop = parameters.get('qop')
                if not qop:
                    return
                request_digest = parameters.get('response', '')
                if len(request_digest) != 32:
                    return
                # The realm named in the request selects the user's digest;
                # this is a local name, distinct from the enclosing
                # method's realm argument.
                request_realm = parameters.get('realm')
                user_digest = user.get_digester().get_digest(request_realm)
                if not user_digest:
                    return
                method = get_request().get_method()
                if request_digest == compute_digest(
                    user_digest,
                    nonce,
                    nc,
                    cnonce,
                    qop,
                    compute_digest(method, parameters.get('uri', ''))):
                    # success
                    get_session().set_authenticated(user)
            attempt_digest_authentication()
        if not get_user():
            # Issue an authentication challenge.
            self.secure()
            self.get_hit().init_response()
            self.get_hit().get_response().set_status(401)
            self.get_hit().get_response().set_body("Do I know you?")
            nonce = get_user().get_tokens().new_token()
            self.get_hit().get_response().set_header(
                'WWW-Authenticate',
                ('Digest realm="%s", nonce="%s", opaque="0%s", stale=false, '
                 'algorithm=MD5, qop="auth"' % (realm, nonce, nonce)))
            self.respond_now()

    def ensure_signed_in(self):
        """
        Make sure the user is authenticated using the default method.
        """
        self.ensure_signed_in_using_digest()

    def get_fields(self):
        """
        Like Publisher.get_fields(), except that the connection is
        aborted before the "Bad Request" response propagates.
        """
        try:
            return Publisher.get_fields(self)
        except RespondNow:
            self.abort()
            raise

    def process_hit(self, hit):
        """(hit:Hit)
        This processes the request on the hit and sets the response
        on the hit.  It handles conflict errors and commits changes
        when there are no error exceptions.  Note that this overrides
        a method of the base class.
        """
        self.set_hit(hit)
        try:
            # A ConflictError means the hit is retried from the start,
            # up to 5 attempts in all.
            for attempt in range(5):
                try:
                    hit.init_response()
                    self.abort()
                    try:
                        self.fill_response()
                    except RespondNow:
                        pass
                    self.log_hit()
                    self.commit()
                    return
                except ConflictError:
                    self.log_exception()
                    self.abort()
            else:
                # Reached only when every attempt ended in a conflict.
                raise RuntimeError("too many Conflict errors")
        except:
            # Deliberately bare: handle_exception() re-raises SystemExit.
            self.handle_exception()
            self.log_hit()
            self.abort()

    def fill_response(self):
        """
        Use the request to prepare the response.
        This method takes care of session management, and calls the
        fill_response_using_root_directory() method to do the rest.
        Note that this overrides a method of the base class.
        """
        cookie_name = self.get_site().get_name()
        # The session key comes from the cookie named after the site or,
        # failing that, from the query string.
        cookie = (get_request().get_cookie(cookie_name) or
                  get_request().get_query())
        if len(cookie) != 16: # len of randbytes(8) cookie generated below
            cookie = None
        session = cookie and self.get_sessions().get(cookie)
        if session and not session.is_valid():
            del self.get_sessions()[cookie]
            session = None
        if session is None:
            session = self.create_session()
        self.get_hit().set_session(session)
        session.access()
        try:
            self.fill_response_using_root_directory()
        finally:
            # Runs even when the traversal raised (RespondNow included),
            # so that a session changed along the way is still saved.
            session = self.get_hit().get_session()
            if session.needs_saving():
                idle_session_cookie = session.get_owner().get_idle_session_cookie()
                idle_session = None
                if idle_session_cookie is not None:
                    idle_session = self.get_sessions().get(idle_session_cookie)
                    if idle_session is not None:
                        if idle_session.get_owner() is not session.get_owner():
                            idle_session = None
                if idle_session_cookie and idle_session and not idle_session.get_effective_user():
                    # Recycle idle Session.
                    idle_session.set_authenticated(session.get_effective_user())
                    cookie = idle_session_cookie
                else:
                    # Save new Session.
                    cookie = randbytes(8).decode('latin1')
                    self.get_sessions()[cookie] = session
                get_response().set_cookie(
                    cookie_name, cookie,
                    path=get_request().get_script_name() or "/",
                    secure=(get_request().get_scheme() == 'https'))

    def format_exception_report(self):
        """() -> str
        This returns a string that reports an error exception: the
        base class report with a line about the user in front.
        """
        report = "user = %s\n\n" % self.format_user_info_for_log()
        report += Publisher.format_exception_report(self)
        return report

    def format_user_info_for_log(self):
        """() -> str
        Return "<owner> <user>" for the current session.  The user whose
        id is '' is shown as '-', and an effective user that is the
        session's owner is shown as '+'.
        """
        def format_id(user):
            user_id = user.get_id()
            if user_id == '':
                return '-'
            # Spaces would break the space-separated log line.
            try:
                return str(user_id).replace(' ', '_')
            except UnicodeDecodeError:
                return repr(user_id).replace(' ', '_')
        session = get_session()
        owner = session.get_owner()
        owner_part = format_id(owner)
        user = session.get_effective_user()
        if user.get_id() != '' and user is owner:
            user_part = '+'
        else:
            user_part = format_id(user)
        return owner_part + " " + user_part
# NOTE(review): presumably this generates get_connection(), which
# DurusPublisher's methods call but do not define, from the class's
# ``connection_is`` spec -- confirm against qp.lib.spec.add_getters.
add_getters(DurusPublisher)