# SPDX-License-Identifier: GPL-2.0-or-later # # Utilities for python-based QEMU tests # # Copyright 2024 Red Hat, Inc. # # Authors: # Thomas Huth import os from subprocess import check_call, run, DEVNULL import tarfile from urllib.parse import urlparse import zipfile from .asset import Asset def tar_extract(archive, dest_dir, member=None): with tarfile.open(archive) as tf: if hasattr(tarfile, 'data_filter'): tf.extraction_filter = getattr(tarfile, 'data_filter', (lambda member, path: member)) if member: tf.extract(member=member, path=dest_dir) else: tf.extractall(path=dest_dir) def cpio_extract(archive, output_path): cwd = os.getcwd() os.chdir(output_path) # Not passing 'check=True' as cpio exits with non-zero # status if the archive contains any device nodes :-( if type(archive) == str: run(['cpio', '-i', '-F', archive], stdout=DEVNULL, stderr=DEVNULL) else: run(['cpio', '-i'], input=archive.read(), stdout=DEVNULL, stderr=DEVNULL) os.chdir(cwd) def zip_extract(archive, dest_dir, member=None): with zipfile.ZipFile(archive, 'r') as zf: if member: zf.extract(member=member, path=dest_dir) else: zf.extractall(path=dest_dir) def deb_extract(archive, dest_dir, member=None): cwd = os.getcwd() os.chdir(dest_dir) try: proc = run(['ar', 't', archive], check=True, capture_output=True, encoding='utf8') file_path = proc.stdout.split()[2] check_call(['ar', 'x', archive, file_path], stdout=DEVNULL, stderr=DEVNULL) tar_extract(file_path, dest_dir, member) finally: os.chdir(cwd) ''' @params archive: filename, Asset, or file-like object to extract @params dest_dir: target directory to extract into @params member: optional member file to limit extraction to Extracts @archive into @dest_dir. All files are extracted unless @member specifies a limit. If @format is None, heuristics will be applied to guess the format from the filename or Asset URL. @format must be non-None if @archive is a file-like object. ''' def archive_extract(archive, dest_dir, format=None, member=None): if format is None: format = guess_archive_format(archive) if type(archive) == Asset: archive = str(archive) if format == "tar": tar_extract(archive, dest_dir, member) elif format == "zip": zip_extract(archive, dest_dir, member) elif format == "cpio": if member is not None: raise Exception("Unable to filter cpio extraction") cpio_extract(archive, dest_dir) elif format == "deb": if type(archive) != str: raise Exception("Unable to use file-like object with deb archives") deb_extract(archive, dest_dir, "./" + member) else: raise Exception(f"Unknown archive format {format}") ''' @params archive: filename, or Asset to guess Guess the format of @compressed, raising an exception if no format can be determined ''' def guess_archive_format(archive): if type(archive) == Asset: archive = urlparse(archive.url).path elif type(archive) != str: raise Exception(f"Unable to guess archive format for {archive}") if ".tar." in archive or archive.endswith("tgz"): return "tar" elif archive.endswith(".zip"): return "zip" elif archive.endswith(".cpio"): return "cpio" elif archive.endswith(".deb") or archive.endswith(".udeb"): return "deb" else: raise Exception(f"Unknown archive format for {archive}")