"""
open/dulcinea/lib/archive.py

Read-only wrappers that give tar and zip archives a common interface.
"""
from qp.lib.stored_file import new_file
from durus.utils import BytesIO
from os.path import basename
from qp.fill.static import FileStream
from qp.pub.common import not_found, get_user, get_hit
import mimetypes
import tarfile
import zipfile
import struct


class ArchiveWrapper (object):
    """Abstract base for archive readers.

    Subclasses implement open_archive() and the member accessors; this class
    supplies lazy opening and conversion of a member to a stored file.
    """

    def __init__(self, path_to_archive):
        """(path_to_archive : str)
        Remember the path; the archive is not opened until get_archive().
        """
        self.path_to_archive = path_to_archive
        self.archive = None

    def get_archive(self):
        """() -> archive object
        Return the archive reader, opening it on first use and caching it
        on self.archive.
        """
        if self.archive is None:
            self.archive = self.open_archive(self.path_to_archive)
        return self.archive

    def open_archive(self, path_to_archive):
        """(path_to_archive : str)
        Return a reference to archive reading class saved on self.archive
        """
        raise NotImplementedError

    def get_names(self):
        """() -> [str]
        Return a list of file and directory names in the order they appear
        in the archive.  Directory names end with a slash '/' character.
        """
        raise NotImplementedError

    def has_member(self, member_name):
        """(name : str) -> bool
        True if member is in the archive
        """
        raise NotImplementedError

    def get_member_file(self, member_name):
        """(member_name : str) -> file
        """
        raise NotImplementedError

    def get_member_response(self, member_name):
        """(member_name : str) -> str | Stream
        Return the member named by member_name as a string or if member is
        streamable as a Stream
        """
        raise NotImplementedError

    def get_member(self, member_name):
        """(member_name : str) -> str
        Return the member named by member_name as a string.
        """
        raise NotImplementedError

    def get_member_stored_file(self, member_name):
        """(member_name : str) -> StoredFile | None
        Return the member named by member_name as a StoredFile, or None if
        has_member() reports that the archive has no such member.

        The stored file's filename is the base name of member_name and its
        mime type is guessed from the lower-cased filename.  When get_hit()
        is true, the owner is set to get_user().
        """
        if not self.has_member(member_name):
            return None
        stored_file = new_file(self.get_member_file(member_name))
        filename = basename(member_name)
        stored_file.set_mime_type(mimetypes.guess_type(filename.lower())[0])
        stored_file.set_filename(filename)
        if get_hit():
            stored_file.set_owner(get_user())
        return stored_file


class TarFileWrapper (ArchiveWrapper):
    """ArchiveWrapper for tar files, gzip-compressed or uncompressed."""

    def open_archive(self, path_to_archive):
        """(path_to_archive : str) -> tarfile.TarFile
        Open the archive as a gzipped tar; if that fails with a ReadError,
        retry as an uncompressed tar.  If the retry fails too, not_found()
        is called.
        """
        try:
            return tarfile.open(path_to_archive, 'r:gz')
        except tarfile.ReadError:
            try:
                return tarfile.open(path_to_archive, 'r:')
            except Exception:
                # Deliberately broad so any failure to read the file is
                # reported the same way, but no longer a bare "except:",
                # which would also trap KeyboardInterrupt / SystemExit.
                not_found("Can't open tar file")

    def get_names(self):
        """() -> [str]
        Return the member names in archive order, normalized so that every
        directory name ends with exactly one '/'.
        """
        fixed_names = []
        for tarinfo in self.get_archive().getmembers():
            name = tarinfo.name
            if name.endswith('//'):
                # Drop one of the two trailing slashes.
                name = name[:-1]
            elif tarinfo.isdir() and not name.endswith('/'):
                name += '/'
            fixed_names.append(name)
        return fixed_names

    def _get_tarinfo(self, name):
        """(name : str) -> tarfile.TarInfo | None
        Return the member stored under exactly this name, or None.
        """
        try:
            return self.get_archive().getmember(name)
        except (KeyError, NameError):
            return None

    def has_member(self, member_name):
        """(member_name : str) -> bool
        A name ending in '/' matches only a directory; the lookup tries the
        name as given, then without its trailing slash, then with an extra
        slash.  Any other name matches only a regular file.
        """
        if member_name.endswith('/'):
            candidates = (member_name, member_name[:-1], member_name + '/')
            for candidate in candidates:
                member = self._get_tarinfo(candidate)
                if member is not None:
                    # The first spelling that exists decides the answer.
                    return member.isdir()
            return False
        member = self._get_tarinfo(member_name)
        return member is not None and member.isfile()

    def get_member_file(self, member_name):
        """(member_name : str) -> file | None
        Return a file-like object for the member, or None if the archive
        has no member with that name.
        """
        try:
            return self.get_archive().extractfile(member_name)
        except KeyError:
            return None

    def get_member_response(self, member_name):
        """(member_name : str) -> FileStream
        Return the member wrapped in a FileStream.
        """
        return FileStream(self.get_member_file(member_name))

    def get_member(self, member_name):
        """(member_name : str) -> str
        Return the full content of the member.  Raises AttributeError if
        get_member_file() returns None for this name.
        """
        return self.get_member_file(member_name).read()


