"""
open/DurusWorks/qp/lib/util.py

Small helpers: str coercion, tolerant int parsing, dotted-name import,
random data, call tracing, and running external commands.
"""
from binascii import hexlify
from datetime import datetime
from subprocess import Popen, PIPE
import os
import sys

# Compare version_info tuples rather than the sys.version string: string
# comparison is lexical and misorders multi-digit version components.
if sys.version_info[0] < 3:
    from cStringIO import StringIO
    from urlparse import urljoin
    from rfc822 import Message as message_from_file
else:
    from io import StringIO
    from urllib.parse import urljoin
    from email import message_from_file

if sys.version_info < (2, 6):
    from sha import new as sha1
else:
    from hashlib import sha1

_used = [StringIO, sha1, urljoin, message_from_file] # to quiet code checker.


def as_str(x):
    """
    Return x as a native str.

    A str (or str subclass) is returned unchanged; anything else is
    decoded as latin1, so it must provide a decode() method.
    """
    if isinstance(x, str):
        return x
    return x.decode('latin1')


def integer(arg, default=None):
    """
    Return int(arg), or `default` when the conversion raises ValueError
    or TypeError.
    """
    try:
        return int(arg)
    except (ValueError, TypeError):
        return default


def import_object(name):
    """
    Import and return the object with the given name.

    For a dotted name, the part before the last dot is imported as a
    module and the last component is looked up as an attribute of it.
    When that attribute is missing (or the name has no dot), the full
    name is imported as a module and that module is returned.
    """
    i = name.rfind('.')
    if i != -1:
        module_name = name[:i]
        object_name = name[i+1:]
        __import__(module_name)
        try:
            return getattr(sys.modules[module_name], object_name)
        except AttributeError:
            # Not an attribute of the parent: fall through and try the
            # whole name as a module.
            pass
    __import__(name)
    return sys.modules[name]


def randbytes(bytes):
    """
    Return `bytes` bytes of data from os.urandom(), hex-encoded.

    The result is what binascii.hexlify() returns, with two hex digits
    per random byte.
    """
    return hexlify(os.urandom(bytes))


def rand_str(n):
    """Return n random bytes as a native str of 2*n hex digits."""
    return as_str(randbytes(n))


def rand_int(bits):
    """
    Return a random non-negative integer below 2**bits.

    Whole bytes are drawn from randbytes(); when `bits` is not a
    multiple of 8, one extra byte is drawn and the surplus low bits are
    shifted away.  rand_int(0) returns 0.
    """
    if bits == 0:
        # No random bits requested; int('', 16) would raise ValueError.
        return 0
    nbytes, remainder = divmod(bits, 8)
    if remainder == 0:
        return int(randbytes(nbytes), 16)
    return int(randbytes(nbytes + 1), 16) >> (8 - remainder)


def trace(f):
    """
    Decorator that prints each call of f and its outcome.

    Before the call, a line showing the function name and the repr of
    every argument is printed.  Afterwards the same text is printed with
    the elapsed time and either the repr of the result or, when f
    raised, the exception class.  An exception raised by f propagates
    unchanged, with its original instance and traceback.
    """
    def trace(*args, **kwargs):
        start = datetime.now() # no tz, ok
        name = getattr(f, 'func_name', None) or getattr(f, '__name__')
        # repr(arg), not "%r" % arg: the latter misbehaves when arg is
        # itself a tuple.
        rendered = [repr(arg) for arg in args]
        rendered.extend("%s=%r" % (k, v) for k, v in kwargs.items())
        s = '%s(%s)' % (name, ", ".join(rendered))
        print("\n" + s)
        try:
            result = f(*args, **kwargs)
        except:
            # Deliberately broad: everything is reported and then
            # re-raised untouched by the bare `raise` below.
            e = sys.exc_info()[0]
            duration = datetime.now() - start # no tz, ok
            print('%s finished in %s -> %r' % (s, duration, e))
            print(e)
            raise
        duration = datetime.now() - start # no tz, ok
        print('%s finished in %s -> %r' % (s, duration, result))
        return result
    return trace


def get_status_output(cmd, input=None, include_stderr=False, verbose=False):
    """
    Run a command and return (returncode, output).

    cmd: a list or tuple of arguments, or a string that is split on
        whitespace (no shell is involved and no quoting is interpreted).
    input: data passed to the process on stdin, or None.  It is handed
        to Popen.communicate() as given.
    include_stderr: when true and the process wrote to stderr, the
        output is the stderr data, a newline, then the stdout data.
        Otherwise the output is the stdout data alone.
    verbose: when true, print the command before running it.

    The output has whatever type the Popen pipes produce (bytes on
    Python 3).
    """
    if not isinstance(cmd, (list, tuple)):
        cmd = cmd.split()
    if verbose:
        print(" ".join(cmd))
    p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    stdout_output, stderr_output = p.communicate(input=input)
    if include_stderr and stderr_output:
        # Build the separator in the pipes' own type: on Python 3 the
        # pipe data is bytes and cannot be joined with a str '\n'.
        newline = '\n'.encode('latin1')
        output = stderr_output + newline + stdout_output
    else:
        output = stdout_output
    return p.returncode, output


def get_output(cmd, input=None, include_stderr=False, verbose=False):
    """
    Run a command via get_status_output() and return only its output,
    discarding the return code.
    """
    return get_status_output(cmd, input=input, include_stderr=include_stderr,
                             verbose=verbose)[1]


def get_memory_usage():
    """
    Return (rss, vsz) for the current process as integers.

    The values are the last two fields printed by
    `ps -p <pid> -orss,vsz`, in whatever units ps reports
    (presumably kilobytes -- confirm for the target platform).
    """
    output = get_output("ps -p %s -orss,vsz" % os.getpid())
    rsz, vsz = output.split()[-2:]
    return int(rsz), int(vsz)