diff options
author | Peter Maydell <peter.maydell@linaro.org> | 2024-09-05 18:01:51 +0100 |
---|---|---|
committer | Peter Maydell <peter.maydell@linaro.org> | 2024-09-05 18:01:51 +0100 |
commit | eabebca69b7fca7cf85f6cd39ac58a7f04986b47 (patch) | |
tree | 5e7bcc92e509008cc5b761fa3fbb08e69075825c /tests/functional/qemu_test | |
parent | 7b87a25f49a301d3377f3e71e0b4a62540c6f6e4 (diff) | |
parent | c3e24cff2b27d63ac4b56ac6d38ef1ae3a27d92f (diff) | |
download | qemu-eabebca69b7fca7cf85f6cd39ac58a7f04986b47.zip qemu-eabebca69b7fca7cf85f6cd39ac58a7f04986b47.tar.gz qemu-eabebca69b7fca7cf85f6cd39ac58a7f04986b47.tar.bz2 |
Merge tag 'pull-request-2024-09-04' of https://gitlab.com/thuth/qemu into staging
* Bump Avocado to version 103
* Introduce new functional test framework for Python-based tests
* Convert many Avocado tests to the new functional test framework
# -----BEGIN PGP SIGNATURE-----
#
# iQJFBAABCAAvFiEEJ7iIR+7gJQEY8+q5LtnXdP5wLbUFAmbYOEsRHHRodXRoQHJl
# ZGhhdC5jb20ACgkQLtnXdP5wLbUDAA/+Kdlak/nCrK5gXDyDasfy86IxgMD0QlDR
# U0MOpQyfXbM2EJjwCUhmgo8pui8qV23dKzfCwbDmkjB7mJ+yKi2ZdiFEp6onq/ke
# aAdaaZwENtWcFglRD80TOSQX6oyeNmE/PuvJGG0BfwWXyyhaEa6kCdytEPORipQs
# lZ+ZndHgXtcM3roXtgI3kp2V1nY5LLCJ044UrasKRq2xWfD/Ken90uWP5/nMLV7f
# 7YLRUIb0sgV7IdjZiT1UkXJZRB7MatV7+OsojYbG8BPbQEvXqpryXMIeygHVR9a0
# yxNDUpTZR6JoS1IaLKkHh1mTM+L1JpFltKadKkXa0zqJHHSur7Tp0xVO/GeqCek4
# 9N8K4zw2CoO/AKmN8JjW5i4GnMrFMdcvxxNwLdRoVgYt4YA731wnHrbosXZOXcuv
# H0z8Tm6ueKvfBtrQErdvqsGrP/8FUYRqZP4H6XaaC+wEis++7OmVR2nlQ/gAyr6/
# mMJtmxqVHCIcEVjDu1jYltrW3BN2CcxN2M9gxyOScq2/Xmzqtaeb4iyjxeCUjIBW
# Pc4LXlSafIg3hPrdH3EKN275ev8cx/5jp8oEgXD5We25Mj3W930zde6/STXoX318
# NVNlbrIQjGjQN7rN5oxTFxTlIN8ax2tuuzpQDFvS/4bLyMYXcZ4I5gUrM5tvWTGv
# +0UN45pJ7Nk=
# =l6Ki
# -----END PGP SIGNATURE-----
# gpg: Signature made Wed 04 Sep 2024 11:36:59 BST
# gpg: using RSA key 27B88847EEE0250118F3EAB92ED9D774FE702DB5
# gpg: issuer "thuth@redhat.com"
# gpg: Good signature from "Thomas Huth <th.huth@gmx.de>" [full]
# gpg: aka "Thomas Huth <thuth@redhat.com>" [full]
# gpg: aka "Thomas Huth <huth@tuxfamily.org>" [full]
# gpg: aka "Thomas Huth <th.huth@posteo.de>" [unknown]
# Primary key fingerprint: 27B8 8847 EEE0 2501 18F3 EAB9 2ED9 D774 FE70 2DB5
* tag 'pull-request-2024-09-04' of https://gitlab.com/thuth/qemu: (42 commits)
docs/devel/testing: Add documentation for functional tests
docs/devel/testing: Rename avocado_qemu.Test class
docs/devel/testing: Split the Avocado documentation into a separate file
docs/devel: Split testing docs from the build docs and move to separate folder
gitlab-ci: Add "check-functional" to the build tests
tests/avocado: Remove unused QemuUserTest class
tests/functional: Convert ARM bFLT linux-user avocado test
tests/functional: Add QemuUserTest class
tests/functional: Convert mips64el Fuloong2e avocado test (1/2)
tests/functional: Convert Aarch64 Virt machine avocado tests
tests/functional: Convert Aarch64 SBSA-Ref avocado tests
tests/functional: Convert ARM Integrator/CP avocado tests
tests/functional: Convert the linux_initrd avocado test into a standalone test
tests/functional: Convert the rx_gdbsim avocado test into a standalone test
tests/functional: Convert the acpi-bits test into a standalone test
tests/functional: Convert the m68k nextcube test with tesseract
tests/functional: Convert the ppc_hv avocado test into a standalone test
tests/functional: Convert the ppc_amiga avocado test into a standalone test
tests/functional: Convert most ppc avocado tests into standalone tests
tests/functional: Convert the virtio_gpu avocado test into a standalone test
...
Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
Diffstat (limited to 'tests/functional/qemu_test')
-rw-r--r-- | tests/functional/qemu_test/__init__.py | 14 | ||||
-rw-r--r-- | tests/functional/qemu_test/asset.py | 171 | ||||
-rw-r--r-- | tests/functional/qemu_test/cmd.py | 193 | ||||
-rw-r--r-- | tests/functional/qemu_test/config.py | 36 | ||||
-rw-r--r-- | tests/functional/qemu_test/tesseract.py | 35 | ||||
-rw-r--r-- | tests/functional/qemu_test/testcase.py | 202 | ||||
-rw-r--r-- | tests/functional/qemu_test/utils.py | 56 |
7 files changed, 707 insertions, 0 deletions
diff --git a/tests/functional/qemu_test/__init__.py b/tests/functional/qemu_test/__init__.py new file mode 100644 index 0000000..f33282e --- /dev/null +++ b/tests/functional/qemu_test/__init__.py @@ -0,0 +1,14 @@ +# Test class and utilities for functional tests +# +# Copyright 2024 Red Hat, Inc. +# +# This work is licensed under the terms of the GNU GPL, version 2 or +# later. See the COPYING file in the top-level directory. + + +from .asset import Asset +from .config import BUILD_DIR +from .cmd import has_cmd, has_cmds, run_cmd, is_readable_executable_file, \ + interrupt_interactive_console_until_pattern, wait_for_console_pattern, \ + exec_command, exec_command_and_wait_for_pattern, get_qemu_img +from .testcase import QemuBaseTest, QemuUserTest, QemuSystemTest diff --git a/tests/functional/qemu_test/asset.py b/tests/functional/qemu_test/asset.py new file mode 100644 index 0000000..d3be2af --- /dev/null +++ b/tests/functional/qemu_test/asset.py @@ -0,0 +1,171 @@ +# Test utilities for fetching & caching assets +# +# Copyright 2024 Red Hat, Inc. +# +# This work is licensed under the terms of the GNU GPL, version 2 or +# later. See the COPYING file in the top-level directory. + +import hashlib +import logging +import os +import subprocess +import sys +import unittest +import urllib.request +from time import sleep +from pathlib import Path +from shutil import copyfileobj + + +# Instances of this class must be declared as class level variables +# starting with a name "ASSET_". This enables the pre-caching logic +# to easily find all referenced assets and download them prior to +# execution of the tests. +class Asset: + + def __init__(self, url, hashsum): + self.url = url + self.hash = hashsum + cache_dir_env = os.getenv('QEMU_TEST_CACHE_DIR') + if cache_dir_env: + self.cache_dir = Path(cache_dir_env, "download") + else: + self.cache_dir = Path(Path("~").expanduser(), + ".cache", "qemu", "download") + self.cache_file = Path(self.cache_dir, hashsum) + self.log = logging.getLogger('qemu-test') + + def __repr__(self): + return "Asset: url=%s hash=%s cache=%s" % ( + self.url, self.hash, self.cache_file) + + def _check(self, cache_file): + if self.hash is None: + return True + if len(self.hash) == 64: + sum_prog = 'sha256sum' + elif len(self.hash) == 128: + sum_prog = 'sha512sum' + else: + raise Exception("unknown hash type") + + checksum = subprocess.check_output( + [sum_prog, str(cache_file)]).split()[0] + return self.hash == checksum.decode("utf-8") + + def valid(self): + return self.cache_file.exists() and self._check(self.cache_file) + + def _wait_for_other_download(self, tmp_cache_file): + # Another thread already seems to download the asset, so wait until + # it is done, while also checking the size to see whether it is stuck + try: + current_size = tmp_cache_file.stat().st_size + new_size = current_size + except: + if os.path.exists(self.cache_file): + return True + raise + waittime = lastchange = 600 + while waittime > 0: + sleep(1) + waittime -= 1 + try: + new_size = tmp_cache_file.stat().st_size + except: + if os.path.exists(self.cache_file): + return True + raise + if new_size != current_size: + lastchange = waittime + current_size = new_size + elif lastchange - waittime > 90: + return False + + self.log.debug("Time out while waiting for %s!", tmp_cache_file) + raise + + def fetch(self): + if not self.cache_dir.exists(): + self.cache_dir.mkdir(parents=True, exist_ok=True) + + if self.valid(): + self.log.debug("Using cached asset %s for %s", + self.cache_file, self.url) + return str(self.cache_file) + + if os.environ.get("QEMU_TEST_NO_DOWNLOAD", False): + raise Exception("Asset cache is invalid and downloads disabled") + + self.log.info("Downloading %s to %s...", self.url, self.cache_file) + tmp_cache_file = self.cache_file.with_suffix(".download") + + for retries in range(3): + try: + with tmp_cache_file.open("xb") as dst: + with urllib.request.urlopen(self.url) as resp: + copyfileobj(resp, dst) + break + except FileExistsError: + self.log.debug("%s already exists, " + "waiting for other thread to finish...", + tmp_cache_file) + if self._wait_for_other_download(tmp_cache_file): + return str(self.cache_file) + self.log.debug("%s seems to be stale, " + "deleting and retrying download...", + tmp_cache_file) + tmp_cache_file.unlink() + continue + except Exception as e: + self.log.error("Unable to download %s: %s", self.url, e) + tmp_cache_file.unlink() + raise + + try: + # Set these just for informational purposes + os.setxattr(str(tmp_cache_file), "user.qemu-asset-url", + self.url.encode('utf8')) + os.setxattr(str(tmp_cache_file), "user.qemu-asset-hash", + self.hash.encode('utf8')) + except Exception as e: + self.log.debug("Unable to set xattr on %s: %s", tmp_cache_file, e) + pass + + if not self._check(tmp_cache_file): + tmp_cache_file.unlink() + raise Exception("Hash of %s does not match %s" % + (self.url, self.hash)) + tmp_cache_file.replace(self.cache_file) + + self.log.info("Cached %s at %s" % (self.url, self.cache_file)) + return str(self.cache_file) + + def precache_test(test): + log = logging.getLogger('qemu-test') + log.setLevel(logging.DEBUG) + handler = logging.StreamHandler(sys.stdout) + handler.setLevel(logging.DEBUG) + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + log.addHandler(handler) + for name, asset in vars(test.__class__).items(): + if name.startswith("ASSET_") and type(asset) == Asset: + log.info("Attempting to cache '%s'" % asset) + asset.fetch() + log.removeHandler(handler) + + def precache_suite(suite): + for test in suite: + if isinstance(test, unittest.TestSuite): + Asset.precache_suite(test) + elif isinstance(test, unittest.TestCase): + Asset.precache_test(test) + + def precache_suites(path, cacheTstamp): + loader = unittest.loader.defaultTestLoader + tests = loader.loadTestsFromNames([path], None) + + with open(cacheTstamp, "w") as fh: + Asset.precache_suite(tests) diff --git a/tests/functional/qemu_test/cmd.py b/tests/functional/qemu_test/cmd.py new file mode 100644 index 0000000..3acd617 --- /dev/null +++ b/tests/functional/qemu_test/cmd.py @@ -0,0 +1,193 @@ +# Test class and utilities for functional tests +# +# Copyright 2018, 2024 Red Hat, Inc. +# +# Original Author (Avocado-based tests): +# Cleber Rosa <crosa@redhat.com> +# +# Adaption for standalone version: +# Thomas Huth <thuth@redhat.com> +# +# This work is licensed under the terms of the GNU GPL, version 2 or +# later. See the COPYING file in the top-level directory. + +import logging +import os +import os.path +import subprocess + +from .config import BUILD_DIR + + +def has_cmd(name, args=None): + """ + This function is for use in a @skipUnless decorator, e.g.: + + @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true'))) + def test_something_that_needs_sudo(self): + ... + """ + + if args is None: + args = ('which', name) + + try: + _, stderr, exitcode = run_cmd(args) + except Exception as e: + exitcode = -1 + stderr = str(e) + + if exitcode != 0: + cmd_line = ' '.join(args) + err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}' + return (False, err) + else: + return (True, '') + +def has_cmds(*cmds): + """ + This function is for use in a @skipUnless decorator and + allows checking for the availability of multiple commands, e.g.: + + @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')), + 'cmd2', 'cmd3')) + def test_something_that_needs_cmd1_and_cmd2(self): + ... + """ + + for cmd in cmds: + if isinstance(cmd, str): + cmd = (cmd,) + + ok, errstr = has_cmd(*cmd) + if not ok: + return (False, errstr) + + return (True, '') + +def run_cmd(args): + subp = subprocess.Popen(args, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True) + stdout, stderr = subp.communicate() + ret = subp.returncode + + return (stdout, stderr, ret) + +def is_readable_executable_file(path): + return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK) + +def _console_interaction(test, success_message, failure_message, + send_string, keep_sending=False, vm=None): + assert not keep_sending or send_string + if vm is None: + vm = test.vm + console = vm.console_file + console_logger = logging.getLogger('console') + while True: + if send_string: + vm.console_socket.sendall(send_string.encode()) + if not keep_sending: + send_string = None # send only once + + # Only consume console output if waiting for something + if success_message is None and failure_message is None: + if send_string is None: + break + continue + + try: + msg = console.readline().decode().strip() + except UnicodeDecodeError: + msg = None + if not msg: + continue + console_logger.debug(msg) + if success_message is None or success_message in msg: + break + if failure_message and failure_message in msg: + console.close() + fail = 'Failure message found in console: "%s". Expected: "%s"' % \ + (failure_message, success_message) + test.fail(fail) + +def interrupt_interactive_console_until_pattern(test, success_message, + failure_message=None, + interrupt_string='\r'): + """ + Keep sending a string to interrupt a console prompt, while logging the + console output. Typical use case is to break a boot loader prompt, such: + + Press a key within 5 seconds to interrupt boot process. + 5 + 4 + 3 + 2 + 1 + Booting default image... + + :param test: a test containing a VM that will have its console + read and probed for a success or failure message + :type test: :class:`qemu_test.QemuSystemTest` + :param success_message: if this message appears, test succeeds + :param failure_message: if this message appears, test fails + :param interrupt_string: a string to send to the console before trying + to read a new line + """ + _console_interaction(test, success_message, failure_message, + interrupt_string, True) + +def wait_for_console_pattern(test, success_message, failure_message=None, + vm=None): + """ + Waits for messages to appear on the console, while logging the content + + :param test: a test containing a VM that will have its console + read and probed for a success or failure message + :type test: :class:`qemu_test.QemuSystemTest` + :param success_message: if this message appears, test succeeds + :param failure_message: if this message appears, test fails + """ + _console_interaction(test, success_message, failure_message, None, vm=vm) + +def exec_command(test, command): + """ + Send a command to a console (appending CRLF characters), while logging + the content. + + :param test: a test containing a VM. + :type test: :class:`qemu_test.QemuSystemTest` + :param command: the command to send + :type command: str + """ + _console_interaction(test, None, None, command + '\r') + +def exec_command_and_wait_for_pattern(test, command, + success_message, failure_message=None): + """ + Send a command to a console (appending CRLF characters), then wait + for success_message to appear on the console, while logging the. + content. Mark the test as failed if failure_message is found instead. + + :param test: a test containing a VM that will have its console + read and probed for a success or failure message + :type test: :class:`qemu_test.QemuSystemTest` + :param command: the command to send + :param success_message: if this message appears, test succeeds + :param failure_message: if this message appears, test fails + """ + _console_interaction(test, success_message, failure_message, command + '\r') + +def get_qemu_img(test): + test.log.debug('Looking for and selecting a qemu-img binary') + + # If qemu-img has been built, use it, otherwise the system wide one + # will be used. + qemu_img = os.path.join(BUILD_DIR, 'qemu-img') + if os.path.exists(qemu_img): + return qemu_img + if has_cmd('qemu-img'): + return 'qemu-img' + test.skipTest('Could not find "qemu-img", which is required to ' + 'create temporary images') diff --git a/tests/functional/qemu_test/config.py b/tests/functional/qemu_test/config.py new file mode 100644 index 0000000..edd75b7 --- /dev/null +++ b/tests/functional/qemu_test/config.py @@ -0,0 +1,36 @@ +# Test class and utilities for functional tests +# +# Copyright 2018, 2024 Red Hat, Inc. +# +# Original Author (Avocado-based tests): +# Cleber Rosa <crosa@redhat.com> +# +# Adaption for standalone version: +# Thomas Huth <thuth@redhat.com> +# +# This work is licensed under the terms of the GNU GPL, version 2 or +# later. See the COPYING file in the top-level directory. + +import os +from pathlib import Path + + +def _source_dir(): + # Determine top-level directory of the QEMU sources + return Path(__file__).parent.parent.parent.parent + +def _build_dir(): + root = os.getenv('QEMU_BUILD_ROOT') + if root is not None: + return Path(root) + # Makefile.mtest only exists in build dir, so if it is available, use CWD + if os.path.exists('Makefile.mtest'): + return Path(os.getcwd()) + + root = os.path.join(_source_dir(), 'build') + if os.path.exists(root): + return Path(root) + + raise Exception("Cannot identify build dir, set QEMU_BUILD_ROOT") + +BUILD_DIR = _build_dir() diff --git a/tests/functional/qemu_test/tesseract.py b/tests/functional/qemu_test/tesseract.py new file mode 100644 index 0000000..c4087b7 --- /dev/null +++ b/tests/functional/qemu_test/tesseract.py @@ -0,0 +1,35 @@ +# ... +# +# Copyright (c) 2019 Philippe Mathieu-Daudé <f4bug@amsat.org> +# +# This work is licensed under the terms of the GNU GPL, version 2 or +# later. See the COPYING file in the top-level directory. + +import re +import logging + +from . import has_cmd, run_cmd + +def tesseract_available(expected_version): + if not has_cmd('tesseract'): + return False + (stdout, stderr, ret) = run_cmd([ 'tesseract', '--version']) + if ret: + return False + version = stdout.split()[1] + return int(version.split('.')[0]) >= expected_version + +def tesseract_ocr(image_path, tesseract_args=''): + console_logger = logging.getLogger('console') + console_logger.debug(image_path) + (stdout, stderr, ret) = run_cmd(['tesseract', image_path, + 'stdout']) + if ret: + return None + lines = [] + for line in stdout.split('\n'): + sline = line.strip() + if len(sline): + console_logger.debug(sline) + lines += [sline] + return lines diff --git a/tests/functional/qemu_test/testcase.py b/tests/functional/qemu_test/testcase.py new file mode 100644 index 0000000..aa01462 --- /dev/null +++ b/tests/functional/qemu_test/testcase.py @@ -0,0 +1,202 @@ +# Test class and utilities for functional tests +# +# Copyright 2018, 2024 Red Hat, Inc. +# +# Original Author (Avocado-based tests): +# Cleber Rosa <crosa@redhat.com> +# +# Adaption for standalone version: +# Thomas Huth <thuth@redhat.com> +# +# This work is licensed under the terms of the GNU GPL, version 2 or +# later. See the COPYING file in the top-level directory. + +import logging +import os +import subprocess +import pycotap +import sys +import unittest +import uuid + +from qemu.machine import QEMUMachine +from qemu.utils import kvm_available, tcg_available + +from .asset import Asset +from .cmd import run_cmd +from .config import BUILD_DIR + + +class QemuBaseTest(unittest.TestCase): + + qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY') + arch = None + + workdir = None + log = None + logdir = None + + def setUp(self, bin_prefix): + self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set') + self.arch = self.qemu_bin.split('-')[-1] + + self.workdir = os.path.join(BUILD_DIR, 'tests/functional', self.arch, + self.id()) + os.makedirs(self.workdir, exist_ok=True) + + self.logdir = self.workdir + self.log = logging.getLogger('qemu-test') + self.log.setLevel(logging.DEBUG) + self._log_fh = logging.FileHandler(os.path.join(self.logdir, + 'base.log'), mode='w') + self._log_fh.setLevel(logging.DEBUG) + fileFormatter = logging.Formatter( + '%(asctime)s - %(levelname)s: %(message)s') + self._log_fh.setFormatter(fileFormatter) + self.log.addHandler(self._log_fh) + + def tearDown(self): + self.log.removeHandler(self._log_fh) + + def main(): + path = os.path.basename(sys.argv[0])[:-3] + + cache = os.environ.get("QEMU_TEST_PRECACHE", None) + if cache is not None: + Asset.precache_suites(path, cache) + return + + tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError, + test_output_log = pycotap.LogMode.LogToError) + unittest.main(module = None, testRunner = tr, argv=["__dummy__", path]) + + +class QemuUserTest(QemuBaseTest): + + def setUp(self): + super().setUp('qemu-') + self._ldpath = [] + + def add_ldpath(self, ldpath): + self._ldpath.append(os.path.abspath(ldpath)) + + def run_cmd(self, bin_path, args=[]): + return subprocess.run([self.qemu_bin] + + ["-L %s" % ldpath for ldpath in self._ldpath] + + [bin_path] + + args, + text=True, capture_output=True) + +class QemuSystemTest(QemuBaseTest): + """Facilitates system emulation tests.""" + + cpu = None + machine = None + _machinehelp = None + + def setUp(self): + self._vms = {} + + super().setUp('qemu-system-') + + console_log = logging.getLogger('console') + console_log.setLevel(logging.DEBUG) + self._console_log_fh = logging.FileHandler(os.path.join(self.workdir, + 'console.log'), mode='w') + self._console_log_fh.setLevel(logging.DEBUG) + fileFormatter = logging.Formatter('%(asctime)s: %(message)s') + self._console_log_fh.setFormatter(fileFormatter) + console_log.addHandler(self._console_log_fh) + + def set_machine(self, machinename): + # TODO: We should use QMP to get the list of available machines + if not self._machinehelp: + self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]; + if self._machinehelp.find(machinename) < 0: + self.skipTest('no support for machine ' + machinename) + self.machine = machinename + + def require_accelerator(self, accelerator): + """ + Requires an accelerator to be available for the test to continue + + It takes into account the currently set qemu binary. + + If the check fails, the test is canceled. If the check itself + for the given accelerator is not available, the test is also + canceled. + + :param accelerator: name of the accelerator, such as "kvm" or "tcg" + :type accelerator: str + """ + checker = {'tcg': tcg_available, + 'kvm': kvm_available}.get(accelerator) + if checker is None: + self.skipTest("Don't know how to check for the presence " + "of accelerator %s" % accelerator) + if not checker(qemu_bin=self.qemu_bin): + self.skipTest("%s accelerator does not seem to be " + "available" % accelerator) + + def require_netdev(self, netdevname): + netdevhelp = run_cmd([self.qemu_bin, + '-M', 'none', '-netdev', 'help'])[0]; + if netdevhelp.find('\n' + netdevname + '\n') < 0: + self.skipTest('no support for " + netdevname + " networking') + + def require_device(self, devicename): + devhelp = run_cmd([self.qemu_bin, + '-M', 'none', '-device', 'help'])[0]; + if devhelp.find(devicename) < 0: + self.skipTest('no support for device ' + devicename) + + def _new_vm(self, name, *args): + vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir) + self.log.debug('QEMUMachine "%s" created', name) + self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir) + self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir) + if args: + vm.add_args(*args) + return vm + + @property + def vm(self): + return self.get_vm(name='default') + + def get_vm(self, *args, name=None): + if not name: + name = str(uuid.uuid4()) + if self._vms.get(name) is None: + self._vms[name] = self._new_vm(name, *args) + if self.cpu is not None: + self._vms[name].add_args('-cpu', self.cpu) + if self.machine is not None: + self._vms[name].set_machine(self.machine) + return self._vms[name] + + def set_vm_arg(self, arg, value): + """ + Set an argument to list of extra arguments to be given to the QEMU + binary. If the argument already exists then its value is replaced. + + :param arg: the QEMU argument, such as "-cpu" in "-cpu host" + :type arg: str + :param value: the argument value, such as "host" in "-cpu host" + :type value: str + """ + if not arg or not value: + return + if arg not in self.vm.args: + self.vm.args.extend([arg, value]) + else: + idx = self.vm.args.index(arg) + 1 + if idx < len(self.vm.args): + self.vm.args[idx] = value + else: + self.vm.args.append(value) + + def tearDown(self): + for vm in self._vms.values(): + vm.shutdown() + logging.getLogger('console').removeHandler(self._console_log_fh) + super().tearDown() diff --git a/tests/functional/qemu_test/utils.py b/tests/functional/qemu_test/utils.py new file mode 100644 index 0000000..2a1cb60 --- /dev/null +++ b/tests/functional/qemu_test/utils.py @@ -0,0 +1,56 @@ +# Utilities for python-based QEMU tests +# +# Copyright 2024 Red Hat, Inc. +# +# Authors: +# Thomas Huth <thuth@redhat.com> +# +# This work is licensed under the terms of the GNU GPL, version 2 or +# later. See the COPYING file in the top-level directory. + +import gzip +import lzma +import os +import shutil +import subprocess +import tarfile + +def archive_extract(archive, dest_dir, member=None): + with tarfile.open(archive) as tf: + if hasattr(tarfile, 'data_filter'): + tf.extraction_filter = getattr(tarfile, 'data_filter', + (lambda member, path: member)) + if member: + tf.extract(member=member, path=dest_dir) + else: + tf.extractall(path=dest_dir) + +def gzip_uncompress(gz_path, output_path): + if os.path.exists(output_path): + return + with gzip.open(gz_path, 'rb') as gz_in: + try: + with open(output_path, 'wb') as raw_out: + shutil.copyfileobj(gz_in, raw_out) + except: + os.remove(output_path) + raise + +def lzma_uncompress(xz_path, output_path): + if os.path.exists(output_path): + return + with lzma.open(xz_path, 'rb') as lzma_in: + try: + with open(output_path, 'wb') as raw_out: + shutil.copyfileobj(lzma_in, raw_out) + except: + os.remove(output_path) + raise + +def cpio_extract(cpio_handle, output_path): + cwd = os.getcwd() + os.chdir(output_path) + subprocess.run(['cpio', '-i'], + input=cpio_handle.read(), + stderr=subprocess.DEVNULL) + os.chdir(cwd) |