class NearZipFile (zipfile.ZipFile):
    """
    This is identical to a standard ZipFile, except that it does not
    fail when opening archives that have "extra" fields of length 1, 2, or 3.
    """

    def _RealGetContents(self):
        """Read in the table of contents for the ZIP file."""
        # NOTE(review): this overrides a private ZipFile method and relies on
        # private zipfile names (_EndRecData, _decodeExtra, _decodeFilename);
        # confirm they exist in the Python version being targeted.
        fp = self.fp
        endrec = zipfile._EndRecData(fp)
        if not endrec:
            raise zipfile.BadZipfile("File is not a zip file")
        if self.debug > 1:
            print(endrec)
        size_cd = endrec[zipfile._ECD_SIZE]         # bytes in central directory
        offset_cd = endrec[zipfile._ECD_OFFSET]     # offset of central directory
        self.comment = endrec[zipfile._ECD_COMMENT]  # archive comment

        # "concat" is zero, unless zip was concatenated to another file
        concat = endrec[zipfile._ECD_LOCATION] - size_cd - offset_cd
        if endrec[zipfile._ECD_SIGNATURE] == zipfile.stringEndArchive64:
            # If Zip64 extension structures are present, account for them
            concat -= (zipfile.sizeEndCentDir64 +
                       zipfile.sizeEndCentDir64Locator)

        if self.debug > 2:
            inferred = concat + offset_cd
            print("given, inferred, offset %s %s %s" % (
                offset_cd, inferred, concat))
        # self.start_dir:  Position of start of central directory
        self.start_dir = offset_cd + concat
        fp.seek(self.start_dir, 0)
        data = fp.read(size_cd)
        fp = BytesIO(data)
        total = 0
        while total < size_cd:
            centdir = fp.read(zipfile.sizeCentralDir)
            if len(centdir) != zipfile.sizeCentralDir:
                # A short record would otherwise reach struct.unpack() and
                # raise struct.error instead of BadZipfile.
                raise zipfile.BadZipfile("Truncated central directory")
            if centdir[0:4] != zipfile.stringCentralDir:
                raise zipfile.BadZipfile(
                    "Bad magic number for central directory")
            centdir = struct.unpack(zipfile.structCentralDir, centdir)
            if self.debug > 2:
                print(centdir)
            filename = fp.read(centdir[zipfile._CD_FILENAME_LENGTH])
            # Create ZipInfo instance to store file information
            x = zipfile.ZipInfo(filename)
            x.extra = fp.read(centdir[zipfile._CD_EXTRA_FIELD_LENGTH])
            x.comment = fp.read(centdir[zipfile._CD_COMMENT_LENGTH])
            x.header_offset = centdir[zipfile._CD_LOCAL_HEADER_OFFSET]
            (x.create_version, x.create_system, x.extract_version, x.reserved,
             x.flag_bits, x.compress_type, t, d,
             x.CRC, x.compress_size, x.file_size) = centdir[1:12]
            x.volume, x.internal_attr, x.external_attr = centdir[15:18]
            # Convert date/time code to (year, month, day, hour, min, sec)
            x._raw_time = t
            x.date_time = ( (d>>9)+1980, (d>>5)&0xF, d&0x1F,
                            t>>11, (t>>5)&0x3F, (t&0x1F) * 2 )
            if len(x.extra) >= 4: ## This is the added check.
                x._decodeExtra()
            x.header_offset = x.header_offset + concat
            x.filename = x._decodeFilename()
            self.filelist.append(x)
            self.NameToInfo[x.filename] = x

            # update total bytes read from central directory
            total = (total + zipfile.sizeCentralDir
                     + centdir[zipfile._CD_FILENAME_LENGTH]
                     + centdir[zipfile._CD_EXTRA_FIELD_LENGTH]
                     + centdir[zipfile._CD_COMMENT_LENGTH])

            if self.debug > 2:
                print("total %s" % total)


class ZipFileWrapper (ArchiveWrapper):
    """ArchiveWrapper for zip files, read through NearZipFile."""

    def open_archive(self, path_to_archive):
        """(path_to_archive : str) -> NearZipFile
        Open the zip archive for reading.  not_found() is called if the
        path is not a zip file or the archive cannot be opened.
        """
        if not zipfile.is_zipfile(path_to_archive):
            not_found('Not a zip file')
        try:
            return NearZipFile(path_to_archive, 'r')
        except zipfile.error:
            not_found("Can't open zip file")

    def get_names(self):
        """() -> [str]
        Return the archive's names in archive order, followed by every
        directory implied by those names that has no entry of its own.
        The implied directories always include the root '/', and are
        appended in sorted order.
        """
        names = list(self.get_archive().namelist())
        directories = set()
        for name in names:
            parts = name.split('/')
            # Every proper prefix of the path is a directory; the empty
            # prefix yields the root '/'.
            for j in range(len(parts)):
                directories.add('/'.join(parts[:j]) + '/')
        # Set difference replaces a list scan per directory; sorting makes
        # the appended order deterministic.
        names.extend(sorted(directories.difference(names)))
        return names

    def has_member(self, member_name):
        """(member_name : str) -> bool
        True if the archive has an entry stored under exactly this name.
        """
        try:
            return bool(self.get_archive().getinfo(member_name))
        except KeyError:
            return False

    def get_member_file(self, member_name):
        """(member_name : str) -> file
        Return the member's content wrapped in an in-memory BytesIO.
        """
        return BytesIO(self.get_member_response(member_name))

    def get_member_response(self, member_name):
        """(member_name : str) -> str
        Return the member's content as read from the archive.
        """
        return self.get_archive().read(member_name)

    def get_member(self, member_name):
        """(member_name : str) -> str
        Return the member's content as read from the archive.
        """
        return self.get_archive().read(member_name)