""" open/dulcinea/lib/stored_file.py Store a file in the filesystem. """ from durus.persistent import PersistentObject from os.path import exists from qp.http.request import Upload from qp.lib.spec import spec, specify, add_getters_and_setters, either from qp.lib.spec import string, Specified, datetime_with_tz, optional from qp.lib.util import rand_str, get_output from qp.pub.common import get_config_value, site_now from qp.pub.user import User import errno import os, grp, sys try: import pypdf from pypdf.pdf import PDFReader, PDFWriter except ImportError: pypdf = None try: from PIL import Image except ImportError: Image = None digits = '0123456789' lowercase = 'abcdefghijklmnopqrstuvwxyz' ALLOWABLE_SPECIAL_CHARS = "+-_.@" def path_check(path): """(path:string) -> string | None Return a lower-case and normalized version of the 'path' after checking for allowable characters. Return None if path has '..' or any special characters other than what's in lowercase, digits, and the ALLOWABLE_SPECIAL_CHARS string """ if '..' in path: return None path = path.lower() for letter in path: if (letter != '/' and letter not in lowercase and letter not in digits and letter not in ALLOWABLE_SPECIAL_CHARS): return None return path def get_file_store(): return get_config_value('file_store', '/var/tmp') class StoredFile (PersistentObject, Specified): """ Class for reading from or writing to a stored file A StoredFile is a wrapper around a file in a place in the filesystem known as the 'file_store'. StoredFiles can be instantiated with a fresh source file to move a new file into the file_store, or instantiated by referencing an existing file in the file_store. """ path_is = spec( string, "Path to the physical file containing the stored file's data.") filename_is = spec( string, "Filename to use when downloading this file") mime_type_is = spec( either(None, string), "MIME type of the contents of this file.") charset_is = spec( (None, string), "The charset encoding of the file, if known.") description_is = spec( (string, None), "Description of this file.") owner_is = spec( (User, None), "The file's owner") date_is = spec( datetime_with_tz, "timestamp") sha512_digest_is = optional( string, "If present, the computed sha512 digest.") hidden_is = optional( bool, "Should this attachment be restricted to users with manage access?") def __init__(self, path): path = os.path.normpath(path) self.path = path_check(path) assert self.path, 'Bad name: %r' % path specify(self, description=None, owner=None, date=site_now(), filename=os.path.basename(self.path), mime_type=None, charset=None) def get_hidden(self): return getattr(self, 'hidden', False) def get_id(self): return self.path.replace('/', '') def get_ids(self): return [self.get_id()] def open(self): """() -> file Returns a Python file object opened for reading. This can be used to read the file's content. """ return open(self.get_full_path(), 'rb') def compute_sha512_digest(self): fp = self.open() from hashlib import sha512 # Not available in python2.4 digester = sha512() while 1: chunk = fp.read(10000) if not chunk: break digester.update(chunk) fp.close() return digester.hexdigest() def get_sha512_digest(self): if not hasattr(self, 'sha512_digest'): self.set_sha512_digest(self.compute_sha512_digest()) return self.sha512_digest def chgrp(self, grp_id): """(grp_id : string | int) Change the category ownership of this file to the specified ID. """ if isinstance(grp_id, string): grnam, grpass, gid, grmembers = grp.getgrnam(grp_id) grp_id = gid os.chown(self.get_full_path(), -1, grp_id) def set_mime_type(self, mime_type): """(mime_type:string) Sets the MIME type for the file. """ # Check there's a single '/' in the MIME type if mime_type.count('/') != 1: raise ValueError("Invalid MIME type %r" % mime_type) specify(self, mime_type = mime_type) def get_stat(self): """() -> stat instance Return the size of the file, measured in bytes, or None if the file doesn't exist. """ path = self.get_full_path() if not os.path.exists(path): return None return os.stat(path) def get_size(self): """() -> int Return the size of the file, measured in bytes, or None if the file doesn't exist. """ stat = self.get_stat() if stat is None: return None else: return stat.st_size def get_mtime(self): """() -> int Return the mtime of the file, or None if the file doesn't exist. """ stat = self.get_stat() if stat is None: return None else: return stat.st_mtime def get_full_path(self): """() -> string """ return os.path.join(get_file_store(), self.path) def has_manage_access(self, user): return (user is self.owner or user.is_admin()) def is_image(self): return self.get_mime_type().startswith('image') def get_image(self): if Image and self.is_image() and self.exists(): return Image.open(self.get_full_path()) def get_image_width_height(self): image = self.get_image() if image: return image.size def is_mp4(self): return self.get_mime_type() == 'video/mp4' def is_mp4_with_ogv(self): return self.is_mp4() and exists(self.get_ogv_path()) def get_ogv_path(self): if self.is_mp4(): return self.get_full_path() + '.ogv' def create_ogv(self, force=False): if self.is_mp4() and self.exists(): converter = '/usr/local/bin/ffmpeg2theora' if exists(converter): if force or not exists(self.get_ogv_path()): command = "%s %s --nice 5 -o %s" % (converter, self.get_full_path(), self.get_ogv_path()) result = get_output(command, verbose=True, include_stderr=True) if exists(self.get_ogv_path()): return self.get_ogv_path() else: print(result) return 0 def exists(self): return exists(self.get_full_path()) def is_pdf(self): return self.get_mime_type() in ['application/pdf', 'application/x-pdf'] def get_png_page_path(self, page): if self.is_pdf(): return "%s_%s.png" % (self.get_full_path(), page) def is_pdf_with_png_pages(self): return self.is_pdf() and exists(self.get_png_page_path(1)) def list_existing_png_pages(self, max_pages=1000): result = [] for j in range(1, max_pages): png_page_path = self.get_png_page_path(j) if png_page_path and exists(png_page_path): result.append(png_page_path.replace(self.get_full_path(), '')) return result def create_png_pages(self, force=False): if self.is_pdf() and self.exists(): if not force and self.is_pdf_with_png_pages(): return True if exists('/usr/bin/sips'): command_skel = "/usr/bin/sips -s format png %s --out %s" elif exists('/usr/bin/convert'): command_skel = "/usr/bin/convert %s -resize 1600x1100 -enhance %s" else: command_skel = None if command_skel and pypdf: try: reader = PDFReader(self.open()) num_pages = reader.get_num_pages() except: return None if num_pages: for n in range(1, num_pages + 1): page_file = "%s_%s.pdf" % (self.get_full_path(), n) if not exists(page_file): page = reader.get_page(n-1) writer = PDFWriter() writer.add_page(page) f = file(page_file, "wb") writer.write(f) f.close() png_file = self.get_png_page_path(n) if not exists(png_file): command = command_skel % (page_file, png_file) result = get_output(command, verbose=True) if not exists(png_file): print(command, result) return 0 return num_pages def create_variants(self): self.create_png_pages() self.create_ogv() def create_easy_variants(self): self.create_png_pages() add_getters_and_setters(StoredFile) def get_random_path(): h = rand_str(6) return '%s/%s/%s' % (h[-1], h[-3:-1], h[0:-3]) def new_file(fp): root_directory = get_file_store() flags = os.O_WRONLY|os.O_CREAT|os.O_EXCL try: flags |= os.O_BINARY # for Windows except AttributeError: pass for attempt in range(1000): path = get_random_path() full_path = os.path.join(root_directory, path) destdir = os.path.dirname(full_path) if not os.path.exists(destdir): os.makedirs(destdir, mode=int("775", 8)) try: fd = os.open(full_path, flags) except OSError: exc = sys.exc_info()[1] if exc.errno == errno.EEXIST: pass # destination exists, retry else: raise # some other error else: break else: raise RuntimeError("Unable to make temp file.") if isinstance(fp, Upload) and exists(fp.get_full_path()): fp.move(full_path) os.chmod(full_path, int('0644', 8)) os.close(fd) else: if isinstance(fp, Upload): print("copying Upload to StoredFile") dest_fp = os.fdopen(fd, "wb") while 1: chunk = fp.read(10000) if not chunk: break dest_fp.write(chunk) if hasattr(fp, 'seek'): fp.seek(0) # seek back, just in case this needs to be repeated. dest_fp.close() return StoredFile(path) def fix_stored_file_classes(connection): ''' Update_db function to convert all StoredFile instances and references ''' from qp.lib.stored_file import StoredFile from durus.connection import gen_every_instance, touch_every_reference touch_every_reference(connection, 'StoredFile') for stored_file in gen_every_instance(connection, StoredFile): stored_file._p_note_change()