# SPDX-License-Identifier: GPL-2.0-or-later # # Utilities for python-based QEMU tests # # Copyright 2024 Red Hat, Inc. # # Authors: # Thomas Huth import gzip import lzma import os import stat import shutil from urllib.parse import urlparse from subprocess import check_call, CalledProcessError from .asset import Asset def gzip_uncompress(gz_path, output_path): if os.path.exists(output_path): return with gzip.open(gz_path, 'rb') as gz_in: try: with open(output_path, 'wb') as raw_out: shutil.copyfileobj(gz_in, raw_out) except: os.remove(output_path) raise def lzma_uncompress(xz_path, output_path): if os.path.exists(output_path): return with lzma.open(xz_path, 'rb') as lzma_in: try: with open(output_path, 'wb') as raw_out: shutil.copyfileobj(lzma_in, raw_out) except: os.remove(output_path) raise def zstd_uncompress(zstd_path, output_path): if os.path.exists(output_path): return try: check_call(['zstd', "-f", "-d", zstd_path, "-o", output_path]) except CalledProcessError as e: os.remove(output_path) raise Exception( f"Unable to decompress zstd file {zstd_path} with {e}") from e # zstd copies source archive permissions for the output # file, so must make this writable for QEMU os.chmod(output_path, stat.S_IRUSR | stat.S_IWUSR) ''' @params compressed: filename, Asset, or file-like object to uncompress @params uncompressed: filename to uncompress into @params format: optional compression format (gzip, lzma) Uncompresses @compressed into @uncompressed If @format is None, heuristics will be applied to guess the format from the filename or Asset URL. @format must be non-None if @uncompressed is a file-like object. Returns the fully qualified path to the uncompessed file ''' def uncompress(compressed, uncompressed, format=None): if format is None: format = guess_uncompress_format(compressed) if format == "xz": lzma_uncompress(str(compressed), uncompressed) elif format == "gz": gzip_uncompress(str(compressed), uncompressed) elif format == "zstd": zstd_uncompress(str(compressed), uncompressed) else: raise Exception(f"Unknown compression format {format}") ''' @params compressed: filename, Asset, or file-like object to guess Guess the format of @compressed, raising an exception if no format can be determined ''' def guess_uncompress_format(compressed): if type(compressed) == Asset: compressed = urlparse(compressed.url).path elif type(compressed) != str: raise Exception(f"Unable to guess compression cformat for {compressed}") (name, ext) = os.path.splitext(compressed) if ext == ".xz": return "xz" elif ext == ".gz": return "gz" elif ext in [".zstd", ".zst"]: return 'zstd' else: raise Exception(f"Unknown compression format for {compressed}")