aboutsummaryrefslogtreecommitdiff
path: root/tests/functional/qemu_test/uncompress.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/functional/qemu_test/uncompress.py')
-rw-r--r--tests/functional/qemu_test/uncompress.py107
1 files changed, 107 insertions, 0 deletions
diff --git a/tests/functional/qemu_test/uncompress.py b/tests/functional/qemu_test/uncompress.py
new file mode 100644
index 0000000..b7ef8f7
--- /dev/null
+++ b/tests/functional/qemu_test/uncompress.py
@@ -0,0 +1,107 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Utilities for python-based QEMU tests
+#
+# Copyright 2024 Red Hat, Inc.
+#
+# Authors:
+# Thomas Huth <thuth@redhat.com>
+
+import gzip
+import lzma
+import os
+import stat
+import shutil
+from urllib.parse import urlparse
+from subprocess import run, CalledProcessError
+
+from .asset import Asset
+
+
+def gzip_uncompress(gz_path, output_path):
+ if os.path.exists(output_path):
+ return
+ with gzip.open(gz_path, 'rb') as gz_in:
+ try:
+ with open(output_path, 'wb') as raw_out:
+ shutil.copyfileobj(gz_in, raw_out)
+ except:
+ os.remove(output_path)
+ raise
+
+def lzma_uncompress(xz_path, output_path):
+ if os.path.exists(output_path):
+ return
+ with lzma.open(xz_path, 'rb') as lzma_in:
+ try:
+ with open(output_path, 'wb') as raw_out:
+ shutil.copyfileobj(lzma_in, raw_out)
+ except:
+ os.remove(output_path)
+ raise
+
+
+def zstd_uncompress(zstd_path, output_path):
+ if os.path.exists(output_path):
+ return
+
+ try:
+ run(['zstd', "-f", "-d", zstd_path,
+ "-o", output_path], capture_output=True, check=True)
+ except CalledProcessError as e:
+ os.remove(output_path)
+ raise Exception(
+ f"Unable to decompress zstd file {zstd_path} with {e}") from e
+
+ # zstd copies source archive permissions for the output
+ # file, so must make this writable for QEMU
+ os.chmod(output_path, stat.S_IRUSR | stat.S_IWUSR)
+
+
+'''
+@params compressed: filename, Asset, or file-like object to uncompress
+@params uncompressed: filename to uncompress into
+@params format: optional compression format (gzip, lzma)
+
+Uncompresses @compressed into @uncompressed
+
+If @format is None, heuristics will be applied to guess the format
+from the filename or Asset URL. @format must be non-None if @uncompressed
+is a file-like object.
+
+Returns the fully qualified path to the uncompessed file
+'''
+def uncompress(compressed, uncompressed, format=None):
+ if format is None:
+ format = guess_uncompress_format(compressed)
+
+ if format == "xz":
+ lzma_uncompress(str(compressed), uncompressed)
+ elif format == "gz":
+ gzip_uncompress(str(compressed), uncompressed)
+ elif format == "zstd":
+ zstd_uncompress(str(compressed), uncompressed)
+ else:
+ raise Exception(f"Unknown compression format {format}")
+
+'''
+@params compressed: filename, Asset, or file-like object to guess
+
+Guess the format of @compressed, raising an exception if
+no format can be determined
+'''
+def guess_uncompress_format(compressed):
+ if type(compressed) == Asset:
+ compressed = urlparse(compressed.url).path
+ elif type(compressed) != str:
+ raise Exception(f"Unable to guess compression cformat for {compressed}")
+
+ (name, ext) = os.path.splitext(compressed)
+ if ext == ".xz":
+ return "xz"
+ elif ext == ".gz":
+ return "gz"
+ elif ext in [".zstd", ".zst"]:
+ return 'zstd'
+ else:
+ raise Exception(f"Unknown compression format for {compressed}")