""" open/DurusWorks/qp/pub/user.py """ from durus.persistent import PersistentObject from durus.persistent_dict import PersistentDict from durus.persistent_set import PersistentSet from durus.utils import as_bytes from qp.lib.spec import require, specify, spec, add_getters, sequence, either from qp.lib.spec import pattern, string_classes, optional, string from qp.pub.common import get_publisher from qp.lib.util import rand_str import sys if sys.version < "2.6": from md5 import md5 else: from hashlib import md5 class Permissions (PersistentDict): data_is = { either(*string_classes): sequence(either(PersistentObject, True), PersistentSet)} __slots__ = ['data'] def grant(self, permission, granter): require(permission, either(*string_classes)) require(granter, either(PersistentObject, True)) if permission not in self: self[permission] = PersistentSet([granter]) else: self[permission].add(granter) def ungrant(self, permission, granter): require(permission, either(*string_classes)) require(granter, either(PersistentObject, True)) if self.is_granted(permission, granter): self.data[permission].remove(granter) if len(self.data[permission]) == 0: del self[permission] def is_granted(self, permission, granter): return granter in self.get(permission, []) def compute_digest(*args): x = as_bytes(":".join(str(a) for a in args if a is not None)) return md5(x).hexdigest() class Digester (PersistentObject): digests_is = {either(*string_classes):either(*string_classes)} __slots__ = ['digests'] def __init__(self): self.digests = {} def set_digest(self, realm, digest): self._p_note_change() self.digests[realm] = digest def get_digest(self, realm): return self.digests.get(realm) def get_realms(self): return list(self.digests.keys()) def remove_digest(self, realm): self._p_note_change() del self.digests[realm] def discard_digest(self, realm): if realm in self.digests: self.remove_digest(realm) class TokenSet (PersistentObject): """ A set of randomly generated tokens that have been delivered with forms. These are used to make sure that forms can't be replayed. """ MAX_TOKENS = 16 tokens_is = spec( [str], "The tokens that have been delivered with forms recently.") __slots__ = ['tokens'] def __init__(self): self.clear() def clear(self): self.tokens = [] def __contains__(self, token): return token in self.tokens def remove(self, token): self._p_note_change() self.tokens.remove(token) def discard(self, token): if token in self.tokens: self.remove(token) def new_token(self, bytes=8): token = rand_str(bytes) self.tokens = ([token] + self.tokens)[:self.MAX_TOKENS] return token class User (PersistentObject): id_is = spec( pattern('^[-A-Za-z0-9_@.]*$'), "unique among users here") digester_is = spec( Digester, "holds password hashes") permissions_is = spec( Permissions, "Records permissions granted.") tokens_is = spec( TokenSet, "a nonce dealer") email_is = spec( (None, pattern("^.+@.+\..{2,4}$")), "User's email address") idle_session_cookie_is = optional( string, "If present and not None, the cookie of an unauthenticated Session owned by this User.") __slots__ = ['id', 'digester', 'permissions', 'tokens', 'email', 'idle_session_cookie'] def __init__(self, user_id): specify(self, digester=self.digester_is(), permissions=self.permissions_is(), tokens=self.tokens_is(), email=None) if user_id is None: self.id = None else: specify(self, id=user_id) def __nonzero__(self): """The null user is the one with id == ''. """ return self.id != '' __bool__ = __nonzero__ def set_password(self, password, realm=None): if realm is None: realm = get_publisher().get_site().get_name() digester = self.get_digester() if not password: digester.discard_digest(realm) else: digester.set_digest( realm, compute_digest(self.get_id(), realm, password)) def has_password(self, password, realm=None): if realm is None: realm = get_publisher().get_site().get_name() digester = self.get_digester() digest = digester.get_digest(realm) return bool( digest and digest == compute_digest(self.get_id(), realm, password)) def is_granted(self, permission, other=True): """(permission:str, other:either(Persistent,True)=True) -> bool Does this user have the `permission` granted from the `other`? """ return self.permissions.is_granted(permission, other) def is_admin(self): """() -> bool Has the True granted 'administrator' to this user? """ return self.permissions.is_granted('administrator', True) def get_idle_session_cookie(self): return getattr(self, 'idle_session_cookie', None) def set_idle_session_cookie(self, value): specify(self, idle_session_cookie=value) add_getters(User)