""" open/DurusWorks/qp/pub/session.py """ from datetime import timedelta from durus.persistent import PersistentObject from qp.lib.spec import specify, spec, either, add_getters, datetime_with_tz from qp.lib.spec import string_classes from qp.pub.common import get_request, get_publisher, site_now, get_hit from qp.pub.user import User import socket class Session (PersistentObject): """ Class attribute: lease_time: timedelta The amount of time for which an authentication is valid. access_period: timedelta The amount of time that must have elapsed since the last access time was recorded for a new access time to be recorded. """ lease_time = timedelta(days=30) access_period = timedelta(hours=1) remote_address_is = spec( either(None, *string_classes), "The remote address of the request when this session was created.") authentication_time_is = spec( either(None, datetime_with_tz), "The time of the current authentication into this session, or " "None if there is no current authentication.") effective_user_is = spec( either(None, User), "The user that should be used in ui code at this time. " "This is the null user if the authentication_time is None.") owner_is = spec( either(None, User), "The last user to have authenticated into this session. " "If no user has ever authenticated, this is the null user. " "The id of this user is used to initialize the value of the " "username field on a login form.") access_time_is = spec( either(None, datetime_with_tz), "The most recent recorded time of the last access to this session.") __slots__ = ['remote_address', 'authentication_time', 'effective_user', 'owner', 'access_time'] def __init__(self): null_user = get_publisher().get_users()[""] specify(self, authentication_time=None, access_time=None, remote_address=get_hit() and get_request().get_remote_address(), owner=null_user, effective_user=null_user) def get_cookie(self): for cookie, session in get_publisher().get_sessions().iteritems(): if session is self: return cookie def clear_authentication(self): """ Remove the current authentication record. Set the authentication-time to None, Set the effective_user to be the null user. """ if self.get_effective_user() is self.get_owner(): self.get_effective_user().set_idle_session_cookie(self.get_cookie()) specify(self, authentication_time=None, effective_user=get_publisher().get_users()[""]) def is_valid(self): """() -> bool Can this Session continue as authenticated? """ if self.remote_address != get_request().get_remote_address(): if str(get_request().get_remote_address()).startswith('127.'): return True try: local_ip_addresses = [ ip for ip in socket.gethostbyname_ex(socket.gethostname())[2]] except socket.gaierror: try: print("\nsocket.gethostname(): %s" % socket.gethostname()) except socket.gaierror: print("\nsocket.gethostname(): failed") return False if get_request().get_remote_address() not in local_ip_addresses: return False if self.authentication_time is None: return True if self.authentication_time + self.lease_time < site_now(): return False return True def set_authenticated(self, user): """(user:User) Called when the User has just delivered valid evidence of authenticity. """ specify(self, effective_user=user, owner=user, authentication_time=site_now(), remote_address=get_hit() and get_request().get_remote_address()) def set_effective_user(self, user): """(user:User) This is a way to set the effective user *without* changing the authentication time. This allows administrative users to temporarily act as other users. """ specify(self, effective_user=user) def needs_saving(self): """Should this session be *added* to database?""" if self._p_durus_id: # This is already in the Durus database return False if not self.owner: # Don't bother maintaining sessions for anonymous users. return False return True def access(self): """Called when this becomes the session of a request.""" now = site_now() if (self.access_time is None or self.access_time + self.access_period < now): self.access_time = now add_getters(Session)