aboutsummaryrefslogtreecommitdiff
path: root/tests/functional/qemu_test
diff options
context:
space:
mode:
authorPeter Maydell <peter.maydell@linaro.org>2024-09-05 18:01:51 +0100
committerPeter Maydell <peter.maydell@linaro.org>2024-09-05 18:01:51 +0100
commiteabebca69b7fca7cf85f6cd39ac58a7f04986b47 (patch)
tree5e7bcc92e509008cc5b761fa3fbb08e69075825c /tests/functional/qemu_test
parent7b87a25f49a301d3377f3e71e0b4a62540c6f6e4 (diff)
parentc3e24cff2b27d63ac4b56ac6d38ef1ae3a27d92f (diff)
downloadqemu-eabebca69b7fca7cf85f6cd39ac58a7f04986b47.zip
qemu-eabebca69b7fca7cf85f6cd39ac58a7f04986b47.tar.gz
qemu-eabebca69b7fca7cf85f6cd39ac58a7f04986b47.tar.bz2
Merge tag 'pull-request-2024-09-04' of https://gitlab.com/thuth/qemu into staging
* Bump Avocado to version 103 * Introduce new functional test framework for Python-based tests * Convert many Avocado tests to the new functional test framework # -----BEGIN PGP SIGNATURE----- # # iQJFBAABCAAvFiEEJ7iIR+7gJQEY8+q5LtnXdP5wLbUFAmbYOEsRHHRodXRoQHJl # ZGhhdC5jb20ACgkQLtnXdP5wLbUDAA/+Kdlak/nCrK5gXDyDasfy86IxgMD0QlDR # U0MOpQyfXbM2EJjwCUhmgo8pui8qV23dKzfCwbDmkjB7mJ+yKi2ZdiFEp6onq/ke # aAdaaZwENtWcFglRD80TOSQX6oyeNmE/PuvJGG0BfwWXyyhaEa6kCdytEPORipQs # lZ+ZndHgXtcM3roXtgI3kp2V1nY5LLCJ044UrasKRq2xWfD/Ken90uWP5/nMLV7f # 7YLRUIb0sgV7IdjZiT1UkXJZRB7MatV7+OsojYbG8BPbQEvXqpryXMIeygHVR9a0 # yxNDUpTZR6JoS1IaLKkHh1mTM+L1JpFltKadKkXa0zqJHHSur7Tp0xVO/GeqCek4 # 9N8K4zw2CoO/AKmN8JjW5i4GnMrFMdcvxxNwLdRoVgYt4YA731wnHrbosXZOXcuv # H0z8Tm6ueKvfBtrQErdvqsGrP/8FUYRqZP4H6XaaC+wEis++7OmVR2nlQ/gAyr6/ # mMJtmxqVHCIcEVjDu1jYltrW3BN2CcxN2M9gxyOScq2/Xmzqtaeb4iyjxeCUjIBW # Pc4LXlSafIg3hPrdH3EKN275ev8cx/5jp8oEgXD5We25Mj3W930zde6/STXoX318 # NVNlbrIQjGjQN7rN5oxTFxTlIN8ax2tuuzpQDFvS/4bLyMYXcZ4I5gUrM5tvWTGv # +0UN45pJ7Nk= # =l6Ki # -----END PGP SIGNATURE----- # gpg: Signature made Wed 04 Sep 2024 11:36:59 BST # gpg: using RSA key 27B88847EEE0250118F3EAB92ED9D774FE702DB5 # gpg: issuer "thuth@redhat.com" # gpg: Good signature from "Thomas Huth <th.huth@gmx.de>" [full] # gpg: aka "Thomas Huth <thuth@redhat.com>" [full] # gpg: aka "Thomas Huth <huth@tuxfamily.org>" [full] # gpg: aka "Thomas Huth <th.huth@posteo.de>" [unknown] # Primary key fingerprint: 27B8 8847 EEE0 2501 18F3 EAB9 2ED9 D774 FE70 2DB5 * tag 'pull-request-2024-09-04' of https://gitlab.com/thuth/qemu: (42 commits) docs/devel/testing: Add documentation for functional tests docs/devel/testing: Rename avocado_qemu.Test class docs/devel/testing: Split the Avocado documentation into a separate file docs/devel: Split testing docs from the build docs and move to separate folder gitlab-ci: Add "check-functional" to the build tests tests/avocado: Remove unused QemuUserTest class tests/functional: Convert ARM bFLT linux-user avocado test tests/functional: Add QemuUserTest class tests/functional: Convert mips64el Fuloong2e avocado test (1/2) tests/functional: Convert Aarch64 Virt machine avocado tests tests/functional: Convert Aarch64 SBSA-Ref avocado tests tests/functional: Convert ARM Integrator/CP avocado tests tests/functional: Convert the linux_initrd avocado test into a standalone test tests/functional: Convert the rx_gdbsim avocado test into a standalone test tests/functional: Convert the acpi-bits test into a standalone test tests/functional: Convert the m68k nextcube test with tesseract tests/functional: Convert the ppc_hv avocado test into a standalone test tests/functional: Convert the ppc_amiga avocado test into a standalone test tests/functional: Convert most ppc avocado tests into standalone tests tests/functional: Convert the virtio_gpu avocado test into a standalone test ... Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
Diffstat (limited to 'tests/functional/qemu_test')
-rw-r--r--tests/functional/qemu_test/__init__.py14
-rw-r--r--tests/functional/qemu_test/asset.py171
-rw-r--r--tests/functional/qemu_test/cmd.py193
-rw-r--r--tests/functional/qemu_test/config.py36
-rw-r--r--tests/functional/qemu_test/tesseract.py35
-rw-r--r--tests/functional/qemu_test/testcase.py202
-rw-r--r--tests/functional/qemu_test/utils.py56
7 files changed, 707 insertions, 0 deletions
diff --git a/tests/functional/qemu_test/__init__.py b/tests/functional/qemu_test/__init__.py
new file mode 100644
index 0000000..f33282e
--- /dev/null
+++ b/tests/functional/qemu_test/__init__.py
@@ -0,0 +1,14 @@
+# Test class and utilities for functional tests
+#
+# Copyright 2024 Red Hat, Inc.
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+
+from .asset import Asset
+from .config import BUILD_DIR
+from .cmd import has_cmd, has_cmds, run_cmd, is_readable_executable_file, \
+ interrupt_interactive_console_until_pattern, wait_for_console_pattern, \
+ exec_command, exec_command_and_wait_for_pattern, get_qemu_img
+from .testcase import QemuBaseTest, QemuUserTest, QemuSystemTest
diff --git a/tests/functional/qemu_test/asset.py b/tests/functional/qemu_test/asset.py
new file mode 100644
index 0000000..d3be2af
--- /dev/null
+++ b/tests/functional/qemu_test/asset.py
@@ -0,0 +1,171 @@
+# Test utilities for fetching & caching assets
+#
+# Copyright 2024 Red Hat, Inc.
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import hashlib
+import logging
+import os
+import subprocess
+import sys
+import unittest
+import urllib.request
+from time import sleep
+from pathlib import Path
+from shutil import copyfileobj
+
+
+# Instances of this class must be declared as class level variables
+# starting with a name "ASSET_". This enables the pre-caching logic
+# to easily find all referenced assets and download them prior to
+# execution of the tests.
+class Asset:
+
+ def __init__(self, url, hashsum):
+ self.url = url
+ self.hash = hashsum
+ cache_dir_env = os.getenv('QEMU_TEST_CACHE_DIR')
+ if cache_dir_env:
+ self.cache_dir = Path(cache_dir_env, "download")
+ else:
+ self.cache_dir = Path(Path("~").expanduser(),
+ ".cache", "qemu", "download")
+ self.cache_file = Path(self.cache_dir, hashsum)
+ self.log = logging.getLogger('qemu-test')
+
+ def __repr__(self):
+ return "Asset: url=%s hash=%s cache=%s" % (
+ self.url, self.hash, self.cache_file)
+
+ def _check(self, cache_file):
+ if self.hash is None:
+ return True
+ if len(self.hash) == 64:
+ sum_prog = 'sha256sum'
+ elif len(self.hash) == 128:
+ sum_prog = 'sha512sum'
+ else:
+ raise Exception("unknown hash type")
+
+ checksum = subprocess.check_output(
+ [sum_prog, str(cache_file)]).split()[0]
+ return self.hash == checksum.decode("utf-8")
+
+ def valid(self):
+ return self.cache_file.exists() and self._check(self.cache_file)
+
+ def _wait_for_other_download(self, tmp_cache_file):
+ # Another thread already seems to download the asset, so wait until
+ # it is done, while also checking the size to see whether it is stuck
+ try:
+ current_size = tmp_cache_file.stat().st_size
+ new_size = current_size
+ except:
+ if os.path.exists(self.cache_file):
+ return True
+ raise
+ waittime = lastchange = 600
+ while waittime > 0:
+ sleep(1)
+ waittime -= 1
+ try:
+ new_size = tmp_cache_file.stat().st_size
+ except:
+ if os.path.exists(self.cache_file):
+ return True
+ raise
+ if new_size != current_size:
+ lastchange = waittime
+ current_size = new_size
+ elif lastchange - waittime > 90:
+ return False
+
+ self.log.debug("Time out while waiting for %s!", tmp_cache_file)
+ raise
+
+ def fetch(self):
+ if not self.cache_dir.exists():
+ self.cache_dir.mkdir(parents=True, exist_ok=True)
+
+ if self.valid():
+ self.log.debug("Using cached asset %s for %s",
+ self.cache_file, self.url)
+ return str(self.cache_file)
+
+ if os.environ.get("QEMU_TEST_NO_DOWNLOAD", False):
+ raise Exception("Asset cache is invalid and downloads disabled")
+
+ self.log.info("Downloading %s to %s...", self.url, self.cache_file)
+ tmp_cache_file = self.cache_file.with_suffix(".download")
+
+ for retries in range(3):
+ try:
+ with tmp_cache_file.open("xb") as dst:
+ with urllib.request.urlopen(self.url) as resp:
+ copyfileobj(resp, dst)
+ break
+ except FileExistsError:
+ self.log.debug("%s already exists, "
+ "waiting for other thread to finish...",
+ tmp_cache_file)
+ if self._wait_for_other_download(tmp_cache_file):
+ return str(self.cache_file)
+ self.log.debug("%s seems to be stale, "
+ "deleting and retrying download...",
+ tmp_cache_file)
+ tmp_cache_file.unlink()
+ continue
+ except Exception as e:
+ self.log.error("Unable to download %s: %s", self.url, e)
+ tmp_cache_file.unlink()
+ raise
+
+ try:
+ # Set these just for informational purposes
+ os.setxattr(str(tmp_cache_file), "user.qemu-asset-url",
+ self.url.encode('utf8'))
+ os.setxattr(str(tmp_cache_file), "user.qemu-asset-hash",
+ self.hash.encode('utf8'))
+ except Exception as e:
+ self.log.debug("Unable to set xattr on %s: %s", tmp_cache_file, e)
+ pass
+
+ if not self._check(tmp_cache_file):
+ tmp_cache_file.unlink()
+ raise Exception("Hash of %s does not match %s" %
+ (self.url, self.hash))
+ tmp_cache_file.replace(self.cache_file)
+
+ self.log.info("Cached %s at %s" % (self.url, self.cache_file))
+ return str(self.cache_file)
+
+ def precache_test(test):
+ log = logging.getLogger('qemu-test')
+ log.setLevel(logging.DEBUG)
+ handler = logging.StreamHandler(sys.stdout)
+ handler.setLevel(logging.DEBUG)
+ formatter = logging.Formatter(
+ '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ handler.setFormatter(formatter)
+ log.addHandler(handler)
+ for name, asset in vars(test.__class__).items():
+ if name.startswith("ASSET_") and type(asset) == Asset:
+ log.info("Attempting to cache '%s'" % asset)
+ asset.fetch()
+ log.removeHandler(handler)
+
+ def precache_suite(suite):
+ for test in suite:
+ if isinstance(test, unittest.TestSuite):
+ Asset.precache_suite(test)
+ elif isinstance(test, unittest.TestCase):
+ Asset.precache_test(test)
+
+ def precache_suites(path, cacheTstamp):
+ loader = unittest.loader.defaultTestLoader
+ tests = loader.loadTestsFromNames([path], None)
+
+ with open(cacheTstamp, "w") as fh:
+ Asset.precache_suite(tests)
diff --git a/tests/functional/qemu_test/cmd.py b/tests/functional/qemu_test/cmd.py
new file mode 100644
index 0000000..3acd617
--- /dev/null
+++ b/tests/functional/qemu_test/cmd.py
@@ -0,0 +1,193 @@
+# Test class and utilities for functional tests
+#
+# Copyright 2018, 2024 Red Hat, Inc.
+#
+# Original Author (Avocado-based tests):
+# Cleber Rosa <crosa@redhat.com>
+#
+# Adaption for standalone version:
+# Thomas Huth <thuth@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import logging
+import os
+import os.path
+import subprocess
+
+from .config import BUILD_DIR
+
+
+def has_cmd(name, args=None):
+ """
+ This function is for use in a @skipUnless decorator, e.g.:
+
+ @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
+ def test_something_that_needs_sudo(self):
+ ...
+ """
+
+ if args is None:
+ args = ('which', name)
+
+ try:
+ _, stderr, exitcode = run_cmd(args)
+ except Exception as e:
+ exitcode = -1
+ stderr = str(e)
+
+ if exitcode != 0:
+ cmd_line = ' '.join(args)
+ err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}'
+ return (False, err)
+ else:
+ return (True, '')
+
+def has_cmds(*cmds):
+ """
+ This function is for use in a @skipUnless decorator and
+ allows checking for the availability of multiple commands, e.g.:
+
+ @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
+ 'cmd2', 'cmd3'))
+ def test_something_that_needs_cmd1_and_cmd2(self):
+ ...
+ """
+
+ for cmd in cmds:
+ if isinstance(cmd, str):
+ cmd = (cmd,)
+
+ ok, errstr = has_cmd(*cmd)
+ if not ok:
+ return (False, errstr)
+
+ return (True, '')
+
+def run_cmd(args):
+ subp = subprocess.Popen(args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ universal_newlines=True)
+ stdout, stderr = subp.communicate()
+ ret = subp.returncode
+
+ return (stdout, stderr, ret)
+
+def is_readable_executable_file(path):
+ return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)
+
+def _console_interaction(test, success_message, failure_message,
+ send_string, keep_sending=False, vm=None):
+ assert not keep_sending or send_string
+ if vm is None:
+ vm = test.vm
+ console = vm.console_file
+ console_logger = logging.getLogger('console')
+ while True:
+ if send_string:
+ vm.console_socket.sendall(send_string.encode())
+ if not keep_sending:
+ send_string = None # send only once
+
+ # Only consume console output if waiting for something
+ if success_message is None and failure_message is None:
+ if send_string is None:
+ break
+ continue
+
+ try:
+ msg = console.readline().decode().strip()
+ except UnicodeDecodeError:
+ msg = None
+ if not msg:
+ continue
+ console_logger.debug(msg)
+ if success_message is None or success_message in msg:
+ break
+ if failure_message and failure_message in msg:
+ console.close()
+ fail = 'Failure message found in console: "%s". Expected: "%s"' % \
+ (failure_message, success_message)
+ test.fail(fail)
+
+def interrupt_interactive_console_until_pattern(test, success_message,
+ failure_message=None,
+ interrupt_string='\r'):
+ """
+ Keep sending a string to interrupt a console prompt, while logging the
+ console output. Typical use case is to break a boot loader prompt, such:
+
+ Press a key within 5 seconds to interrupt boot process.
+ 5
+ 4
+ 3
+ 2
+ 1
+ Booting default image...
+
+ :param test: a test containing a VM that will have its console
+ read and probed for a success or failure message
+ :type test: :class:`qemu_test.QemuSystemTest`
+ :param success_message: if this message appears, test succeeds
+ :param failure_message: if this message appears, test fails
+ :param interrupt_string: a string to send to the console before trying
+ to read a new line
+ """
+ _console_interaction(test, success_message, failure_message,
+ interrupt_string, True)
+
+def wait_for_console_pattern(test, success_message, failure_message=None,
+ vm=None):
+ """
+ Waits for messages to appear on the console, while logging the content
+
+ :param test: a test containing a VM that will have its console
+ read and probed for a success or failure message
+ :type test: :class:`qemu_test.QemuSystemTest`
+ :param success_message: if this message appears, test succeeds
+ :param failure_message: if this message appears, test fails
+ """
+ _console_interaction(test, success_message, failure_message, None, vm=vm)
+
+def exec_command(test, command):
+ """
+ Send a command to a console (appending CRLF characters), while logging
+ the content.
+
+ :param test: a test containing a VM.
+ :type test: :class:`qemu_test.QemuSystemTest`
+ :param command: the command to send
+ :type command: str
+ """
+ _console_interaction(test, None, None, command + '\r')
+
+def exec_command_and_wait_for_pattern(test, command,
+ success_message, failure_message=None):
+ """
+ Send a command to a console (appending CRLF characters), then wait
+ for success_message to appear on the console, while logging the.
+ content. Mark the test as failed if failure_message is found instead.
+
+ :param test: a test containing a VM that will have its console
+ read and probed for a success or failure message
+ :type test: :class:`qemu_test.QemuSystemTest`
+ :param command: the command to send
+ :param success_message: if this message appears, test succeeds
+ :param failure_message: if this message appears, test fails
+ """
+ _console_interaction(test, success_message, failure_message, command + '\r')
+
+def get_qemu_img(test):
+ test.log.debug('Looking for and selecting a qemu-img binary')
+
+ # If qemu-img has been built, use it, otherwise the system wide one
+ # will be used.
+ qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
+ if os.path.exists(qemu_img):
+ return qemu_img
+ if has_cmd('qemu-img'):
+ return 'qemu-img'
+ test.skipTest('Could not find "qemu-img", which is required to '
+ 'create temporary images')
diff --git a/tests/functional/qemu_test/config.py b/tests/functional/qemu_test/config.py
new file mode 100644
index 0000000..edd75b7
--- /dev/null
+++ b/tests/functional/qemu_test/config.py
@@ -0,0 +1,36 @@
+# Test class and utilities for functional tests
+#
+# Copyright 2018, 2024 Red Hat, Inc.
+#
+# Original Author (Avocado-based tests):
+# Cleber Rosa <crosa@redhat.com>
+#
+# Adaption for standalone version:
+# Thomas Huth <thuth@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import os
+from pathlib import Path
+
+
+def _source_dir():
+ # Determine top-level directory of the QEMU sources
+ return Path(__file__).parent.parent.parent.parent
+
+def _build_dir():
+ root = os.getenv('QEMU_BUILD_ROOT')
+ if root is not None:
+ return Path(root)
+ # Makefile.mtest only exists in build dir, so if it is available, use CWD
+ if os.path.exists('Makefile.mtest'):
+ return Path(os.getcwd())
+
+ root = os.path.join(_source_dir(), 'build')
+ if os.path.exists(root):
+ return Path(root)
+
+ raise Exception("Cannot identify build dir, set QEMU_BUILD_ROOT")
+
+BUILD_DIR = _build_dir()
diff --git a/tests/functional/qemu_test/tesseract.py b/tests/functional/qemu_test/tesseract.py
new file mode 100644
index 0000000..c4087b7
--- /dev/null
+++ b/tests/functional/qemu_test/tesseract.py
@@ -0,0 +1,35 @@
+# ...
+#
+# Copyright (c) 2019 Philippe Mathieu-Daudé <f4bug@amsat.org>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import re
+import logging
+
+from . import has_cmd, run_cmd
+
+def tesseract_available(expected_version):
+ if not has_cmd('tesseract'):
+ return False
+ (stdout, stderr, ret) = run_cmd([ 'tesseract', '--version'])
+ if ret:
+ return False
+ version = stdout.split()[1]
+ return int(version.split('.')[0]) >= expected_version
+
+def tesseract_ocr(image_path, tesseract_args=''):
+ console_logger = logging.getLogger('console')
+ console_logger.debug(image_path)
+ (stdout, stderr, ret) = run_cmd(['tesseract', image_path,
+ 'stdout'])
+ if ret:
+ return None
+ lines = []
+ for line in stdout.split('\n'):
+ sline = line.strip()
+ if len(sline):
+ console_logger.debug(sline)
+ lines += [sline]
+ return lines
diff --git a/tests/functional/qemu_test/testcase.py b/tests/functional/qemu_test/testcase.py
new file mode 100644
index 0000000..aa01462
--- /dev/null
+++ b/tests/functional/qemu_test/testcase.py
@@ -0,0 +1,202 @@
+# Test class and utilities for functional tests
+#
+# Copyright 2018, 2024 Red Hat, Inc.
+#
+# Original Author (Avocado-based tests):
+# Cleber Rosa <crosa@redhat.com>
+#
+# Adaption for standalone version:
+# Thomas Huth <thuth@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import logging
+import os
+import subprocess
+import pycotap
+import sys
+import unittest
+import uuid
+
+from qemu.machine import QEMUMachine
+from qemu.utils import kvm_available, tcg_available
+
+from .asset import Asset
+from .cmd import run_cmd
+from .config import BUILD_DIR
+
+
+class QemuBaseTest(unittest.TestCase):
+
+ qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
+ arch = None
+
+ workdir = None
+ log = None
+ logdir = None
+
+ def setUp(self, bin_prefix):
+ self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
+ self.arch = self.qemu_bin.split('-')[-1]
+
+ self.workdir = os.path.join(BUILD_DIR, 'tests/functional', self.arch,
+ self.id())
+ os.makedirs(self.workdir, exist_ok=True)
+
+ self.logdir = self.workdir
+ self.log = logging.getLogger('qemu-test')
+ self.log.setLevel(logging.DEBUG)
+ self._log_fh = logging.FileHandler(os.path.join(self.logdir,
+ 'base.log'), mode='w')
+ self._log_fh.setLevel(logging.DEBUG)
+ fileFormatter = logging.Formatter(
+ '%(asctime)s - %(levelname)s: %(message)s')
+ self._log_fh.setFormatter(fileFormatter)
+ self.log.addHandler(self._log_fh)
+
+ def tearDown(self):
+ self.log.removeHandler(self._log_fh)
+
+ def main():
+ path = os.path.basename(sys.argv[0])[:-3]
+
+ cache = os.environ.get("QEMU_TEST_PRECACHE", None)
+ if cache is not None:
+ Asset.precache_suites(path, cache)
+ return
+
+ tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError,
+ test_output_log = pycotap.LogMode.LogToError)
+ unittest.main(module = None, testRunner = tr, argv=["__dummy__", path])
+
+
+class QemuUserTest(QemuBaseTest):
+
+ def setUp(self):
+ super().setUp('qemu-')
+ self._ldpath = []
+
+ def add_ldpath(self, ldpath):
+ self._ldpath.append(os.path.abspath(ldpath))
+
+ def run_cmd(self, bin_path, args=[]):
+ return subprocess.run([self.qemu_bin]
+ + ["-L %s" % ldpath for ldpath in self._ldpath]
+ + [bin_path]
+ + args,
+ text=True, capture_output=True)
+
+class QemuSystemTest(QemuBaseTest):
+ """Facilitates system emulation tests."""
+
+ cpu = None
+ machine = None
+ _machinehelp = None
+
+ def setUp(self):
+ self._vms = {}
+
+ super().setUp('qemu-system-')
+
+ console_log = logging.getLogger('console')
+ console_log.setLevel(logging.DEBUG)
+ self._console_log_fh = logging.FileHandler(os.path.join(self.workdir,
+ 'console.log'), mode='w')
+ self._console_log_fh.setLevel(logging.DEBUG)
+ fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
+ self._console_log_fh.setFormatter(fileFormatter)
+ console_log.addHandler(self._console_log_fh)
+
+ def set_machine(self, machinename):
+ # TODO: We should use QMP to get the list of available machines
+ if not self._machinehelp:
+ self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0];
+ if self._machinehelp.find(machinename) < 0:
+ self.skipTest('no support for machine ' + machinename)
+ self.machine = machinename
+
+ def require_accelerator(self, accelerator):
+ """
+ Requires an accelerator to be available for the test to continue
+
+ It takes into account the currently set qemu binary.
+
+ If the check fails, the test is canceled. If the check itself
+ for the given accelerator is not available, the test is also
+ canceled.
+
+ :param accelerator: name of the accelerator, such as "kvm" or "tcg"
+ :type accelerator: str
+ """
+ checker = {'tcg': tcg_available,
+ 'kvm': kvm_available}.get(accelerator)
+ if checker is None:
+ self.skipTest("Don't know how to check for the presence "
+ "of accelerator %s" % accelerator)
+ if not checker(qemu_bin=self.qemu_bin):
+ self.skipTest("%s accelerator does not seem to be "
+ "available" % accelerator)
+
+ def require_netdev(self, netdevname):
+ netdevhelp = run_cmd([self.qemu_bin,
+ '-M', 'none', '-netdev', 'help'])[0];
+ if netdevhelp.find('\n' + netdevname + '\n') < 0:
+ self.skipTest('no support for " + netdevname + " networking')
+
+ def require_device(self, devicename):
+ devhelp = run_cmd([self.qemu_bin,
+ '-M', 'none', '-device', 'help'])[0];
+ if devhelp.find(devicename) < 0:
+ self.skipTest('no support for device ' + devicename)
+
+ def _new_vm(self, name, *args):
+ vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir)
+ self.log.debug('QEMUMachine "%s" created', name)
+ self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
+ self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
+ if args:
+ vm.add_args(*args)
+ return vm
+
+ @property
+ def vm(self):
+ return self.get_vm(name='default')
+
+ def get_vm(self, *args, name=None):
+ if not name:
+ name = str(uuid.uuid4())
+ if self._vms.get(name) is None:
+ self._vms[name] = self._new_vm(name, *args)
+ if self.cpu is not None:
+ self._vms[name].add_args('-cpu', self.cpu)
+ if self.machine is not None:
+ self._vms[name].set_machine(self.machine)
+ return self._vms[name]
+
+ def set_vm_arg(self, arg, value):
+ """
+ Set an argument to list of extra arguments to be given to the QEMU
+ binary. If the argument already exists then its value is replaced.
+
+ :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
+ :type arg: str
+ :param value: the argument value, such as "host" in "-cpu host"
+ :type value: str
+ """
+ if not arg or not value:
+ return
+ if arg not in self.vm.args:
+ self.vm.args.extend([arg, value])
+ else:
+ idx = self.vm.args.index(arg) + 1
+ if idx < len(self.vm.args):
+ self.vm.args[idx] = value
+ else:
+ self.vm.args.append(value)
+
+ def tearDown(self):
+ for vm in self._vms.values():
+ vm.shutdown()
+ logging.getLogger('console').removeHandler(self._console_log_fh)
+ super().tearDown()
diff --git a/tests/functional/qemu_test/utils.py b/tests/functional/qemu_test/utils.py
new file mode 100644
index 0000000..2a1cb60
--- /dev/null
+++ b/tests/functional/qemu_test/utils.py
@@ -0,0 +1,56 @@
+# Utilities for python-based QEMU tests
+#
+# Copyright 2024 Red Hat, Inc.
+#
+# Authors:
+# Thomas Huth <thuth@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import gzip
+import lzma
+import os
+import shutil
+import subprocess
+import tarfile
+
+def archive_extract(archive, dest_dir, member=None):
+ with tarfile.open(archive) as tf:
+ if hasattr(tarfile, 'data_filter'):
+ tf.extraction_filter = getattr(tarfile, 'data_filter',
+ (lambda member, path: member))
+ if member:
+ tf.extract(member=member, path=dest_dir)
+ else:
+ tf.extractall(path=dest_dir)
+
+def gzip_uncompress(gz_path, output_path):
+ if os.path.exists(output_path):
+ return
+ with gzip.open(gz_path, 'rb') as gz_in:
+ try:
+ with open(output_path, 'wb') as raw_out:
+ shutil.copyfileobj(gz_in, raw_out)
+ except:
+ os.remove(output_path)
+ raise
+
+def lzma_uncompress(xz_path, output_path):
+ if os.path.exists(output_path):
+ return
+ with lzma.open(xz_path, 'rb') as lzma_in:
+ try:
+ with open(output_path, 'wb') as raw_out:
+ shutil.copyfileobj(lzma_in, raw_out)
+ except:
+ os.remove(output_path)
+ raise
+
+def cpio_extract(cpio_handle, output_path):
+ cwd = os.getcwd()
+ os.chdir(output_path)
+ subprocess.run(['cpio', '-i'],
+ input=cpio_handle.read(),
+ stderr=subprocess.DEVNULL)
+ os.chdir(cwd)