aboutsummaryrefslogtreecommitdiff
path: root/tests/functional/qemu_test
diff options
context:
space:
mode:
Diffstat (limited to 'tests/functional/qemu_test')
-rw-r--r--tests/functional/qemu_test/__init__.py20
-rw-r--r--tests/functional/qemu_test/archive.py117
-rw-r--r--tests/functional/qemu_test/asset.py230
-rw-r--r--tests/functional/qemu_test/cmd.py202
-rw-r--r--tests/functional/qemu_test/config.py48
-rw-r--r--tests/functional/qemu_test/decorators.py151
-rw-r--r--tests/functional/qemu_test/linuxkernel.py52
-rw-r--r--tests/functional/qemu_test/ports.py55
-rw-r--r--tests/functional/qemu_test/tesseract.py25
-rw-r--r--tests/functional/qemu_test/testcase.py402
-rw-r--r--tests/functional/qemu_test/tuxruntest.py136
-rw-r--r--tests/functional/qemu_test/uncompress.py107
-rw-r--r--tests/functional/qemu_test/utils.py39
13 files changed, 1584 insertions, 0 deletions
diff --git a/tests/functional/qemu_test/__init__.py b/tests/functional/qemu_test/__init__.py
new file mode 100644
index 0000000..6e666a0
--- /dev/null
+++ b/tests/functional/qemu_test/__init__.py
@@ -0,0 +1,20 @@
+# Test class and utilities for functional tests
+#
+# Copyright 2024 Red Hat, Inc.
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+
+from .asset import Asset
+from .config import BUILD_DIR, dso_suffix
+from .cmd import is_readable_executable_file, \
+ interrupt_interactive_console_until_pattern, wait_for_console_pattern, \
+ exec_command, exec_command_and_wait_for_pattern, get_qemu_img, which
+from .testcase import QemuBaseTest, QemuUserTest, QemuSystemTest
+from .linuxkernel import LinuxKernelTest
+from .decorators import skipIfMissingCommands, skipIfNotMachine, \
+ skipFlakyTest, skipUntrustedTest, skipBigDataTest, skipSlowTest, \
+ skipIfMissingImports, skipIfOperatingSystem, skipLockedMemoryTest
+from .archive import archive_extract
+from .uncompress import uncompress
diff --git a/tests/functional/qemu_test/archive.py b/tests/functional/qemu_test/archive.py
new file mode 100644
index 0000000..c803fda
--- /dev/null
+++ b/tests/functional/qemu_test/archive.py
@@ -0,0 +1,117 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Utilities for python-based QEMU tests
+#
+# Copyright 2024 Red Hat, Inc.
+#
+# Authors:
+# Thomas Huth <thuth@redhat.com>
+
+import os
+from subprocess import check_call, run, DEVNULL
+import tarfile
+from urllib.parse import urlparse
+import zipfile
+
+from .asset import Asset
+
+
+def tar_extract(archive, dest_dir, member=None):
+ with tarfile.open(archive) as tf:
+ if hasattr(tarfile, 'data_filter'):
+ tf.extraction_filter = getattr(tarfile, 'data_filter',
+ (lambda member, path: member))
+ if member:
+ tf.extract(member=member, path=dest_dir)
+ else:
+ tf.extractall(path=dest_dir)
+
+def cpio_extract(archive, output_path):
+ cwd = os.getcwd()
+ os.chdir(output_path)
+ # Not passing 'check=True' as cpio exits with non-zero
+ # status if the archive contains any device nodes :-(
+ if type(archive) == str:
+ run(['cpio', '-i', '-F', archive],
+ stdout=DEVNULL, stderr=DEVNULL)
+ else:
+ run(['cpio', '-i'],
+ input=archive.read(),
+ stdout=DEVNULL, stderr=DEVNULL)
+ os.chdir(cwd)
+
+def zip_extract(archive, dest_dir, member=None):
+ with zipfile.ZipFile(archive, 'r') as zf:
+ if member:
+ zf.extract(member=member, path=dest_dir)
+ else:
+ zf.extractall(path=dest_dir)
+
+def deb_extract(archive, dest_dir, member=None):
+ cwd = os.getcwd()
+ os.chdir(dest_dir)
+ try:
+ proc = run(['ar', 't', archive],
+ check=True, capture_output=True, encoding='utf8')
+ file_path = proc.stdout.split()[2]
+ check_call(['ar', 'x', archive, file_path],
+ stdout=DEVNULL, stderr=DEVNULL)
+ tar_extract(file_path, dest_dir, member)
+ finally:
+ os.chdir(cwd)
+
+'''
+@params archive: filename, Asset, or file-like object to extract
+@params dest_dir: target directory to extract into
+@params member: optional member file to limit extraction to
+
+Extracts @archive into @dest_dir. All files are extracted
+unless @member specifies a limit.
+
+If @format is None, heuristics will be applied to guess the format
+from the filename or Asset URL. @format must be non-None if @archive
+is a file-like object.
+'''
+def archive_extract(archive, dest_dir, format=None, member=None):
+ if format is None:
+ format = guess_archive_format(archive)
+ if type(archive) == Asset:
+ archive = str(archive)
+
+ if format == "tar":
+ tar_extract(archive, dest_dir, member)
+ elif format == "zip":
+ zip_extract(archive, dest_dir, member)
+ elif format == "cpio":
+ if member is not None:
+ raise Exception("Unable to filter cpio extraction")
+ cpio_extract(archive, dest_dir)
+ elif format == "deb":
+ if type(archive) != str:
+ raise Exception("Unable to use file-like object with deb archives")
+ deb_extract(archive, dest_dir, "./" + member)
+ else:
+ raise Exception(f"Unknown archive format {format}")
+
+'''
+@params archive: filename, or Asset to guess
+
+Guess the format of @compressed, raising an exception if
+no format can be determined
+'''
+def guess_archive_format(archive):
+ if type(archive) == Asset:
+ archive = urlparse(archive.url).path
+ elif type(archive) != str:
+ raise Exception(f"Unable to guess archive format for {archive}")
+
+ if ".tar." in archive or archive.endswith("tgz"):
+ return "tar"
+ elif archive.endswith(".zip"):
+ return "zip"
+ elif archive.endswith(".cpio"):
+ return "cpio"
+ elif archive.endswith(".deb") or archive.endswith(".udeb"):
+ return "deb"
+ else:
+ raise Exception(f"Unknown archive format for {archive}")
diff --git a/tests/functional/qemu_test/asset.py b/tests/functional/qemu_test/asset.py
new file mode 100644
index 0000000..704b84d
--- /dev/null
+++ b/tests/functional/qemu_test/asset.py
@@ -0,0 +1,230 @@
+# Test utilities for fetching & caching assets
+#
+# Copyright 2024 Red Hat, Inc.
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import hashlib
+import logging
+import os
+import stat
+import sys
+import unittest
+import urllib.request
+from time import sleep
+from pathlib import Path
+from shutil import copyfileobj
+from urllib.error import HTTPError
+
+class AssetError(Exception):
+ def __init__(self, asset, msg, transient=False):
+ self.url = asset.url
+ self.msg = msg
+ self.transient = transient
+
+ def __str__(self):
+ return "%s: %s" % (self.url, self.msg)
+
+# Instances of this class must be declared as class level variables
+# starting with a name "ASSET_". This enables the pre-caching logic
+# to easily find all referenced assets and download them prior to
+# execution of the tests.
+class Asset:
+
+ def __init__(self, url, hashsum):
+ self.url = url
+ self.hash = hashsum
+ cache_dir_env = os.getenv('QEMU_TEST_CACHE_DIR')
+ if cache_dir_env:
+ self.cache_dir = Path(cache_dir_env, "download")
+ else:
+ self.cache_dir = Path(Path("~").expanduser(),
+ ".cache", "qemu", "download")
+ self.cache_file = Path(self.cache_dir, hashsum)
+ self.log = logging.getLogger('qemu-test')
+
+ def __repr__(self):
+ return "Asset: url=%s hash=%s cache=%s" % (
+ self.url, self.hash, self.cache_file)
+
+ def __str__(self):
+ return str(self.cache_file)
+
+ def _check(self, cache_file):
+ if self.hash is None:
+ return True
+ if len(self.hash) == 64:
+ hl = hashlib.sha256()
+ elif len(self.hash) == 128:
+ hl = hashlib.sha512()
+ else:
+ raise AssetError(self, "unknown hash type")
+
+ # Calculate the hash of the file:
+ with open(cache_file, 'rb') as file:
+ while True:
+ chunk = file.read(1 << 20)
+ if not chunk:
+ break
+ hl.update(chunk)
+
+ return self.hash == hl.hexdigest()
+
+ def valid(self):
+ return self.cache_file.exists() and self._check(self.cache_file)
+
+ def fetchable(self):
+ return not os.environ.get("QEMU_TEST_NO_DOWNLOAD", False)
+
+ def available(self):
+ return self.valid() or self.fetchable()
+
+ def _wait_for_other_download(self, tmp_cache_file):
+ # Another thread already seems to download the asset, so wait until
+ # it is done, while also checking the size to see whether it is stuck
+ try:
+ current_size = tmp_cache_file.stat().st_size
+ new_size = current_size
+ except:
+ if os.path.exists(self.cache_file):
+ return True
+ raise
+ waittime = lastchange = 600
+ while waittime > 0:
+ sleep(1)
+ waittime -= 1
+ try:
+ new_size = tmp_cache_file.stat().st_size
+ except:
+ if os.path.exists(self.cache_file):
+ return True
+ raise
+ if new_size != current_size:
+ lastchange = waittime
+ current_size = new_size
+ elif lastchange - waittime > 90:
+ return False
+
+ self.log.debug("Time out while waiting for %s!", tmp_cache_file)
+ raise
+
+ def fetch(self):
+ if not self.cache_dir.exists():
+ self.cache_dir.mkdir(parents=True, exist_ok=True)
+
+ if self.valid():
+ self.log.debug("Using cached asset %s for %s",
+ self.cache_file, self.url)
+ return str(self.cache_file)
+
+ if not self.fetchable():
+ raise AssetError(self,
+ "Asset cache is invalid and downloads disabled")
+
+ self.log.info("Downloading %s to %s...", self.url, self.cache_file)
+ tmp_cache_file = self.cache_file.with_suffix(".download")
+
+ for retries in range(3):
+ try:
+ with tmp_cache_file.open("xb") as dst:
+ with urllib.request.urlopen(self.url) as resp:
+ copyfileobj(resp, dst)
+ length_hdr = resp.getheader("Content-Length")
+
+ # Verify downloaded file size against length metadata, if
+ # available.
+ if length_hdr is not None:
+ length = int(length_hdr)
+ fsize = tmp_cache_file.stat().st_size
+ if fsize != length:
+ self.log.error("Unable to download %s: "
+ "connection closed before "
+ "transfer complete (%d/%d)",
+ self.url, fsize, length)
+ tmp_cache_file.unlink()
+ continue
+ break
+ except FileExistsError:
+ self.log.debug("%s already exists, "
+ "waiting for other thread to finish...",
+ tmp_cache_file)
+ if self._wait_for_other_download(tmp_cache_file):
+ return str(self.cache_file)
+ self.log.debug("%s seems to be stale, "
+ "deleting and retrying download...",
+ tmp_cache_file)
+ tmp_cache_file.unlink()
+ continue
+ except HTTPError as e:
+ tmp_cache_file.unlink()
+ self.log.error("Unable to download %s: HTTP error %d",
+ self.url, e.code)
+ # Treat 404 as fatal, since it is highly likely to
+ # indicate a broken test rather than a transient
+ # server or networking problem
+ if e.code == 404:
+ raise AssetError(self, "Unable to download: "
+ "HTTP error %d" % e.code)
+ continue
+ except Exception as e:
+ tmp_cache_file.unlink()
+ raise AssetError(self, "Unable to download: " % e)
+
+ if not os.path.exists(tmp_cache_file):
+ raise AssetError(self, "Download retries exceeded", transient=True)
+
+ try:
+ # Set these just for informational purposes
+ os.setxattr(str(tmp_cache_file), "user.qemu-asset-url",
+ self.url.encode('utf8'))
+ os.setxattr(str(tmp_cache_file), "user.qemu-asset-hash",
+ self.hash.encode('utf8'))
+ except Exception as e:
+ self.log.debug("Unable to set xattr on %s: %s", tmp_cache_file, e)
+ pass
+
+ if not self._check(tmp_cache_file):
+ tmp_cache_file.unlink()
+ raise AssetError(self, "Hash does not match %s" % self.hash)
+ tmp_cache_file.replace(self.cache_file)
+ # Remove write perms to stop tests accidentally modifying them
+ os.chmod(self.cache_file, stat.S_IRUSR | stat.S_IRGRP)
+
+ self.log.info("Cached %s at %s" % (self.url, self.cache_file))
+ return str(self.cache_file)
+
+ def precache_test(test):
+ log = logging.getLogger('qemu-test')
+ log.setLevel(logging.DEBUG)
+ handler = logging.StreamHandler(sys.stdout)
+ handler.setLevel(logging.DEBUG)
+ formatter = logging.Formatter(
+ '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ handler.setFormatter(formatter)
+ log.addHandler(handler)
+ for name, asset in vars(test.__class__).items():
+ if name.startswith("ASSET_") and type(asset) == Asset:
+ log.info("Attempting to cache '%s'" % asset)
+ try:
+ asset.fetch()
+ except AssetError as e:
+ if not e.transient:
+ raise
+ log.error("%s: skipping asset precache" % e)
+
+ log.removeHandler(handler)
+
+ def precache_suite(suite):
+ for test in suite:
+ if isinstance(test, unittest.TestSuite):
+ Asset.precache_suite(test)
+ elif isinstance(test, unittest.TestCase):
+ Asset.precache_test(test)
+
+ def precache_suites(path, cacheTstamp):
+ loader = unittest.loader.defaultTestLoader
+ tests = loader.loadTestsFromNames([path], None)
+
+ with open(cacheTstamp, "w") as fh:
+ Asset.precache_suite(tests)
diff --git a/tests/functional/qemu_test/cmd.py b/tests/functional/qemu_test/cmd.py
new file mode 100644
index 0000000..dc5f422
--- /dev/null
+++ b/tests/functional/qemu_test/cmd.py
@@ -0,0 +1,202 @@
+# Test class and utilities for functional tests
+#
+# Copyright 2018, 2024 Red Hat, Inc.
+#
+# Original Author (Avocado-based tests):
+# Cleber Rosa <crosa@redhat.com>
+#
+# Adaption for standalone version:
+# Thomas Huth <thuth@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import logging
+import os
+import os.path
+
+
+def which(tool):
+ """ looks up the full path for @tool, returns None if not found
+ or if @tool does not have executable permissions.
+ """
+ paths=os.getenv('PATH')
+ for p in paths.split(os.path.pathsep):
+ p = os.path.join(p, tool)
+ if os.access(p, os.X_OK):
+ return p
+ return None
+
+def is_readable_executable_file(path):
+ return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)
+
+# @test: functional test to fail if @failure is seen
+# @vm: the VM whose console to process
+# @success: a non-None string to look for
+# @failure: a string to look for that triggers test failure, or None
+#
+# Read up to 1 line of text from @vm, looking for @success
+# and optionally @failure.
+#
+# If @success or @failure are seen, immediately return True,
+# even if end of line is not yet seen. ie remainder of the
+# line is left unread.
+#
+# If end of line is seen, with neither @success or @failure
+# return False
+#
+# If @failure is seen, then mark @test as failed
+def _console_read_line_until_match(test, vm, success, failure):
+ msg = bytes([])
+ done = False
+ while True:
+ c = vm.console_socket.recv(1)
+ if c is None:
+ done = True
+ test.fail(
+ f"EOF in console, expected '{success}'")
+ break
+ msg += c
+
+ if success in msg:
+ done = True
+ break
+ if failure and failure in msg:
+ done = True
+ vm.console_socket.close()
+ test.fail(
+ f"'{failure}' found in console, expected '{success}'")
+
+ if c == b'\n':
+ break
+
+ console_logger = logging.getLogger('console')
+ try:
+ console_logger.debug(msg.decode().strip())
+ except:
+ console_logger.debug(msg)
+
+ return done
+
+def _console_interaction(test, success_message, failure_message,
+ send_string, keep_sending=False, vm=None):
+ assert not keep_sending or send_string
+ assert success_message or send_string
+
+ if vm is None:
+ vm = test.vm
+
+ test.log.debug(
+ f"Console interaction: success_msg='{success_message}' " +
+ f"failure_msg='{failure_message}' send_string='{send_string}'")
+
+ # We'll process console in bytes, to avoid having to
+ # deal with unicode decode errors from receiving
+ # partial utf8 byte sequences
+ success_message_b = None
+ if success_message is not None:
+ success_message_b = success_message.encode()
+
+ failure_message_b = None
+ if failure_message is not None:
+ failure_message_b = failure_message.encode()
+
+ while True:
+ if send_string:
+ vm.console_socket.sendall(send_string.encode())
+ if not keep_sending:
+ send_string = None # send only once
+
+ # Only consume console output if waiting for something
+ if success_message is None:
+ if send_string is None:
+ break
+ continue
+
+ if _console_read_line_until_match(test, vm,
+ success_message_b,
+ failure_message_b):
+ break
+
+def interrupt_interactive_console_until_pattern(test, success_message,
+ failure_message=None,
+ interrupt_string='\r'):
+ """
+ Keep sending a string to interrupt a console prompt, while logging the
+ console output. Typical use case is to break a boot loader prompt, such:
+
+ Press a key within 5 seconds to interrupt boot process.
+ 5
+ 4
+ 3
+ 2
+ 1
+ Booting default image...
+
+ :param test: a test containing a VM that will have its console
+ read and probed for a success or failure message
+ :type test: :class:`qemu_test.QemuSystemTest`
+ :param success_message: if this message appears, test succeeds
+ :param failure_message: if this message appears, test fails
+ :param interrupt_string: a string to send to the console before trying
+ to read a new line
+ """
+ assert success_message
+ _console_interaction(test, success_message, failure_message,
+ interrupt_string, True)
+
+def wait_for_console_pattern(test, success_message, failure_message=None,
+ vm=None):
+ """
+ Waits for messages to appear on the console, while logging the content
+
+ :param test: a test containing a VM that will have its console
+ read and probed for a success or failure message
+ :type test: :class:`qemu_test.QemuSystemTest`
+ :param success_message: if this message appears, test succeeds
+ :param failure_message: if this message appears, test fails
+ """
+ assert success_message
+ _console_interaction(test, success_message, failure_message, None, vm=vm)
+
+def exec_command(test, command):
+ """
+ Send a command to a console (appending CRLF characters), while logging
+ the content.
+
+ :param test: a test containing a VM.
+ :type test: :class:`qemu_test.QemuSystemTest`
+ :param command: the command to send
+ :type command: str
+ """
+ _console_interaction(test, None, None, command + '\r')
+
+def exec_command_and_wait_for_pattern(test, command,
+ success_message, failure_message=None):
+ """
+ Send a command to a console (appending CRLF characters), then wait
+ for success_message to appear on the console, while logging the.
+ content. Mark the test as failed if failure_message is found instead.
+
+ :param test: a test containing a VM that will have its console
+ read and probed for a success or failure message
+ :type test: :class:`qemu_test.QemuSystemTest`
+ :param command: the command to send
+ :param success_message: if this message appears, test succeeds
+ :param failure_message: if this message appears, test fails
+ """
+ assert success_message
+ _console_interaction(test, success_message, failure_message, command + '\r')
+
+def get_qemu_img(test):
+ test.log.debug('Looking for and selecting a qemu-img binary')
+
+ # If qemu-img has been built, use it, otherwise the system wide one
+ # will be used.
+ qemu_img = test.build_file('qemu-img')
+ if os.path.exists(qemu_img):
+ return qemu_img
+ qemu_img = which('qemu-img')
+ if qemu_img is not None:
+ return qemu_img
+ test.skipTest(f"qemu-img not found in build dir or '$PATH'")
diff --git a/tests/functional/qemu_test/config.py b/tests/functional/qemu_test/config.py
new file mode 100644
index 0000000..6d4c9c3
--- /dev/null
+++ b/tests/functional/qemu_test/config.py
@@ -0,0 +1,48 @@
+# Test class and utilities for functional tests
+#
+# Copyright 2018, 2024 Red Hat, Inc.
+#
+# Original Author (Avocado-based tests):
+# Cleber Rosa <crosa@redhat.com>
+#
+# Adaption for standalone version:
+# Thomas Huth <thuth@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import os
+from pathlib import Path
+import platform
+
+
+def _source_dir():
+ # Determine top-level directory of the QEMU sources
+ return Path(__file__).parent.parent.parent.parent
+
+def _build_dir():
+ root = os.getenv('QEMU_BUILD_ROOT')
+ if root is not None:
+ return Path(root)
+ # Makefile.mtest only exists in build dir, so if it is available, use CWD
+ if os.path.exists('Makefile.mtest'):
+ return Path(os.getcwd())
+
+ root = os.path.join(_source_dir(), 'build')
+ if os.path.exists(root):
+ return Path(root)
+
+ raise Exception("Cannot identify build dir, set QEMU_BUILD_ROOT")
+
+BUILD_DIR = _build_dir()
+
+def dso_suffix():
+ '''Return the dynamic libraries suffix for the current platform'''
+
+ if platform.system() == "Darwin":
+ return "dylib"
+
+ if platform.system() == "Windows":
+ return "dll"
+
+ return "so"
diff --git a/tests/functional/qemu_test/decorators.py b/tests/functional/qemu_test/decorators.py
new file mode 100644
index 0000000..c0d1567
--- /dev/null
+++ b/tests/functional/qemu_test/decorators.py
@@ -0,0 +1,151 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Decorators useful in functional tests
+
+import importlib
+import os
+import platform
+import resource
+from unittest import skipIf, skipUnless
+
+from .cmd import which
+
+'''
+Decorator to skip execution of a test if the list
+of command binaries is not available in $PATH.
+Example:
+
+ @skipIfMissingCommands("mkisofs", "losetup")
+'''
+def skipIfMissingCommands(*args):
+ has_cmds = True
+ for cmd in args:
+ if not which(cmd):
+ has_cmds = False
+ break
+
+ return skipUnless(has_cmds, 'required command(s) "%s" not installed' %
+ ", ".join(args))
+
+'''
+Decorator to skip execution of a test if the current
+host operating system does match one of the prohibited
+ones.
+Example
+
+ @skipIfOperatingSystem("Linux", "Darwin")
+'''
+def skipIfOperatingSystem(*args):
+ return skipIf(platform.system() in args,
+ 'running on an OS (%s) that is not able to run this test' %
+ ", ".join(args))
+
+'''
+Decorator to skip execution of a test if the current
+host machine does not match one of the permitted
+machines.
+Example
+
+ @skipIfNotMachine("x86_64", "aarch64")
+'''
+def skipIfNotMachine(*args):
+ return skipUnless(platform.machine() in args,
+ 'not running on one of the required machine(s) "%s"' %
+ ", ".join(args))
+
+'''
+Decorator to skip execution of flaky tests, unless
+the $QEMU_TEST_FLAKY_TESTS environment variable is set.
+A bug URL must be provided that documents the observed
+failure behaviour, so it can be tracked & re-evaluated
+in future.
+
+Historical tests may be providing "None" as the bug_url
+but this should not be done for new test.
+
+Example:
+
+ @skipFlakyTest("https://gitlab.com/qemu-project/qemu/-/issues/NNN")
+'''
+def skipFlakyTest(bug_url):
+ if bug_url is None:
+ bug_url = "FIXME: reproduce flaky test and file bug report or remove"
+ return skipUnless(os.getenv('QEMU_TEST_FLAKY_TESTS'),
+ f'Test is unstable: {bug_url}')
+
+'''
+Decorator to skip execution of tests which are likely
+to execute untrusted commands on the host, or commands
+which process untrusted code, unless the
+$QEMU_TEST_ALLOW_UNTRUSTED_CODE env var is set.
+Example:
+
+ @skipUntrustedTest()
+'''
+def skipUntrustedTest():
+ return skipUnless(os.getenv('QEMU_TEST_ALLOW_UNTRUSTED_CODE'),
+ 'Test runs untrusted code / processes untrusted data')
+
+'''
+Decorator to skip execution of tests which need large
+data storage (over around 500MB-1GB mark) on the host,
+unless the $QEMU_TEST_ALLOW_LARGE_STORAGE environment
+variable is set
+
+Example:
+
+ @skipBigDataTest()
+'''
+def skipBigDataTest():
+ return skipUnless(os.getenv('QEMU_TEST_ALLOW_LARGE_STORAGE'),
+ 'Test requires large host storage space')
+
+'''
+Decorator to skip execution of tests which have a really long
+runtime (and might e.g. time out if QEMU has been compiled with
+debugging enabled) unless the $QEMU_TEST_ALLOW_SLOW
+environment variable is set
+
+Example:
+
+ @skipSlowTest()
+'''
+def skipSlowTest():
+ return skipUnless(os.getenv('QEMU_TEST_ALLOW_SLOW'),
+ 'Test has a very long runtime and might time out')
+
+'''
+Decorator to skip execution of a test if the list
+of python imports is not available.
+Example:
+
+ @skipIfMissingImports("numpy", "cv2")
+'''
+def skipIfMissingImports(*args):
+ has_imports = True
+ for impname in args:
+ try:
+ importlib.import_module(impname)
+ except ImportError:
+ has_imports = False
+ break
+
+ return skipUnless(has_imports, 'required import(s) "%s" not installed' %
+ ", ".join(args))
+
+'''
+Decorator to skip execution of a test if the system's
+locked memory limit is below the required threshold.
+Takes required locked memory threshold in kB.
+Example:
+
+ @skipLockedMemoryTest(2_097_152)
+'''
+def skipLockedMemoryTest(locked_memory):
+ # get memlock hard limit in bytes
+ _, ulimit_memory = resource.getrlimit(resource.RLIMIT_MEMLOCK)
+
+ return skipUnless(
+ ulimit_memory == resource.RLIM_INFINITY or ulimit_memory >= locked_memory * 1024,
+ f'Test required {locked_memory} kB of available locked memory',
+ )
diff --git a/tests/functional/qemu_test/linuxkernel.py b/tests/functional/qemu_test/linuxkernel.py
new file mode 100644
index 0000000..2aca0ee
--- /dev/null
+++ b/tests/functional/qemu_test/linuxkernel.py
@@ -0,0 +1,52 @@
+# Test class for testing the boot process of a Linux kernel
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import hashlib
+import urllib.request
+
+from .cmd import wait_for_console_pattern, exec_command_and_wait_for_pattern
+from .testcase import QemuSystemTest
+from .utils import get_usernet_hostfwd_port
+
+
+class LinuxKernelTest(QemuSystemTest):
+ KERNEL_COMMON_COMMAND_LINE = 'printk.time=0 '
+
+ def wait_for_console_pattern(self, success_message, vm=None):
+ wait_for_console_pattern(self, success_message,
+ failure_message='Kernel panic - not syncing',
+ vm=vm)
+
+ def launch_kernel(self, kernel, initrd=None, dtb=None, console_index=0,
+ wait_for=None):
+ self.vm.set_console(console_index=console_index)
+ self.vm.add_args('-kernel', kernel)
+ if initrd:
+ self.vm.add_args('-initrd', initrd)
+ if dtb:
+ self.vm.add_args('-dtb', dtb)
+ self.vm.launch()
+ if wait_for:
+ self.wait_for_console_pattern(wait_for)
+
+ def check_http_download(self, filename, hashsum, guestport=8080,
+ pythoncmd='python3 -m http.server'):
+ exec_command_and_wait_for_pattern(self,
+ f'{pythoncmd} {guestport} & sleep 1',
+ f'Serving HTTP on 0.0.0.0 port {guestport}')
+ hl = hashlib.sha256()
+ hostport = get_usernet_hostfwd_port(self.vm)
+ url = f'http://localhost:{hostport}{filename}'
+ self.log.info(f'Downloading {url} ...')
+ with urllib.request.urlopen(url) as response:
+ while True:
+ chunk = response.read(1 << 20)
+ if not chunk:
+ break
+ hl.update(chunk)
+
+ digest = hl.hexdigest()
+ self.log.info(f'sha256sum of download is {digest}.')
+ self.assertEqual(digest, hashsum)
diff --git a/tests/functional/qemu_test/ports.py b/tests/functional/qemu_test/ports.py
new file mode 100644
index 0000000..631b77a
--- /dev/null
+++ b/tests/functional/qemu_test/ports.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python3
+#
+# Simple functional tests for VNC functionality
+#
+# Copyright 2018, 2024 Red Hat, Inc.
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import fcntl
+import os
+import socket
+
+from .config import BUILD_DIR
+from typing import List
+
+
+class Ports():
+
+ PORTS_ADDR = '127.0.0.1'
+ PORTS_RANGE_SIZE = 1024
+ PORTS_START = 49152 + ((os.getpid() * PORTS_RANGE_SIZE) % 16384)
+ PORTS_END = PORTS_START + PORTS_RANGE_SIZE
+
+ def __enter__(self):
+ lock_file = os.path.join(BUILD_DIR, "tests", "functional", "port_lock")
+ self.lock_fh = os.open(lock_file, os.O_CREAT)
+ fcntl.flock(self.lock_fh, fcntl.LOCK_EX)
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ fcntl.flock(self.lock_fh, fcntl.LOCK_UN)
+ os.close(self.lock_fh)
+
+ def check_bind(self, port: int) -> bool:
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+ try:
+ sock.bind((self.PORTS_ADDR, port))
+ except OSError:
+ return False
+
+ return True
+
+ def find_free_ports(self, count: int) -> List[int]:
+ result = []
+ for port in range(self.PORTS_START, self.PORTS_END):
+ if self.check_bind(port):
+ result.append(port)
+ if len(result) >= count:
+ break
+ assert len(result) == count
+ return result
+
+ def find_free_port(self) -> int:
+ return self.find_free_ports(1)[0]
diff --git a/tests/functional/qemu_test/tesseract.py b/tests/functional/qemu_test/tesseract.py
new file mode 100644
index 0000000..ede6c65
--- /dev/null
+++ b/tests/functional/qemu_test/tesseract.py
@@ -0,0 +1,25 @@
+# ...
+#
+# Copyright (c) 2019 Philippe Mathieu-Daudé <f4bug@amsat.org>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import logging
+from subprocess import run
+
+
+def tesseract_ocr(image_path, tesseract_args=''):
+ console_logger = logging.getLogger('console')
+ console_logger.debug(image_path)
+ proc = run(['tesseract', image_path, 'stdout'],
+ capture_output=True, encoding='utf8')
+ if proc.returncode:
+ return None
+ lines = []
+ for line in proc.stdout.split('\n'):
+ sline = line.strip()
+ if len(sline):
+ console_logger.debug(sline)
+ lines += [sline]
+ return lines
diff --git a/tests/functional/qemu_test/testcase.py b/tests/functional/qemu_test/testcase.py
new file mode 100644
index 0000000..2082c6f
--- /dev/null
+++ b/tests/functional/qemu_test/testcase.py
@@ -0,0 +1,402 @@
+# Test class and utilities for functional tests
+#
+# Copyright 2018, 2024 Red Hat, Inc.
+#
+# Original Author (Avocado-based tests):
+# Cleber Rosa <crosa@redhat.com>
+#
+# Adaption for standalone version:
+# Thomas Huth <thuth@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import logging
+import os
+from pathlib import Path
+import pycotap
+import shutil
+from subprocess import run
+import sys
+import tempfile
+import unittest
+import uuid
+
+from qemu.machine import QEMUMachine
+from qemu.utils import hvf_available, kvm_available, tcg_available
+
+from .archive import archive_extract
+from .asset import Asset
+from .config import BUILD_DIR, dso_suffix
+from .uncompress import uncompress
+
+
+class QemuBaseTest(unittest.TestCase):
+
+ '''
+ @params compressed: filename, Asset, or file-like object to uncompress
+ @params format: optional compression format (gzip, lzma)
+
+ Uncompresses @compressed into the scratch directory.
+
+ If @format is None, heuristics will be applied to guess the format
+ from the filename or Asset URL. @format must be non-None if @uncompressed
+ is a file-like object.
+
+ Returns the fully qualified path to the uncompressed file
+ '''
+ def uncompress(self, compressed, format=None):
+ self.log.debug(f"Uncompress {compressed} format={format}")
+ if type(compressed) == Asset:
+ compressed.fetch()
+
+ (name, ext) = os.path.splitext(str(compressed))
+ uncompressed = self.scratch_file(os.path.basename(name))
+
+ uncompress(compressed, uncompressed, format)
+
+ return uncompressed
+
+ '''
+ @params archive: filename, Asset, or file-like object to extract
+ @params format: optional archive format (tar, zip, deb, cpio)
+ @params sub_dir: optional sub-directory to extract into
+ @params member: optional member file to limit extraction to
+
+ Extracts @archive into the scratch directory, or a directory beneath
+ named by @sub_dir. All files are extracted unless @member specifies
+ a limit.
+
+ If @format is None, heuristics will be applied to guess the format
+ from the filename or Asset URL. @format must be non-None if @archive
+ is a file-like object.
+
+ If @member is non-None, returns the fully qualified path to @member
+ '''
+ def archive_extract(self, archive, format=None, sub_dir=None, member=None):
+ self.log.debug(f"Extract {archive} format={format}" +
+ f"sub_dir={sub_dir} member={member}")
+ if type(archive) == Asset:
+ archive.fetch()
+ if sub_dir is None:
+ archive_extract(archive, self.scratch_file(), format, member)
+ else:
+ archive_extract(archive, self.scratch_file(sub_dir),
+ format, member)
+
+ if member is not None:
+ return self.scratch_file(member)
+ return None
+
+ '''
+ Create a temporary directory suitable for storing UNIX
+ socket paths.
+
+ Returns: a tempfile.TemporaryDirectory instance
+ '''
+ def socket_dir(self):
+ if self.socketdir is None:
+ self.socketdir = tempfile.TemporaryDirectory(
+ prefix="qemu_func_test_sock_")
+ return self.socketdir
+
+ '''
+ @params args list of zero or more subdirectories or file
+
+ Construct a path for accessing a data file located
+ relative to the source directory that is the root for
+ functional tests.
+
+ @args may be an empty list to reference the root dir
+ itself, may be a single element to reference a file in
+ the root directory, or may be multiple elements to
+ reference a file nested below. The path components
+ will be joined using the platform appropriate path
+ separator.
+
+ Returns: string representing a file path
+ '''
+ def data_file(self, *args):
+ return str(Path(Path(__file__).parent.parent, *args))
+
+ '''
+ @params args list of zero or more subdirectories or file
+
+ Construct a path for accessing a data file located
+ relative to the build directory root.
+
+ @args may be an empty list to reference the build dir
+ itself, may be a single element to reference a file in
+ the build directory, or may be multiple elements to
+ reference a file nested below. The path components
+ will be joined using the platform appropriate path
+ separator.
+
+ Returns: string representing a file path
+ '''
+ def build_file(self, *args):
+ return str(Path(BUILD_DIR, *args))
+
+ '''
+ @params args list of zero or more subdirectories or file
+
+ Construct a path for accessing/creating a scratch file
+ located relative to a temporary directory dedicated to
+ this test case. The directory and its contents will be
+ purged upon completion of the test.
+
+ @args may be an empty list to reference the scratch dir
+ itself, may be a single element to reference a file in
+ the scratch directory, or may be multiple elements to
+ reference a file nested below. The path components
+ will be joined using the platform appropriate path
+ separator.
+
+ Returns: string representing a file path
+ '''
+ def scratch_file(self, *args):
+ return str(Path(self.workdir, *args))
+
+ '''
+ @params args list of zero or more subdirectories or file
+
+ Construct a path for accessing/creating a log file
+ located relative to a temporary directory dedicated to
+ this test case. The directory and its log files will be
+ preserved upon completion of the test.
+
+ @args may be an empty list to reference the log dir
+ itself, may be a single element to reference a file in
+ the log directory, or may be multiple elements to
+ reference a file nested below. The path components
+ will be joined using the platform appropriate path
+ separator.
+
+ Returns: string representing a file path
+ '''
+ def log_file(self, *args):
+ return str(Path(self.outputdir, *args))
+
+ '''
+ @params plugin name
+
+ Return the full path to the plugin taking into account any host OS
+ specific suffixes.
+ '''
+ def plugin_file(self, plugin_name):
+ sfx = dso_suffix()
+ return os.path.join('tests', 'tcg', 'plugins', f'{plugin_name}.{sfx}')
+
+ def assets_available(self):
+ for name, asset in vars(self.__class__).items():
+ if name.startswith("ASSET_") and type(asset) == Asset:
+ if not asset.available():
+ self.log.debug(f"Asset {asset.url} not available")
+ return False
+ return True
+
+ def setUp(self):
+ self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
+ self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
+ self.arch = self.qemu_bin.split('-')[-1]
+ self.socketdir = None
+
+ self.outputdir = self.build_file('tests', 'functional',
+ self.arch, self.id())
+ self.workdir = os.path.join(self.outputdir, 'scratch')
+ os.makedirs(self.workdir, exist_ok=True)
+
+ self.log_filename = self.log_file('base.log')
+ self.log = logging.getLogger('qemu-test')
+ self.log.setLevel(logging.DEBUG)
+ self._log_fh = logging.FileHandler(self.log_filename, mode='w')
+ self._log_fh.setLevel(logging.DEBUG)
+ fileFormatter = logging.Formatter(
+ '%(asctime)s - %(levelname)s: %(message)s')
+ self._log_fh.setFormatter(fileFormatter)
+ self.log.addHandler(self._log_fh)
+
+ # Capture QEMUMachine logging
+ self.machinelog = logging.getLogger('qemu.machine')
+ self.machinelog.setLevel(logging.DEBUG)
+ self.machinelog.addHandler(self._log_fh)
+
+ if not self.assets_available():
+ self.skipTest('One or more assets is not available')
+
+ def tearDown(self):
+ if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
+ shutil.rmtree(self.workdir)
+ if self.socketdir is not None:
+ shutil.rmtree(self.socketdir.name)
+ self.socketdir = None
+ self.machinelog.removeHandler(self._log_fh)
+ self.log.removeHandler(self._log_fh)
+
+ def main():
+ path = os.path.basename(sys.argv[0])[:-3]
+
+ cache = os.environ.get("QEMU_TEST_PRECACHE", None)
+ if cache is not None:
+ Asset.precache_suites(path, cache)
+ return
+
+ tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError,
+ test_output_log = pycotap.LogMode.LogToError)
+ res = unittest.main(module = None, testRunner = tr, exit = False,
+ argv=["__dummy__", path])
+ for (test, message) in res.result.errors + res.result.failures:
+
+ if hasattr(test, "log_filename"):
+ print('More information on ' + test.id() + ' could be found here:'
+ '\n %s' % test.log_filename, file=sys.stderr)
+ if hasattr(test, 'console_log_name'):
+ print(' %s' % test.console_log_name, file=sys.stderr)
+ sys.exit(not res.result.wasSuccessful())
+
+
+class QemuUserTest(QemuBaseTest):
+
+ def setUp(self):
+ super().setUp()
+ self._ldpath = []
+
+ def add_ldpath(self, ldpath):
+ self._ldpath.append(os.path.abspath(ldpath))
+
+ def run_cmd(self, bin_path, args=[]):
+ return run([self.qemu_bin]
+ + ["-L %s" % ldpath for ldpath in self._ldpath]
+ + [bin_path]
+ + args,
+ text=True, capture_output=True)
+
+class QemuSystemTest(QemuBaseTest):
+ """Facilitates system emulation tests."""
+
+ cpu = None
+ machine = None
+ _machinehelp = None
+
+ def setUp(self):
+ self._vms = {}
+
+ super().setUp()
+
+ console_log = logging.getLogger('console')
+ console_log.setLevel(logging.DEBUG)
+ self.console_log_name = self.log_file('console.log')
+ self._console_log_fh = logging.FileHandler(self.console_log_name,
+ mode='w')
+ self._console_log_fh.setLevel(logging.DEBUG)
+ fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
+ self._console_log_fh.setFormatter(fileFormatter)
+ console_log.addHandler(self._console_log_fh)
+
+ def set_machine(self, machinename):
+ # TODO: We should use QMP to get the list of available machines
+ if not self._machinehelp:
+ self._machinehelp = run(
+ [self.qemu_bin, '-M', 'help'],
+ capture_output=True, check=True, encoding='utf8').stdout
+ if self._machinehelp.find(machinename) < 0:
+ self.skipTest('no support for machine ' + machinename)
+ self.machine = machinename
+
+ def require_accelerator(self, accelerator):
+ """
+ Requires an accelerator to be available for the test to continue
+
+ It takes into account the currently set qemu binary.
+
+ If the check fails, the test is canceled. If the check itself
+ for the given accelerator is not available, the test is also
+ canceled.
+
+ :param accelerator: name of the accelerator, such as "kvm" or "tcg"
+ :type accelerator: str
+ """
+ checker = {'tcg': tcg_available,
+ 'kvm': kvm_available,
+ 'hvf': hvf_available,
+ }.get(accelerator)
+ if checker is None:
+ self.skipTest("Don't know how to check for the presence "
+ "of accelerator %s" % accelerator)
+ if not checker(qemu_bin=self.qemu_bin):
+ self.skipTest("%s accelerator does not seem to be "
+ "available" % accelerator)
+
+ def require_netdev(self, netdevname):
+ help = run([self.qemu_bin,
+ '-M', 'none', '-netdev', 'help'],
+ capture_output=True, check=True, encoding='utf8').stdout;
+ if help.find('\n' + netdevname + '\n') < 0:
+ self.skipTest('no support for " + netdevname + " networking')
+
+ def require_device(self, devicename):
+ help = run([self.qemu_bin,
+ '-M', 'none', '-device', 'help'],
+ capture_output=True, check=True, encoding='utf8').stdout;
+ if help.find(devicename) < 0:
+ self.skipTest('no support for device ' + devicename)
+
+ def _new_vm(self, name, *args):
+ vm = QEMUMachine(self.qemu_bin,
+ name=name,
+ base_temp_dir=self.workdir,
+ log_dir=self.log_file())
+ self.log.debug('QEMUMachine "%s" created', name)
+ self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
+
+ sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
+ if sockpath is not None:
+ vm.add_args("-chardev",
+ f"socket,id=backdoor,path={sockpath},server=on,wait=off",
+ "-mon", "chardev=backdoor,mode=control")
+
+ if args:
+ vm.add_args(*args)
+ return vm
+
+ @property
+ def vm(self):
+ return self.get_vm(name='default')
+
+ def get_vm(self, *args, name=None):
+ if not name:
+ name = str(uuid.uuid4())
+ if self._vms.get(name) is None:
+ self._vms[name] = self._new_vm(name, *args)
+ if self.cpu is not None:
+ self._vms[name].add_args('-cpu', self.cpu)
+ if self.machine is not None:
+ self._vms[name].set_machine(self.machine)
+ return self._vms[name]
+
+ def set_vm_arg(self, arg, value):
+ """
+ Set an argument to list of extra arguments to be given to the QEMU
+ binary. If the argument already exists then its value is replaced.
+
+ :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
+ :type arg: str
+ :param value: the argument value, such as "host" in "-cpu host"
+ :type value: str
+ """
+ if not arg or not value:
+ return
+ if arg not in self.vm.args:
+ self.vm.args.extend([arg, value])
+ else:
+ idx = self.vm.args.index(arg) + 1
+ if idx < len(self.vm.args):
+ self.vm.args[idx] = value
+ else:
+ self.vm.args.append(value)
+
+ def tearDown(self):
+ for vm in self._vms.values():
+ vm.shutdown()
+ logging.getLogger('console').removeHandler(self._console_log_fh)
+ super().tearDown()
diff --git a/tests/functional/qemu_test/tuxruntest.py b/tests/functional/qemu_test/tuxruntest.py
new file mode 100644
index 0000000..6c442ff
--- /dev/null
+++ b/tests/functional/qemu_test/tuxruntest.py
@@ -0,0 +1,136 @@
+# Functional test that boots known good tuxboot images the same way
+# that tuxrun (www.tuxrun.org) does. This tool is used by things like
+# the LKFT project to run regression tests on kernels.
+#
+# Copyright (c) 2023 Linaro Ltd.
+#
+# Author:
+# Alex Bennée <alex.bennee@linaro.org>
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+
+import os
+
+from qemu_test import QemuSystemTest
+from qemu_test import exec_command_and_wait_for_pattern
+from qemu_test import wait_for_console_pattern
+from qemu_test import which, get_qemu_img
+
+class TuxRunBaselineTest(QemuSystemTest):
+
+ KERNEL_COMMON_COMMAND_LINE = 'printk.time=0'
+ # Tests are ~10-40s, allow for --debug/--enable-gcov overhead
+ timeout = 100
+
+ def setUp(self):
+ super().setUp()
+
+ # We need zstd for all the tuxrun tests
+ if which('zstd') is None:
+ self.skipTest("zstd not found in $PATH")
+
+ # Pre-init TuxRun specific settings: Most machines work with
+ # reasonable defaults but we sometimes need to tweak the
+ # config. To avoid open coding everything we store all these
+ # details in the metadata for each test.
+
+ # The tuxboot tag matches the root directory
+ self.tuxboot = self.arch
+
+ # Most Linux's use ttyS0 for their serial port
+ self.console = "ttyS0"
+
+ # Does the machine shutdown QEMU nicely on "halt"
+ self.wait_for_shutdown = True
+
+ self.root = "vda"
+
+ # Occasionally we need extra devices to hook things up
+ self.extradev = None
+
+ self.qemu_img = get_qemu_img(self)
+
+ def wait_for_console_pattern(self, success_message, vm=None):
+ wait_for_console_pattern(self, success_message,
+ failure_message='Kernel panic - not syncing',
+ vm=vm)
+
+ def fetch_tuxrun_assets(self, kernel_asset, rootfs_asset, dtb_asset=None):
+ """
+ Fetch the TuxBoot assets.
+ """
+ kernel_image = kernel_asset.fetch()
+ disk_image = self.uncompress(rootfs_asset)
+ dtb = dtb_asset.fetch() if dtb_asset is not None else None
+
+ return (kernel_image, disk_image, dtb)
+
+ def prepare_run(self, kernel, disk, drive, dtb=None, console_index=0):
+ """
+ Setup to run and add the common parameters to the system
+ """
+ self.vm.set_console(console_index=console_index)
+
+ # all block devices are raw ext4's
+ blockdev = "driver=raw,file.driver=file," \
+ + f"file.filename={disk},node-name=hd0"
+
+ self.kcmd_line = self.KERNEL_COMMON_COMMAND_LINE
+ self.kcmd_line += f" root=/dev/{self.root}"
+ self.kcmd_line += f" console={self.console}"
+
+ self.vm.add_args('-kernel', kernel,
+ '-append', self.kcmd_line,
+ '-blockdev', blockdev)
+
+ # Sometimes we need extra devices attached
+ if self.extradev:
+ self.vm.add_args('-device', self.extradev)
+
+ self.vm.add_args('-device',
+ f"{drive},drive=hd0")
+
+ # Some machines need an explicit DTB
+ if dtb:
+ self.vm.add_args('-dtb', dtb)
+
+ def run_tuxtest_tests(self, haltmsg):
+ """
+ Wait for the system to boot up, wait for the login prompt and
+ then do a few things on the console. Trigger a shutdown and
+ wait to exit cleanly.
+ """
+ ps1='root@tuxtest:~#'
+ self.wait_for_console_pattern(self.kcmd_line)
+ self.wait_for_console_pattern('tuxtest login:')
+ exec_command_and_wait_for_pattern(self, 'root', ps1)
+ exec_command_and_wait_for_pattern(self, 'cat /proc/interrupts', ps1)
+ exec_command_and_wait_for_pattern(self, 'cat /proc/self/maps', ps1)
+ exec_command_and_wait_for_pattern(self, 'uname -a', ps1)
+ exec_command_and_wait_for_pattern(self, 'halt', haltmsg)
+
+ # Wait for VM to shut down gracefully if it can
+ if self.wait_for_shutdown:
+ self.vm.wait()
+ else:
+ self.vm.shutdown()
+
+ def common_tuxrun(self,
+ kernel_asset,
+ rootfs_asset,
+ dtb_asset=None,
+ drive="virtio-blk-device",
+ haltmsg="reboot: System halted",
+ console_index=0):
+ """
+ Common path for LKFT tests. Unless we need to do something
+ special with the command line we can process most things using
+ the tag metadata.
+ """
+ (kernel, disk, dtb) = self.fetch_tuxrun_assets(kernel_asset, rootfs_asset,
+ dtb_asset)
+
+ self.prepare_run(kernel, disk, drive, dtb, console_index)
+ self.vm.launch()
+ self.run_tuxtest_tests(haltmsg)
+ os.remove(disk)
diff --git a/tests/functional/qemu_test/uncompress.py b/tests/functional/qemu_test/uncompress.py
new file mode 100644
index 0000000..b7ef8f7
--- /dev/null
+++ b/tests/functional/qemu_test/uncompress.py
@@ -0,0 +1,107 @@
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Utilities for python-based QEMU tests
+#
+# Copyright 2024 Red Hat, Inc.
+#
+# Authors:
+# Thomas Huth <thuth@redhat.com>
+
+import gzip
+import lzma
+import os
+import stat
+import shutil
+from urllib.parse import urlparse
+from subprocess import run, CalledProcessError
+
+from .asset import Asset
+
+
+def gzip_uncompress(gz_path, output_path):
+ if os.path.exists(output_path):
+ return
+ with gzip.open(gz_path, 'rb') as gz_in:
+ try:
+ with open(output_path, 'wb') as raw_out:
+ shutil.copyfileobj(gz_in, raw_out)
+ except:
+ os.remove(output_path)
+ raise
+
+def lzma_uncompress(xz_path, output_path):
+ if os.path.exists(output_path):
+ return
+ with lzma.open(xz_path, 'rb') as lzma_in:
+ try:
+ with open(output_path, 'wb') as raw_out:
+ shutil.copyfileobj(lzma_in, raw_out)
+ except:
+ os.remove(output_path)
+ raise
+
+
+def zstd_uncompress(zstd_path, output_path):
+ if os.path.exists(output_path):
+ return
+
+ try:
+ run(['zstd', "-f", "-d", zstd_path,
+ "-o", output_path], capture_output=True, check=True)
+ except CalledProcessError as e:
+ os.remove(output_path)
+ raise Exception(
+ f"Unable to decompress zstd file {zstd_path} with {e}") from e
+
+ # zstd copies source archive permissions for the output
+ # file, so must make this writable for QEMU
+ os.chmod(output_path, stat.S_IRUSR | stat.S_IWUSR)
+
+
+'''
+@params compressed: filename, Asset, or file-like object to uncompress
+@params uncompressed: filename to uncompress into
+@params format: optional compression format (gzip, lzma)
+
+Uncompresses @compressed into @uncompressed
+
+If @format is None, heuristics will be applied to guess the format
+from the filename or Asset URL. @format must be non-None if @uncompressed
+is a file-like object.
+
+Returns the fully qualified path to the uncompessed file
+'''
+def uncompress(compressed, uncompressed, format=None):
+ if format is None:
+ format = guess_uncompress_format(compressed)
+
+ if format == "xz":
+ lzma_uncompress(str(compressed), uncompressed)
+ elif format == "gz":
+ gzip_uncompress(str(compressed), uncompressed)
+ elif format == "zstd":
+ zstd_uncompress(str(compressed), uncompressed)
+ else:
+ raise Exception(f"Unknown compression format {format}")
+
+'''
+@params compressed: filename, Asset, or file-like object to guess
+
+Guess the format of @compressed, raising an exception if
+no format can be determined
+'''
+def guess_uncompress_format(compressed):
+ if type(compressed) == Asset:
+ compressed = urlparse(compressed.url).path
+ elif type(compressed) != str:
+ raise Exception(f"Unable to guess compression cformat for {compressed}")
+
+ (name, ext) = os.path.splitext(compressed)
+ if ext == ".xz":
+ return "xz"
+ elif ext == ".gz":
+ return "gz"
+ elif ext in [".zstd", ".zst"]:
+ return 'zstd'
+ else:
+ raise Exception(f"Unknown compression format for {compressed}")
diff --git a/tests/functional/qemu_test/utils.py b/tests/functional/qemu_test/utils.py
new file mode 100644
index 0000000..e7c8de8
--- /dev/null
+++ b/tests/functional/qemu_test/utils.py
@@ -0,0 +1,39 @@
+# Utilities for python-based QEMU tests
+#
+# Copyright 2024 Red Hat, Inc.
+#
+# Authors:
+# Thomas Huth <thuth@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import os
+
+from qemu.utils import get_info_usernet_hostfwd_port
+
+
+def get_usernet_hostfwd_port(vm):
+ res = vm.cmd('human-monitor-command', command_line='info usernet')
+ return get_info_usernet_hostfwd_port(res)
+
+"""
+Round up to next power of 2
+"""
+def pow2ceil(x):
+ return 1 if x == 0 else 2**(x - 1).bit_length()
+
+def file_truncate(path, size):
+ if size != os.path.getsize(path):
+ with open(path, 'ab+') as fd:
+ fd.truncate(size)
+
+"""
+Expand file size to next power of 2
+"""
+def image_pow2ceil_expand(path):
+ size = os.path.getsize(path)
+ size_aligned = pow2ceil(size)
+ if size != size_aligned:
+ with open(path, 'ab+') as fd:
+ fd.truncate(size_aligned)