aboutsummaryrefslogtreecommitdiff
path: root/tests/functional/qemu_test
diff options
context:
space:
mode:
Diffstat (limited to 'tests/functional/qemu_test')
-rw-r--r--tests/functional/qemu_test/__init__.py12
-rw-r--r--tests/functional/qemu_test/archive.py117
-rw-r--r--tests/functional/qemu_test/asset.py87
-rw-r--r--tests/functional/qemu_test/cmd.py159
-rw-r--r--tests/functional/qemu_test/config.py12
-rw-r--r--tests/functional/qemu_test/decorators.py151
-rw-r--r--tests/functional/qemu_test/linuxkernel.py52
-rw-r--r--tests/functional/qemu_test/ports.py55
-rw-r--r--tests/functional/qemu_test/tesseract.py20
-rw-r--r--tests/functional/qemu_test/testcase.py272
-rw-r--r--tests/functional/qemu_test/tuxruntest.py136
-rw-r--r--tests/functional/qemu_test/uncompress.py107
-rw-r--r--tests/functional/qemu_test/utils.py65
13 files changed, 1061 insertions, 184 deletions
diff --git a/tests/functional/qemu_test/__init__.py b/tests/functional/qemu_test/__init__.py
index f33282e..6e666a0 100644
--- a/tests/functional/qemu_test/__init__.py
+++ b/tests/functional/qemu_test/__init__.py
@@ -7,8 +7,14 @@
from .asset import Asset
-from .config import BUILD_DIR
-from .cmd import has_cmd, has_cmds, run_cmd, is_readable_executable_file, \
+from .config import BUILD_DIR, dso_suffix
+from .cmd import is_readable_executable_file, \
interrupt_interactive_console_until_pattern, wait_for_console_pattern, \
- exec_command, exec_command_and_wait_for_pattern, get_qemu_img
+ exec_command, exec_command_and_wait_for_pattern, get_qemu_img, which
from .testcase import QemuBaseTest, QemuUserTest, QemuSystemTest
+from .linuxkernel import LinuxKernelTest
+from .decorators import skipIfMissingCommands, skipIfNotMachine, \
+ skipFlakyTest, skipUntrustedTest, skipBigDataTest, skipSlowTest, \
+ skipIfMissingImports, skipIfOperatingSystem, skipLockedMemoryTest
+from .archive import archive_extract
+from .uncompress import uncompress
diff --git a/tests/functional/qemu_test/archive.py b/tests/functional/qemu_test/archive.py
new file mode 100644
index 0000000..c803fda
--- /dev/null
+++ b/tests/functional/qemu_test/archive.py
@@ -0,0 +1,117 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Utilities for python-based QEMU tests
+#
+# Copyright 2024 Red Hat, Inc.
+#
+# Authors:
+# Thomas Huth <thuth@redhat.com>
+
+import os
+from subprocess import check_call, run, DEVNULL
+import tarfile
+from urllib.parse import urlparse
+import zipfile
+
+from .asset import Asset
+
+
+def tar_extract(archive, dest_dir, member=None):
+ with tarfile.open(archive) as tf:
+ if hasattr(tarfile, 'data_filter'):
+ tf.extraction_filter = getattr(tarfile, 'data_filter',
+ (lambda member, path: member))
+ if member:
+ tf.extract(member=member, path=dest_dir)
+ else:
+ tf.extractall(path=dest_dir)
+
+def cpio_extract(archive, output_path):
+ cwd = os.getcwd()
+ os.chdir(output_path)
+ # Not passing 'check=True' as cpio exits with non-zero
+ # status if the archive contains any device nodes :-(
+ if type(archive) == str:
+ run(['cpio', '-i', '-F', archive],
+ stdout=DEVNULL, stderr=DEVNULL)
+ else:
+ run(['cpio', '-i'],
+ input=archive.read(),
+ stdout=DEVNULL, stderr=DEVNULL)
+ os.chdir(cwd)
+
+def zip_extract(archive, dest_dir, member=None):
+ with zipfile.ZipFile(archive, 'r') as zf:
+ if member:
+ zf.extract(member=member, path=dest_dir)
+ else:
+ zf.extractall(path=dest_dir)
+
+def deb_extract(archive, dest_dir, member=None):
+ cwd = os.getcwd()
+ os.chdir(dest_dir)
+ try:
+ proc = run(['ar', 't', archive],
+ check=True, capture_output=True, encoding='utf8')
+ file_path = proc.stdout.split()[2]
+ check_call(['ar', 'x', archive, file_path],
+ stdout=DEVNULL, stderr=DEVNULL)
+ tar_extract(file_path, dest_dir, member)
+ finally:
+ os.chdir(cwd)
+
+'''
+@params archive: filename, Asset, or file-like object to extract
+@params dest_dir: target directory to extract into
+@params member: optional member file to limit extraction to
+
+Extracts @archive into @dest_dir. All files are extracted
+unless @member specifies a limit.
+
+If @format is None, heuristics will be applied to guess the format
+from the filename or Asset URL. @format must be non-None if @archive
+is a file-like object.
+'''
+def archive_extract(archive, dest_dir, format=None, member=None):
+ if format is None:
+ format = guess_archive_format(archive)
+ if type(archive) == Asset:
+ archive = str(archive)
+
+ if format == "tar":
+ tar_extract(archive, dest_dir, member)
+ elif format == "zip":
+ zip_extract(archive, dest_dir, member)
+ elif format == "cpio":
+ if member is not None:
+ raise Exception("Unable to filter cpio extraction")
+ cpio_extract(archive, dest_dir)
+ elif format == "deb":
+ if type(archive) != str:
+ raise Exception("Unable to use file-like object with deb archives")
+ deb_extract(archive, dest_dir, "./" + member)
+ else:
+ raise Exception(f"Unknown archive format {format}")
+
+'''
+@params archive: filename, or Asset to guess
+
+Guess the format of @compressed, raising an exception if
+no format can be determined
+'''
+def guess_archive_format(archive):
+ if type(archive) == Asset:
+ archive = urlparse(archive.url).path
+ elif type(archive) != str:
+ raise Exception(f"Unable to guess archive format for {archive}")
+
+ if ".tar." in archive or archive.endswith("tgz"):
+ return "tar"
+ elif archive.endswith(".zip"):
+ return "zip"
+ elif archive.endswith(".cpio"):
+ return "cpio"
+ elif archive.endswith(".deb") or archive.endswith(".udeb"):
+ return "deb"
+ else:
+ raise Exception(f"Unknown archive format for {archive}")
diff --git a/tests/functional/qemu_test/asset.py b/tests/functional/qemu_test/asset.py
index d3be2af..704b84d 100644
--- a/tests/functional/qemu_test/asset.py
+++ b/tests/functional/qemu_test/asset.py
@@ -8,14 +8,23 @@
import hashlib
import logging
import os
-import subprocess
+import stat
import sys
import unittest
import urllib.request
from time import sleep
from pathlib import Path
from shutil import copyfileobj
+from urllib.error import HTTPError
+class AssetError(Exception):
+ def __init__(self, asset, msg, transient=False):
+ self.url = asset.url
+ self.msg = msg
+ self.transient = transient
+
+ def __str__(self):
+ return "%s: %s" % (self.url, self.msg)
# Instances of this class must be declared as class level variables
# starting with a name "ASSET_". This enables the pre-caching logic
@@ -39,23 +48,38 @@ class Asset:
return "Asset: url=%s hash=%s cache=%s" % (
self.url, self.hash, self.cache_file)
+ def __str__(self):
+ return str(self.cache_file)
+
def _check(self, cache_file):
if self.hash is None:
return True
if len(self.hash) == 64:
- sum_prog = 'sha256sum'
+ hl = hashlib.sha256()
elif len(self.hash) == 128:
- sum_prog = 'sha512sum'
+ hl = hashlib.sha512()
else:
- raise Exception("unknown hash type")
+ raise AssetError(self, "unknown hash type")
- checksum = subprocess.check_output(
- [sum_prog, str(cache_file)]).split()[0]
- return self.hash == checksum.decode("utf-8")
+ # Calculate the hash of the file:
+ with open(cache_file, 'rb') as file:
+ while True:
+ chunk = file.read(1 << 20)
+ if not chunk:
+ break
+ hl.update(chunk)
+
+ return self.hash == hl.hexdigest()
def valid(self):
return self.cache_file.exists() and self._check(self.cache_file)
+ def fetchable(self):
+ return not os.environ.get("QEMU_TEST_NO_DOWNLOAD", False)
+
+ def available(self):
+ return self.valid() or self.fetchable()
+
def _wait_for_other_download(self, tmp_cache_file):
# Another thread already seems to download the asset, so wait until
# it is done, while also checking the size to see whether it is stuck
@@ -94,8 +118,9 @@ class Asset:
self.cache_file, self.url)
return str(self.cache_file)
- if os.environ.get("QEMU_TEST_NO_DOWNLOAD", False):
- raise Exception("Asset cache is invalid and downloads disabled")
+ if not self.fetchable():
+ raise AssetError(self,
+ "Asset cache is invalid and downloads disabled")
self.log.info("Downloading %s to %s...", self.url, self.cache_file)
tmp_cache_file = self.cache_file.with_suffix(".download")
@@ -105,6 +130,20 @@ class Asset:
with tmp_cache_file.open("xb") as dst:
with urllib.request.urlopen(self.url) as resp:
copyfileobj(resp, dst)
+ length_hdr = resp.getheader("Content-Length")
+
+ # Verify downloaded file size against length metadata, if
+ # available.
+ if length_hdr is not None:
+ length = int(length_hdr)
+ fsize = tmp_cache_file.stat().st_size
+ if fsize != length:
+ self.log.error("Unable to download %s: "
+ "connection closed before "
+ "transfer complete (%d/%d)",
+ self.url, fsize, length)
+ tmp_cache_file.unlink()
+ continue
break
except FileExistsError:
self.log.debug("%s already exists, "
@@ -117,10 +156,23 @@ class Asset:
tmp_cache_file)
tmp_cache_file.unlink()
continue
+ except HTTPError as e:
+ tmp_cache_file.unlink()
+ self.log.error("Unable to download %s: HTTP error %d",
+ self.url, e.code)
+ # Treat 404 as fatal, since it is highly likely to
+ # indicate a broken test rather than a transient
+ # server or networking problem
+ if e.code == 404:
+ raise AssetError(self, "Unable to download: "
+ "HTTP error %d" % e.code)
+ continue
except Exception as e:
- self.log.error("Unable to download %s: %s", self.url, e)
tmp_cache_file.unlink()
- raise
+ raise AssetError(self, "Unable to download: " % e)
+
+ if not os.path.exists(tmp_cache_file):
+ raise AssetError(self, "Download retries exceeded", transient=True)
try:
# Set these just for informational purposes
@@ -134,9 +186,10 @@ class Asset:
if not self._check(tmp_cache_file):
tmp_cache_file.unlink()
- raise Exception("Hash of %s does not match %s" %
- (self.url, self.hash))
+ raise AssetError(self, "Hash does not match %s" % self.hash)
tmp_cache_file.replace(self.cache_file)
+ # Remove write perms to stop tests accidentally modifying them
+ os.chmod(self.cache_file, stat.S_IRUSR | stat.S_IRGRP)
self.log.info("Cached %s at %s" % (self.url, self.cache_file))
return str(self.cache_file)
@@ -153,7 +206,13 @@ class Asset:
for name, asset in vars(test.__class__).items():
if name.startswith("ASSET_") and type(asset) == Asset:
log.info("Attempting to cache '%s'" % asset)
- asset.fetch()
+ try:
+ asset.fetch()
+ except AssetError as e:
+ if not e.transient:
+ raise
+ log.error("%s: skipping asset precache" % e)
+
log.removeHandler(handler)
def precache_suite(suite):
diff --git a/tests/functional/qemu_test/cmd.py b/tests/functional/qemu_test/cmd.py
index 3acd617..dc5f422 100644
--- a/tests/functional/qemu_test/cmd.py
+++ b/tests/functional/qemu_test/cmd.py
@@ -14,77 +14,93 @@
import logging
import os
import os.path
-import subprocess
-from .config import BUILD_DIR
-
-def has_cmd(name, args=None):
- """
- This function is for use in a @skipUnless decorator, e.g.:
-
- @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
- def test_something_that_needs_sudo(self):
- ...
- """
-
- if args is None:
- args = ('which', name)
-
- try:
- _, stderr, exitcode = run_cmd(args)
- except Exception as e:
- exitcode = -1
- stderr = str(e)
-
- if exitcode != 0:
- cmd_line = ' '.join(args)
- err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}'
- return (False, err)
- else:
- return (True, '')
-
-def has_cmds(*cmds):
- """
- This function is for use in a @skipUnless decorator and
- allows checking for the availability of multiple commands, e.g.:
-
- @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
- 'cmd2', 'cmd3'))
- def test_something_that_needs_cmd1_and_cmd2(self):
- ...
+def which(tool):
+ """ looks up the full path for @tool, returns None if not found
+ or if @tool does not have executable permissions.
"""
+ paths=os.getenv('PATH')
+ for p in paths.split(os.path.pathsep):
+ p = os.path.join(p, tool)
+ if os.access(p, os.X_OK):
+ return p
+ return None
- for cmd in cmds:
- if isinstance(cmd, str):
- cmd = (cmd,)
+def is_readable_executable_file(path):
+ return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)
- ok, errstr = has_cmd(*cmd)
- if not ok:
- return (False, errstr)
+# @test: functional test to fail if @failure is seen
+# @vm: the VM whose console to process
+# @success: a non-None string to look for
+# @failure: a string to look for that triggers test failure, or None
+#
+# Read up to 1 line of text from @vm, looking for @success
+# and optionally @failure.
+#
+# If @success or @failure are seen, immediately return True,
+# even if end of line is not yet seen. ie remainder of the
+# line is left unread.
+#
+# If end of line is seen, with neither @success or @failure
+# return False
+#
+# If @failure is seen, then mark @test as failed
+def _console_read_line_until_match(test, vm, success, failure):
+ msg = bytes([])
+ done = False
+ while True:
+ c = vm.console_socket.recv(1)
+ if c is None:
+ done = True
+ test.fail(
+ f"EOF in console, expected '{success}'")
+ break
+ msg += c
- return (True, '')
+ if success in msg:
+ done = True
+ break
+ if failure and failure in msg:
+ done = True
+ vm.console_socket.close()
+ test.fail(
+ f"'{failure}' found in console, expected '{success}'")
-def run_cmd(args):
- subp = subprocess.Popen(args,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- universal_newlines=True)
- stdout, stderr = subp.communicate()
- ret = subp.returncode
+ if c == b'\n':
+ break
- return (stdout, stderr, ret)
+ console_logger = logging.getLogger('console')
+ try:
+ console_logger.debug(msg.decode().strip())
+ except:
+ console_logger.debug(msg)
-def is_readable_executable_file(path):
- return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)
+ return done
def _console_interaction(test, success_message, failure_message,
send_string, keep_sending=False, vm=None):
assert not keep_sending or send_string
+ assert success_message or send_string
+
if vm is None:
vm = test.vm
- console = vm.console_file
- console_logger = logging.getLogger('console')
+
+ test.log.debug(
+ f"Console interaction: success_msg='{success_message}' " +
+ f"failure_msg='{failure_message}' send_string='{send_string}'")
+
+ # We'll process console in bytes, to avoid having to
+ # deal with unicode decode errors from receiving
+ # partial utf8 byte sequences
+ success_message_b = None
+ if success_message is not None:
+ success_message_b = success_message.encode()
+
+ failure_message_b = None
+ if failure_message is not None:
+ failure_message_b = failure_message.encode()
+
while True:
if send_string:
vm.console_socket.sendall(send_string.encode())
@@ -92,25 +108,15 @@ def _console_interaction(test, success_message, failure_message,
send_string = None # send only once
# Only consume console output if waiting for something
- if success_message is None and failure_message is None:
+ if success_message is None:
if send_string is None:
break
continue
- try:
- msg = console.readline().decode().strip()
- except UnicodeDecodeError:
- msg = None
- if not msg:
- continue
- console_logger.debug(msg)
- if success_message is None or success_message in msg:
+ if _console_read_line_until_match(test, vm,
+ success_message_b,
+ failure_message_b):
break
- if failure_message and failure_message in msg:
- console.close()
- fail = 'Failure message found in console: "%s". Expected: "%s"' % \
- (failure_message, success_message)
- test.fail(fail)
def interrupt_interactive_console_until_pattern(test, success_message,
failure_message=None,
@@ -135,6 +141,7 @@ def interrupt_interactive_console_until_pattern(test, success_message,
:param interrupt_string: a string to send to the console before trying
to read a new line
"""
+ assert success_message
_console_interaction(test, success_message, failure_message,
interrupt_string, True)
@@ -149,6 +156,7 @@ def wait_for_console_pattern(test, success_message, failure_message=None,
:param success_message: if this message appears, test succeeds
:param failure_message: if this message appears, test fails
"""
+ assert success_message
_console_interaction(test, success_message, failure_message, None, vm=vm)
def exec_command(test, command):
@@ -177,6 +185,7 @@ def exec_command_and_wait_for_pattern(test, command,
:param success_message: if this message appears, test succeeds
:param failure_message: if this message appears, test fails
"""
+ assert success_message
_console_interaction(test, success_message, failure_message, command + '\r')
def get_qemu_img(test):
@@ -184,10 +193,10 @@ def get_qemu_img(test):
# If qemu-img has been built, use it, otherwise the system wide one
# will be used.
- qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
+ qemu_img = test.build_file('qemu-img')
if os.path.exists(qemu_img):
return qemu_img
- if has_cmd('qemu-img'):
- return 'qemu-img'
- test.skipTest('Could not find "qemu-img", which is required to '
- 'create temporary images')
+ qemu_img = which('qemu-img')
+ if qemu_img is not None:
+ return qemu_img
+ test.skipTest(f"qemu-img not found in build dir or '$PATH'")
diff --git a/tests/functional/qemu_test/config.py b/tests/functional/qemu_test/config.py
index edd75b7..6d4c9c3 100644
--- a/tests/functional/qemu_test/config.py
+++ b/tests/functional/qemu_test/config.py
@@ -13,6 +13,7 @@
import os
from pathlib import Path
+import platform
def _source_dir():
@@ -34,3 +35,14 @@ def _build_dir():
raise Exception("Cannot identify build dir, set QEMU_BUILD_ROOT")
BUILD_DIR = _build_dir()
+
+def dso_suffix():
+ '''Return the dynamic libraries suffix for the current platform'''
+
+ if platform.system() == "Darwin":
+ return "dylib"
+
+ if platform.system() == "Windows":
+ return "dll"
+
+ return "so"
diff --git a/tests/functional/qemu_test/decorators.py b/tests/functional/qemu_test/decorators.py
new file mode 100644
index 0000000..c0d1567
--- /dev/null
+++ b/tests/functional/qemu_test/decorators.py
@@ -0,0 +1,151 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Decorators useful in functional tests
+
+import importlib
+import os
+import platform
+import resource
+from unittest import skipIf, skipUnless
+
+from .cmd import which
+
+'''
+Decorator to skip execution of a test if the list
+of command binaries is not available in $PATH.
+Example:
+
+ @skipIfMissingCommands("mkisofs", "losetup")
+'''
+def skipIfMissingCommands(*args):
+ has_cmds = True
+ for cmd in args:
+ if not which(cmd):
+ has_cmds = False
+ break
+
+ return skipUnless(has_cmds, 'required command(s) "%s" not installed' %
+ ", ".join(args))
+
+'''
+Decorator to skip execution of a test if the current
+host operating system does match one of the prohibited
+ones.
+Example
+
+ @skipIfOperatingSystem("Linux", "Darwin")
+'''
+def skipIfOperatingSystem(*args):
+ return skipIf(platform.system() in args,
+ 'running on an OS (%s) that is not able to run this test' %
+ ", ".join(args))
+
+'''
+Decorator to skip execution of a test if the current
+host machine does not match one of the permitted
+machines.
+Example
+
+ @skipIfNotMachine("x86_64", "aarch64")
+'''
+def skipIfNotMachine(*args):
+ return skipUnless(platform.machine() in args,
+ 'not running on one of the required machine(s) "%s"' %
+ ", ".join(args))
+
+'''
+Decorator to skip execution of flaky tests, unless
+the $QEMU_TEST_FLAKY_TESTS environment variable is set.
+A bug URL must be provided that documents the observed
+failure behaviour, so it can be tracked & re-evaluated
+in future.
+
+Historical tests may be providing "None" as the bug_url
+but this should not be done for new test.
+
+Example:
+
+ @skipFlakyTest("https://gitlab.com/qemu-project/qemu/-/issues/NNN")
+'''
+def skipFlakyTest(bug_url):
+ if bug_url is None:
+ bug_url = "FIXME: reproduce flaky test and file bug report or remove"
+ return skipUnless(os.getenv('QEMU_TEST_FLAKY_TESTS'),
+ f'Test is unstable: {bug_url}')
+
+'''
+Decorator to skip execution of tests which are likely
+to execute untrusted commands on the host, or commands
+which process untrusted code, unless the
+$QEMU_TEST_ALLOW_UNTRUSTED_CODE env var is set.
+Example:
+
+ @skipUntrustedTest()
+'''
+def skipUntrustedTest():
+ return skipUnless(os.getenv('QEMU_TEST_ALLOW_UNTRUSTED_CODE'),
+ 'Test runs untrusted code / processes untrusted data')
+
+'''
+Decorator to skip execution of tests which need large
+data storage (over around 500MB-1GB mark) on the host,
+unless the $QEMU_TEST_ALLOW_LARGE_STORAGE environment
+variable is set
+
+Example:
+
+ @skipBigDataTest()
+'''
+def skipBigDataTest():
+ return skipUnless(os.getenv('QEMU_TEST_ALLOW_LARGE_STORAGE'),
+ 'Test requires large host storage space')
+
+'''
+Decorator to skip execution of tests which have a really long
+runtime (and might e.g. time out if QEMU has been compiled with
+debugging enabled) unless the $QEMU_TEST_ALLOW_SLOW
+environment variable is set
+
+Example:
+
+ @skipSlowTest()
+'''
+def skipSlowTest():
+ return skipUnless(os.getenv('QEMU_TEST_ALLOW_SLOW'),
+ 'Test has a very long runtime and might time out')
+
+'''
+Decorator to skip execution of a test if the list
+of python imports is not available.
+Example:
+
+ @skipIfMissingImports("numpy", "cv2")
+'''
+def skipIfMissingImports(*args):
+ has_imports = True
+ for impname in args:
+ try:
+ importlib.import_module(impname)
+ except ImportError:
+ has_imports = False
+ break
+
+ return skipUnless(has_imports, 'required import(s) "%s" not installed' %
+ ", ".join(args))
+
+'''
+Decorator to skip execution of a test if the system's
+locked memory limit is below the required threshold.
+Takes required locked memory threshold in kB.
+Example:
+
+ @skipLockedMemoryTest(2_097_152)
+'''
+def skipLockedMemoryTest(locked_memory):
+ # get memlock hard limit in bytes
+ _, ulimit_memory = resource.getrlimit(resource.RLIMIT_MEMLOCK)
+
+ return skipUnless(
+ ulimit_memory == resource.RLIM_INFINITY or ulimit_memory >= locked_memory * 1024,
+ f'Test required {locked_memory} kB of available locked memory',
+ )
diff --git a/tests/functional/qemu_test/linuxkernel.py b/tests/functional/qemu_test/linuxkernel.py
new file mode 100644
index 0000000..2aca0ee
--- /dev/null
+++ b/tests/functional/qemu_test/linuxkernel.py
@@ -0,0 +1,52 @@
+# Test class for testing the boot process of a Linux kernel
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import hashlib
+import urllib.request
+
+from .cmd import wait_for_console_pattern, exec_command_and_wait_for_pattern
+from .testcase import QemuSystemTest
+from .utils import get_usernet_hostfwd_port
+
+
+class LinuxKernelTest(QemuSystemTest):
+ KERNEL_COMMON_COMMAND_LINE = 'printk.time=0 '
+
+ def wait_for_console_pattern(self, success_message, vm=None):
+ wait_for_console_pattern(self, success_message,
+ failure_message='Kernel panic - not syncing',
+ vm=vm)
+
+ def launch_kernel(self, kernel, initrd=None, dtb=None, console_index=0,
+ wait_for=None):
+ self.vm.set_console(console_index=console_index)
+ self.vm.add_args('-kernel', kernel)
+ if initrd:
+ self.vm.add_args('-initrd', initrd)
+ if dtb:
+ self.vm.add_args('-dtb', dtb)
+ self.vm.launch()
+ if wait_for:
+ self.wait_for_console_pattern(wait_for)
+
+ def check_http_download(self, filename, hashsum, guestport=8080,
+ pythoncmd='python3 -m http.server'):
+ exec_command_and_wait_for_pattern(self,
+ f'{pythoncmd} {guestport} & sleep 1',
+ f'Serving HTTP on 0.0.0.0 port {guestport}')
+ hl = hashlib.sha256()
+ hostport = get_usernet_hostfwd_port(self.vm)
+ url = f'http://localhost:{hostport}{filename}'
+ self.log.info(f'Downloading {url} ...')
+ with urllib.request.urlopen(url) as response:
+ while True:
+ chunk = response.read(1 << 20)
+ if not chunk:
+ break
+ hl.update(chunk)
+
+ digest = hl.hexdigest()
+ self.log.info(f'sha256sum of download is {digest}.')
+ self.assertEqual(digest, hashsum)
diff --git a/tests/functional/qemu_test/ports.py b/tests/functional/qemu_test/ports.py
new file mode 100644
index 0000000..631b77a
--- /dev/null
+++ b/tests/functional/qemu_test/ports.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python3
+#
+# Simple functional tests for VNC functionality
+#
+# Copyright 2018, 2024 Red Hat, Inc.
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import fcntl
+import os
+import socket
+
+from .config import BUILD_DIR
+from typing import List
+
+
+class Ports():
+
+ PORTS_ADDR = '127.0.0.1'
+ PORTS_RANGE_SIZE = 1024
+ PORTS_START = 49152 + ((os.getpid() * PORTS_RANGE_SIZE) % 16384)
+ PORTS_END = PORTS_START + PORTS_RANGE_SIZE
+
+ def __enter__(self):
+ lock_file = os.path.join(BUILD_DIR, "tests", "functional", "port_lock")
+ self.lock_fh = os.open(lock_file, os.O_CREAT)
+ fcntl.flock(self.lock_fh, fcntl.LOCK_EX)
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ fcntl.flock(self.lock_fh, fcntl.LOCK_UN)
+ os.close(self.lock_fh)
+
+ def check_bind(self, port: int) -> bool:
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+ try:
+ sock.bind((self.PORTS_ADDR, port))
+ except OSError:
+ return False
+
+ return True
+
+ def find_free_ports(self, count: int) -> List[int]:
+ result = []
+ for port in range(self.PORTS_START, self.PORTS_END):
+ if self.check_bind(port):
+ result.append(port)
+ if len(result) >= count:
+ break
+ assert len(result) == count
+ return result
+
+ def find_free_port(self) -> int:
+ return self.find_free_ports(1)[0]
diff --git a/tests/functional/qemu_test/tesseract.py b/tests/functional/qemu_test/tesseract.py
index c4087b7..ede6c65 100644
--- a/tests/functional/qemu_test/tesseract.py
+++ b/tests/functional/qemu_test/tesseract.py
@@ -5,29 +5,19 @@
# This work is licensed under the terms of the GNU GPL, version 2 or
# later. See the COPYING file in the top-level directory.
-import re
import logging
+from subprocess import run
-from . import has_cmd, run_cmd
-
-def tesseract_available(expected_version):
- if not has_cmd('tesseract'):
- return False
- (stdout, stderr, ret) = run_cmd([ 'tesseract', '--version'])
- if ret:
- return False
- version = stdout.split()[1]
- return int(version.split('.')[0]) >= expected_version
def tesseract_ocr(image_path, tesseract_args=''):
console_logger = logging.getLogger('console')
console_logger.debug(image_path)
- (stdout, stderr, ret) = run_cmd(['tesseract', image_path,
- 'stdout'])
- if ret:
+ proc = run(['tesseract', image_path, 'stdout'],
+ capture_output=True, encoding='utf8')
+ if proc.returncode:
return None
lines = []
- for line in stdout.split('\n'):
+ for line in proc.stdout.split('\n'):
sline = line.strip()
if len(sline):
console_logger.debug(sline)
diff --git a/tests/functional/qemu_test/testcase.py b/tests/functional/qemu_test/testcase.py
index aa01462..2082c6f 100644
--- a/tests/functional/qemu_test/testcase.py
+++ b/tests/functional/qemu_test/testcase.py
@@ -13,49 +13,224 @@
import logging
import os
-import subprocess
+from pathlib import Path
import pycotap
+import shutil
+from subprocess import run
import sys
+import tempfile
import unittest
import uuid
from qemu.machine import QEMUMachine
-from qemu.utils import kvm_available, tcg_available
+from qemu.utils import hvf_available, kvm_available, tcg_available
+from .archive import archive_extract
from .asset import Asset
-from .cmd import run_cmd
-from .config import BUILD_DIR
+from .config import BUILD_DIR, dso_suffix
+from .uncompress import uncompress
class QemuBaseTest(unittest.TestCase):
- qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
- arch = None
-
- workdir = None
- log = None
- logdir = None
+ '''
+ @params compressed: filename, Asset, or file-like object to uncompress
+ @params format: optional compression format (gzip, lzma)
+
+ Uncompresses @compressed into the scratch directory.
+
+ If @format is None, heuristics will be applied to guess the format
+ from the filename or Asset URL. @format must be non-None if @uncompressed
+ is a file-like object.
+
+ Returns the fully qualified path to the uncompressed file
+ '''
+ def uncompress(self, compressed, format=None):
+ self.log.debug(f"Uncompress {compressed} format={format}")
+ if type(compressed) == Asset:
+ compressed.fetch()
+
+ (name, ext) = os.path.splitext(str(compressed))
+ uncompressed = self.scratch_file(os.path.basename(name))
+
+ uncompress(compressed, uncompressed, format)
+
+ return uncompressed
+
+ '''
+ @params archive: filename, Asset, or file-like object to extract
+ @params format: optional archive format (tar, zip, deb, cpio)
+ @params sub_dir: optional sub-directory to extract into
+ @params member: optional member file to limit extraction to
+
+ Extracts @archive into the scratch directory, or a directory beneath
+ named by @sub_dir. All files are extracted unless @member specifies
+ a limit.
+
+ If @format is None, heuristics will be applied to guess the format
+ from the filename or Asset URL. @format must be non-None if @archive
+ is a file-like object.
+
+ If @member is non-None, returns the fully qualified path to @member
+ '''
+ def archive_extract(self, archive, format=None, sub_dir=None, member=None):
+ self.log.debug(f"Extract {archive} format={format}" +
+ f"sub_dir={sub_dir} member={member}")
+ if type(archive) == Asset:
+ archive.fetch()
+ if sub_dir is None:
+ archive_extract(archive, self.scratch_file(), format, member)
+ else:
+ archive_extract(archive, self.scratch_file(sub_dir),
+ format, member)
+
+ if member is not None:
+ return self.scratch_file(member)
+ return None
+
+ '''
+ Create a temporary directory suitable for storing UNIX
+ socket paths.
+
+ Returns: a tempfile.TemporaryDirectory instance
+ '''
+ def socket_dir(self):
+ if self.socketdir is None:
+ self.socketdir = tempfile.TemporaryDirectory(
+ prefix="qemu_func_test_sock_")
+ return self.socketdir
+
+ '''
+ @params args list of zero or more subdirectories or file
+
+ Construct a path for accessing a data file located
+ relative to the source directory that is the root for
+ functional tests.
+
+ @args may be an empty list to reference the root dir
+ itself, may be a single element to reference a file in
+ the root directory, or may be multiple elements to
+ reference a file nested below. The path components
+ will be joined using the platform appropriate path
+ separator.
+
+ Returns: string representing a file path
+ '''
+ def data_file(self, *args):
+ return str(Path(Path(__file__).parent.parent, *args))
+
+ '''
+ @params args list of zero or more subdirectories or file
+
+ Construct a path for accessing a data file located
+ relative to the build directory root.
+
+ @args may be an empty list to reference the build dir
+ itself, may be a single element to reference a file in
+ the build directory, or may be multiple elements to
+ reference a file nested below. The path components
+ will be joined using the platform appropriate path
+ separator.
+
+ Returns: string representing a file path
+ '''
+ def build_file(self, *args):
+ return str(Path(BUILD_DIR, *args))
+
+ '''
+ @params args list of zero or more subdirectories or file
+
+ Construct a path for accessing/creating a scratch file
+ located relative to a temporary directory dedicated to
+ this test case. The directory and its contents will be
+ purged upon completion of the test.
+
+ @args may be an empty list to reference the scratch dir
+ itself, may be a single element to reference a file in
+ the scratch directory, or may be multiple elements to
+ reference a file nested below. The path components
+ will be joined using the platform appropriate path
+ separator.
+
+ Returns: string representing a file path
+ '''
+ def scratch_file(self, *args):
+ return str(Path(self.workdir, *args))
+
+ '''
+ @params args list of zero or more subdirectories or file
+
+ Construct a path for accessing/creating a log file
+ located relative to a temporary directory dedicated to
+ this test case. The directory and its log files will be
+ preserved upon completion of the test.
+
+ @args may be an empty list to reference the log dir
+ itself, may be a single element to reference a file in
+ the log directory, or may be multiple elements to
+ reference a file nested below. The path components
+ will be joined using the platform appropriate path
+ separator.
+
+ Returns: string representing a file path
+ '''
+ def log_file(self, *args):
+ return str(Path(self.outputdir, *args))
+
+ '''
+ @params plugin name
+
+ Return the full path to the plugin taking into account any host OS
+ specific suffixes.
+ '''
+ def plugin_file(self, plugin_name):
+ sfx = dso_suffix()
+ return os.path.join('tests', 'tcg', 'plugins', f'{plugin_name}.{sfx}')
+
+ def assets_available(self):
+ for name, asset in vars(self.__class__).items():
+ if name.startswith("ASSET_") and type(asset) == Asset:
+ if not asset.available():
+ self.log.debug(f"Asset {asset.url} not available")
+ return False
+ return True
- def setUp(self, bin_prefix):
+ def setUp(self):
+ self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
self.arch = self.qemu_bin.split('-')[-1]
+ self.socketdir = None
- self.workdir = os.path.join(BUILD_DIR, 'tests/functional', self.arch,
- self.id())
+ self.outputdir = self.build_file('tests', 'functional',
+ self.arch, self.id())
+ self.workdir = os.path.join(self.outputdir, 'scratch')
os.makedirs(self.workdir, exist_ok=True)
- self.logdir = self.workdir
+ self.log_filename = self.log_file('base.log')
self.log = logging.getLogger('qemu-test')
self.log.setLevel(logging.DEBUG)
- self._log_fh = logging.FileHandler(os.path.join(self.logdir,
- 'base.log'), mode='w')
+ self._log_fh = logging.FileHandler(self.log_filename, mode='w')
self._log_fh.setLevel(logging.DEBUG)
fileFormatter = logging.Formatter(
'%(asctime)s - %(levelname)s: %(message)s')
self._log_fh.setFormatter(fileFormatter)
self.log.addHandler(self._log_fh)
+ # Capture QEMUMachine logging
+ self.machinelog = logging.getLogger('qemu.machine')
+ self.machinelog.setLevel(logging.DEBUG)
+ self.machinelog.addHandler(self._log_fh)
+
+ if not self.assets_available():
+ self.skipTest('One or more assets is not available')
+
def tearDown(self):
+ if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
+ shutil.rmtree(self.workdir)
+ if self.socketdir is not None:
+ shutil.rmtree(self.socketdir.name)
+ self.socketdir = None
+ self.machinelog.removeHandler(self._log_fh)
self.log.removeHandler(self._log_fh)
def main():
@@ -68,24 +243,33 @@ class QemuBaseTest(unittest.TestCase):
tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError,
test_output_log = pycotap.LogMode.LogToError)
- unittest.main(module = None, testRunner = tr, argv=["__dummy__", path])
+ res = unittest.main(module = None, testRunner = tr, exit = False,
+ argv=["__dummy__", path])
+ for (test, message) in res.result.errors + res.result.failures:
+
+ if hasattr(test, "log_filename"):
+ print('More information on ' + test.id() + ' could be found here:'
+ '\n %s' % test.log_filename, file=sys.stderr)
+ if hasattr(test, 'console_log_name'):
+ print(' %s' % test.console_log_name, file=sys.stderr)
+ sys.exit(not res.result.wasSuccessful())
class QemuUserTest(QemuBaseTest):
def setUp(self):
- super().setUp('qemu-')
+ super().setUp()
self._ldpath = []
def add_ldpath(self, ldpath):
self._ldpath.append(os.path.abspath(ldpath))
def run_cmd(self, bin_path, args=[]):
- return subprocess.run([self.qemu_bin]
- + ["-L %s" % ldpath for ldpath in self._ldpath]
- + [bin_path]
- + args,
- text=True, capture_output=True)
+ return run([self.qemu_bin]
+ + ["-L %s" % ldpath for ldpath in self._ldpath]
+ + [bin_path]
+ + args,
+ text=True, capture_output=True)
class QemuSystemTest(QemuBaseTest):
"""Facilitates system emulation tests."""
@@ -97,12 +281,13 @@ class QemuSystemTest(QemuBaseTest):
def setUp(self):
self._vms = {}
- super().setUp('qemu-system-')
+ super().setUp()
console_log = logging.getLogger('console')
console_log.setLevel(logging.DEBUG)
- self._console_log_fh = logging.FileHandler(os.path.join(self.workdir,
- 'console.log'), mode='w')
+ self.console_log_name = self.log_file('console.log')
+ self._console_log_fh = logging.FileHandler(self.console_log_name,
+ mode='w')
self._console_log_fh.setLevel(logging.DEBUG)
fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
self._console_log_fh.setFormatter(fileFormatter)
@@ -111,7 +296,9 @@ class QemuSystemTest(QemuBaseTest):
def set_machine(self, machinename):
# TODO: We should use QMP to get the list of available machines
if not self._machinehelp:
- self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0];
+ self._machinehelp = run(
+ [self.qemu_bin, '-M', 'help'],
+ capture_output=True, check=True, encoding='utf8').stdout
if self._machinehelp.find(machinename) < 0:
self.skipTest('no support for machine ' + machinename)
self.machine = machinename
@@ -130,7 +317,9 @@ class QemuSystemTest(QemuBaseTest):
:type accelerator: str
"""
checker = {'tcg': tcg_available,
- 'kvm': kvm_available}.get(accelerator)
+ 'kvm': kvm_available,
+ 'hvf': hvf_available,
+ }.get(accelerator)
if checker is None:
self.skipTest("Don't know how to check for the presence "
"of accelerator %s" % accelerator)
@@ -139,22 +328,33 @@ class QemuSystemTest(QemuBaseTest):
"available" % accelerator)
def require_netdev(self, netdevname):
- netdevhelp = run_cmd([self.qemu_bin,
- '-M', 'none', '-netdev', 'help'])[0];
- if netdevhelp.find('\n' + netdevname + '\n') < 0:
+ help = run([self.qemu_bin,
+ '-M', 'none', '-netdev', 'help'],
+ capture_output=True, check=True, encoding='utf8').stdout;
+ if help.find('\n' + netdevname + '\n') < 0:
self.skipTest('no support for " + netdevname + " networking')
def require_device(self, devicename):
- devhelp = run_cmd([self.qemu_bin,
- '-M', 'none', '-device', 'help'])[0];
- if devhelp.find(devicename) < 0:
+ help = run([self.qemu_bin,
+ '-M', 'none', '-device', 'help'],
+ capture_output=True, check=True, encoding='utf8').stdout;
+ if help.find(devicename) < 0:
self.skipTest('no support for device ' + devicename)
def _new_vm(self, name, *args):
- vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir)
+ vm = QEMUMachine(self.qemu_bin,
+ name=name,
+ base_temp_dir=self.workdir,
+ log_dir=self.log_file())
self.log.debug('QEMUMachine "%s" created', name)
self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
- self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
+
+ sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
+ if sockpath is not None:
+ vm.add_args("-chardev",
+ f"socket,id=backdoor,path={sockpath},server=on,wait=off",
+ "-mon", "chardev=backdoor,mode=control")
+
if args:
vm.add_args(*args)
return vm
diff --git a/tests/functional/qemu_test/tuxruntest.py b/tests/functional/qemu_test/tuxruntest.py
new file mode 100644
index 0000000..6c442ff
--- /dev/null
+++ b/tests/functional/qemu_test/tuxruntest.py
@@ -0,0 +1,136 @@
+# Functional test that boots known good tuxboot images the same way
+# that tuxrun (www.tuxrun.org) does. This tool is used by things like
+# the LKFT project to run regression tests on kernels.
+#
+# Copyright (c) 2023 Linaro Ltd.
+#
+# Author:
+# Alex Bennée <alex.bennee@linaro.org>
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+
+import os
+
+from qemu_test import QemuSystemTest
+from qemu_test import exec_command_and_wait_for_pattern
+from qemu_test import wait_for_console_pattern
+from qemu_test import which, get_qemu_img
+
+class TuxRunBaselineTest(QemuSystemTest):
+
+ KERNEL_COMMON_COMMAND_LINE = 'printk.time=0'
+ # Tests are ~10-40s, allow for --debug/--enable-gcov overhead
+ timeout = 100
+
+ def setUp(self):
+ super().setUp()
+
+ # We need zstd for all the tuxrun tests
+ if which('zstd') is None:
+ self.skipTest("zstd not found in $PATH")
+
+ # Pre-init TuxRun specific settings: Most machines work with
+ # reasonable defaults but we sometimes need to tweak the
+ # config. To avoid open coding everything we store all these
+ # details in the metadata for each test.
+
+ # The tuxboot tag matches the root directory
+ self.tuxboot = self.arch
+
+ # Most Linux's use ttyS0 for their serial port
+ self.console = "ttyS0"
+
+ # Does the machine shutdown QEMU nicely on "halt"
+ self.wait_for_shutdown = True
+
+ self.root = "vda"
+
+ # Occasionally we need extra devices to hook things up
+ self.extradev = None
+
+ self.qemu_img = get_qemu_img(self)
+
+ def wait_for_console_pattern(self, success_message, vm=None):
+ wait_for_console_pattern(self, success_message,
+ failure_message='Kernel panic - not syncing',
+ vm=vm)
+
+ def fetch_tuxrun_assets(self, kernel_asset, rootfs_asset, dtb_asset=None):
+ """
+ Fetch the TuxBoot assets.
+ """
+ kernel_image = kernel_asset.fetch()
+ disk_image = self.uncompress(rootfs_asset)
+ dtb = dtb_asset.fetch() if dtb_asset is not None else None
+
+ return (kernel_image, disk_image, dtb)
+
+ def prepare_run(self, kernel, disk, drive, dtb=None, console_index=0):
+ """
+ Setup to run and add the common parameters to the system
+ """
+ self.vm.set_console(console_index=console_index)
+
+ # all block devices are raw ext4's
+ blockdev = "driver=raw,file.driver=file," \
+ + f"file.filename={disk},node-name=hd0"
+
+ self.kcmd_line = self.KERNEL_COMMON_COMMAND_LINE
+ self.kcmd_line += f" root=/dev/{self.root}"
+ self.kcmd_line += f" console={self.console}"
+
+ self.vm.add_args('-kernel', kernel,
+ '-append', self.kcmd_line,
+ '-blockdev', blockdev)
+
+ # Sometimes we need extra devices attached
+ if self.extradev:
+ self.vm.add_args('-device', self.extradev)
+
+ self.vm.add_args('-device',
+ f"{drive},drive=hd0")
+
+ # Some machines need an explicit DTB
+ if dtb:
+ self.vm.add_args('-dtb', dtb)
+
+ def run_tuxtest_tests(self, haltmsg):
+ """
+ Wait for the system to boot up, wait for the login prompt and
+ then do a few things on the console. Trigger a shutdown and
+ wait to exit cleanly.
+ """
+ ps1='root@tuxtest:~#'
+ self.wait_for_console_pattern(self.kcmd_line)
+ self.wait_for_console_pattern('tuxtest login:')
+ exec_command_and_wait_for_pattern(self, 'root', ps1)
+ exec_command_and_wait_for_pattern(self, 'cat /proc/interrupts', ps1)
+ exec_command_and_wait_for_pattern(self, 'cat /proc/self/maps', ps1)
+ exec_command_and_wait_for_pattern(self, 'uname -a', ps1)
+ exec_command_and_wait_for_pattern(self, 'halt', haltmsg)
+
+ # Wait for VM to shut down gracefully if it can
+ if self.wait_for_shutdown:
+ self.vm.wait()
+ else:
+ self.vm.shutdown()
+
+ def common_tuxrun(self,
+ kernel_asset,
+ rootfs_asset,
+ dtb_asset=None,
+ drive="virtio-blk-device",
+ haltmsg="reboot: System halted",
+ console_index=0):
+ """
+ Common path for LKFT tests. Unless we need to do something
+ special with the command line we can process most things using
+ the tag metadata.
+ """
+ (kernel, disk, dtb) = self.fetch_tuxrun_assets(kernel_asset, rootfs_asset,
+ dtb_asset)
+
+ self.prepare_run(kernel, disk, drive, dtb, console_index)
+ self.vm.launch()
+ self.run_tuxtest_tests(haltmsg)
+ os.remove(disk)
diff --git a/tests/functional/qemu_test/uncompress.py b/tests/functional/qemu_test/uncompress.py
new file mode 100644
index 0000000..b7ef8f7
--- /dev/null
+++ b/tests/functional/qemu_test/uncompress.py
@@ -0,0 +1,107 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Utilities for python-based QEMU tests
+#
+# Copyright 2024 Red Hat, Inc.
+#
+# Authors:
+# Thomas Huth <thuth@redhat.com>
+
+import gzip
+import lzma
+import os
+import stat
+import shutil
+from urllib.parse import urlparse
+from subprocess import run, CalledProcessError
+
+from .asset import Asset
+
+
+def gzip_uncompress(gz_path, output_path):
+ if os.path.exists(output_path):
+ return
+ with gzip.open(gz_path, 'rb') as gz_in:
+ try:
+ with open(output_path, 'wb') as raw_out:
+ shutil.copyfileobj(gz_in, raw_out)
+ except:
+ os.remove(output_path)
+ raise
+
+def lzma_uncompress(xz_path, output_path):
+ if os.path.exists(output_path):
+ return
+ with lzma.open(xz_path, 'rb') as lzma_in:
+ try:
+ with open(output_path, 'wb') as raw_out:
+ shutil.copyfileobj(lzma_in, raw_out)
+ except:
+ os.remove(output_path)
+ raise
+
+
+def zstd_uncompress(zstd_path, output_path):
+ if os.path.exists(output_path):
+ return
+
+ try:
+ run(['zstd', "-f", "-d", zstd_path,
+ "-o", output_path], capture_output=True, check=True)
+ except CalledProcessError as e:
+ os.remove(output_path)
+ raise Exception(
+ f"Unable to decompress zstd file {zstd_path} with {e}") from e
+
+ # zstd copies source archive permissions for the output
+ # file, so must make this writable for QEMU
+ os.chmod(output_path, stat.S_IRUSR | stat.S_IWUSR)
+
+
+'''
+@params compressed: filename, Asset, or file-like object to uncompress
+@params uncompressed: filename to uncompress into
+@params format: optional compression format (gzip, lzma)
+
+Uncompresses @compressed into @uncompressed
+
+If @format is None, heuristics will be applied to guess the format
+from the filename or Asset URL. @format must be non-None if @uncompressed
+is a file-like object.
+
+Returns the fully qualified path to the uncompessed file
+'''
+def uncompress(compressed, uncompressed, format=None):
+ if format is None:
+ format = guess_uncompress_format(compressed)
+
+ if format == "xz":
+ lzma_uncompress(str(compressed), uncompressed)
+ elif format == "gz":
+ gzip_uncompress(str(compressed), uncompressed)
+ elif format == "zstd":
+ zstd_uncompress(str(compressed), uncompressed)
+ else:
+ raise Exception(f"Unknown compression format {format}")
+
+'''
+@params compressed: filename, Asset, or file-like object to guess
+
+Guess the format of @compressed, raising an exception if
+no format can be determined
+'''
+def guess_uncompress_format(compressed):
+ if type(compressed) == Asset:
+ compressed = urlparse(compressed.url).path
+ elif type(compressed) != str:
+ raise Exception(f"Unable to guess compression cformat for {compressed}")
+
+ (name, ext) = os.path.splitext(compressed)
+ if ext == ".xz":
+ return "xz"
+ elif ext == ".gz":
+ return "gz"
+ elif ext in [".zstd", ".zst"]:
+ return 'zstd'
+ else:
+ raise Exception(f"Unknown compression format for {compressed}")
diff --git a/tests/functional/qemu_test/utils.py b/tests/functional/qemu_test/utils.py
index 2a1cb60..e7c8de8 100644
--- a/tests/functional/qemu_test/utils.py
+++ b/tests/functional/qemu_test/utils.py
@@ -8,49 +8,32 @@
# This work is licensed under the terms of the GNU GPL, version 2 or
# later. See the COPYING file in the top-level directory.
-import gzip
-import lzma
import os
-import shutil
-import subprocess
-import tarfile
-def archive_extract(archive, dest_dir, member=None):
- with tarfile.open(archive) as tf:
- if hasattr(tarfile, 'data_filter'):
- tf.extraction_filter = getattr(tarfile, 'data_filter',
- (lambda member, path: member))
- if member:
- tf.extract(member=member, path=dest_dir)
- else:
- tf.extractall(path=dest_dir)
+from qemu.utils import get_info_usernet_hostfwd_port
-def gzip_uncompress(gz_path, output_path):
- if os.path.exists(output_path):
- return
- with gzip.open(gz_path, 'rb') as gz_in:
- try:
- with open(output_path, 'wb') as raw_out:
- shutil.copyfileobj(gz_in, raw_out)
- except:
- os.remove(output_path)
- raise
-def lzma_uncompress(xz_path, output_path):
- if os.path.exists(output_path):
- return
- with lzma.open(xz_path, 'rb') as lzma_in:
- try:
- with open(output_path, 'wb') as raw_out:
- shutil.copyfileobj(lzma_in, raw_out)
- except:
- os.remove(output_path)
- raise
+def get_usernet_hostfwd_port(vm):
+ res = vm.cmd('human-monitor-command', command_line='info usernet')
+ return get_info_usernet_hostfwd_port(res)
-def cpio_extract(cpio_handle, output_path):
- cwd = os.getcwd()
- os.chdir(output_path)
- subprocess.run(['cpio', '-i'],
- input=cpio_handle.read(),
- stderr=subprocess.DEVNULL)
- os.chdir(cwd)
+"""
+Round up to next power of 2
+"""
+def pow2ceil(x):
+ return 1 if x == 0 else 2**(x - 1).bit_length()
+
+def file_truncate(path, size):
+ if size != os.path.getsize(path):
+ with open(path, 'ab+') as fd:
+ fd.truncate(size)
+
+"""
+Expand file size to next power of 2
+"""
+def image_pow2ceil_expand(path):
+ size = os.path.getsize(path)
+ size_aligned = pow2ceil(size)
+ if size != size_aligned:
+ with open(path, 'ab+') as fd:
+ fd.truncate(size_aligned)