"""
open/DurusWorks/durus/storage.py
"""
from collections import Counter, deque
from durus.serialize import unpack_record, split_durus_ids, extract_class_name
from durus.utils import int8_to_str
import durus.connection


class Storage (object):
    """
    This is the interface that Connection requires for Storage.
    """

    def __init__(self):
        # Storage is only an interface; concrete subclasses provide their
        # own __init__ and must not call this one.
        raise RuntimeError("Storage is abstract")

    def load(self, durus_id):
        """Return the record for this durus_id.
        Raises a KeyError if there is no such record.
        May also raise a ReadConflictError.
        """
        raise NotImplementedError

    def begin(self):
        """
        Begin a commit.
        """
        raise NotImplementedError

    def store(self, durus_id, record):
        """Include this record in the commit underway."""
        raise NotImplementedError

    def end(self, handle_invalidations=None):
        """Conclude a commit.
        This may raise a ConflictError.
        """
        raise NotImplementedError

    def sync(self):
        """() -> [durus_id:str]
        Return a list of durus_ids that should be invalidated.
        """
        raise NotImplementedError

    def new_durus_id(self):
        """() -> durus_id:str
        Return an unused durus_id.  Used by Connection for serializing new
        persistent instances.
        """
        raise NotImplementedError

    def close(self):
        """Clean up as needed.  The default implementation does nothing.
        """

    def get_packer(self):
        """
        Return an incremental packer (a generator), or None if this storage
        does not support incremental packing.
        Used by StorageServer.
        """
        return None

    def pack(self):
        """If this storage supports it, remove obsolete records."""
        return None

    def bulk_load(self, durus_ids):
        """(durus_ids:sequence(durus_id:str)) -> sequence(record:str)
        Yield the record for each durus_id, in the given order.

        The default implementation calls load() once per durus_id, so a
        missing record surfaces as whatever load() raises, at the point the
        corresponding record is requested from the generator.
        """
        for durus_id in durus_ids:
            yield self.load(durus_id)

    def gen_durus_id_record(self, start_durus_id=None, batch_size=100):
        """(start_durus_id:str = None, batch_size:int = 100) ->
            sequence((durus_id:str, record:str))
        Returns a generator for the sequence of (durus_id, record) pairs.

        If a start_durus_id is given, the resulting sequence follows a
        breadth-first traversal of the object graph, starting at the given
        start_durus_id.  This uses the storage's bulk_load() method because
        that is faster in some cases.  The batch_size argument sets the
        maximum number of object records loaded on each call to bulk_load().

        If no start_durus_id is given, the sequence may include durus_ids
        and records that are not reachable from the root.  (This default
        implementation starts the traversal at the root in that case, so it
        only yields records reachable from the root.)

        Raises ValueError, when iteration starts, if batch_size is not
        positive.
        """
        if batch_size <= 0:
            # With a non-positive batch size no id would ever be taken off
            # `todo`, and the loop below would never terminate.
            raise ValueError("batch_size must be positive")
        if start_durus_id is None:
            start_durus_id = durus.connection.ROOT_DURUS_ID
        todo = deque([start_durus_id])
        # Ids are marked as seen when they are queued (not when they are
        # loaded), so each durus_id enters `todo` at most once.  This keeps
        # the queue bounded by the number of distinct objects rather than
        # the number of references, without changing the visit order.
        seen = set([start_durus_id])
        while todo:
            batch = []
            while todo and len(batch) < batch_size:
                batch.append(todo.popleft())
            for record in self.bulk_load(batch):
                durus_id, _data, refdata = unpack_record(record)
                yield durus_id, record
                for ref in split_durus_ids(refdata):
                    if ref not in seen:
                        seen.add(ref)
                        todo.append(ref)


def gen_referring_durus_id_record(storage, referred_durus_id):
    """(storage:Storage, referred_durus_id:str) ->
        sequence([durus_id:str, record:str])
    Generate durus_id, record pairs for all objects that include a
    reference to the `referred_durus_id`.

    Scans every pair produced by storage.gen_durus_id_record() and checks
    the reference data of each record.
    """
    for durus_id, record in storage.gen_durus_id_record():
        refs = split_durus_ids(unpack_record(record)[2])
        if referred_durus_id in refs:
            yield durus_id, record


def gen_durus_id_class(storage, *classes):
    """(storage:Storage, classes:(str)) ->
        sequence([(durus_id:str, class_name:str)])
    Generate a sequence of durus_id, class_name pairs.
    If classes are provided, only output pairs for which the class_name is
    in `classes`.
    """
    for durus_id, record in storage.gen_durus_id_record():
        class_name = extract_class_name(record)
        if not classes or class_name in classes:
            yield durus_id, class_name


def get_census(storage):
    """(storage:Storage) -> {class_name:str : instance_count:int}
    Return the number of records of each class yielded by
    gen_durus_id_class(storage).
    """
    counts = Counter(
        class_name for _durus_id, class_name in gen_durus_id_class(storage))
    # Return a plain dict, as before, rather than the Counter subclass.
    return dict(counts)


def get_reference_index(storage):
    """(storage:Storage) -> {durus_id:str : [referring_durus_id:str]}
    Return a full index giving the referring durus_ids for each durus_id.
    This might be large.

    Only durus_ids that are referenced by at least one record appear as
    keys.
    """
    result = {}
    for durus_id, record in storage.gen_durus_id_record():
        for ref in split_durus_ids(unpack_record(record)[2]):
            result.setdefault(ref, []).append(durus_id)
    return result


class MemoryStorage (Storage):
    """
    A concrete Storage that keeps everything in memory.
    This may be useful for testing purposes.

    store() and end() must be preceded by begin(); otherwise there is no
    pending transaction dict and they fail with a TypeError.
    """

    def __init__(self):
        # Storage.__init__ is deliberately not called: it always raises.
        self.records = {}  # durus_id -> record, updated by end()
        self.transaction = None  # {durus_id: record} between begin() and end()
        self.durus_id = -1  # last integer handed out by new_durus_id()

    def new_durus_id(self):
        """Return the next sequential durus_id, encoded by int8_to_str()."""
        self.durus_id += 1
        return int8_to_str(self.durus_id)

    def load(self, durus_id):
        """Return the committed record; raises KeyError if there is none."""
        return self.records[durus_id]

    def begin(self):
        """Start a new, empty transaction."""
        self.transaction = {}

    def store(self, durus_id, record):
        """Stage a record in the current transaction."""
        self.transaction[durus_id] = record

    def end(self, handle_invalidations=None):
        """Commit the staged records and clear the transaction.

        handle_invalidations is accepted for interface compatibility but is
        not used, and no conflict checking is performed.
        """
        self.records.update(self.transaction)
        self.transaction = None

    def sync(self):
        """Always return an empty list: this storage reports no
        invalidations."""
        return []