""" open/DurusWorks/qp/lib/profiler.py """ from durus.utils import BytesIO, as_bytes from qp.lib.spec import string_classes from qp.pub.common import get_publisher from tempfile import NamedTemporaryFile import os import sys try: from profile import runctx from pstats import Stats except ImportError: runctx = Stats = None def return_false(*args, **kwargs): return False class Page (object): def __init__(self, PATH_INFO='/', user='', content='', site_name=None, **env): if site_name is None: site_name = os.environ.get('SITE') publisher = get_publisher() if not publisher: from qp.lib.site import Site publisher = Site(site_name).get_publisher() if site_name is not None: assert publisher.get_site().get_name() == site_name assert not publisher.is_live_host() publisher.is_email_enabled = return_false # Nothing can be saved by a test. Make sure. publisher.commit = return_false publisher.abort = return_false publisher.get_connection().get_storage().begin = return_false publisher.get_connection().get_storage().store = return_false publisher.get_connection().get_storage().end = return_false self.publisher = publisher assert get_publisher() is self.publisher if isinstance(user, string_classes): self.user = publisher.get_users().get(user) else: self.user = user env['PATH_INFO'] = PATH_INFO env.setdefault('SCRIPT_NAME', '') env.setdefault('HTTPS', 'on') env.setdefault('SERVER_NAME', 'example.org') env.setdefault('REMOTE_ADDR', '127.0.0.1') if content: env.setdefault('REQUEST_METHOD', 'POST') self.content_bytes = as_bytes(content) env.setdefault('CONTENT_LENGTH', len(self.content_bytes)) else: self.content_bytes = '' env.setdefault('REQUEST_METHOD', 'GET') self.session_id = 'bogus_session_id' env.setdefault('HTTP_COOKIE', '%s=%s' % (publisher.get_site().get_name(), self.session_id)) self.env = env self.stats = None self.response_string = 'not yet processed.' def get_publisher(self): return self.publisher def get_root(self): return self.get_publisher().get_connection().get_root() def ensure_session(self): session = self.publisher.create_session() session.owner = self.user session.effective_user = self.user session.remote_address = self.env.get('REMOTE_ADDR', '127.0.0.1') self.publisher.get_sessions()[self.session_id] = session def process(self): self.ensure_session() self.publisher.process(BytesIO(self.content_bytes), self.env) output = BytesIO() self.hit = self.publisher.get_hit() self.hit.get_response().write(output) self.response_string = output.getvalue() def get_response_string(self): return self.response_string def get_hit(self): return self.publisher.hit def get_status_code(self): return self.get_hit().get_response().get_status_code() def get_response_header(self, name): return self.get_hit().get_response().get_header(name) def profile(self): tmpfile = NamedTemporaryFile(suffix=".profile") context = dict() runctx("self.process()", locals(), context, tmpfile.name) self.stats = Stats(tmpfile.name) self.stats.sort_stats('time', 'calls') tmpfile.close() def get_stats(self): assert self.stats, "Call profile() to creates stats." return self.stats def sort(self, *columns): self.get_stats().sort_stats(*columns) def format(self, *restrictions): stdout = sys.stdout output = BytesIO() try: sys.stdout = output self.get_stats().print_stats(*restrictions) finally: sys.stdout = stdout return output.getvalue() def calls(self, function_name): return [value[0] for key, value in self.get_stats().stats.items() if key[-1] == function_name] def loads(self): return sum(self.calls('_p_load_state')) def callers(self, *args): self.get_stats().print_callers(*args) def callees(self, *args): self.get_stats().print_callees(*args) def __str__(self): if self.stats: self.sort('time', 'calls') return self.format(10) else: return self.get_response_string() def Profile (*args, **kwargs): page = Page(*args, **kwargs) page.profile() return page