diff options
Diffstat (limited to 'tests/functional/qemu_test')
-rw-r--r-- | tests/functional/qemu_test/__init__.py | 12 | ||||
-rw-r--r-- | tests/functional/qemu_test/archive.py | 117 | ||||
-rw-r--r-- | tests/functional/qemu_test/asset.py | 87 | ||||
-rw-r--r-- | tests/functional/qemu_test/cmd.py | 159 | ||||
-rw-r--r-- | tests/functional/qemu_test/config.py | 12 | ||||
-rw-r--r-- | tests/functional/qemu_test/decorators.py | 151 | ||||
-rw-r--r-- | tests/functional/qemu_test/linuxkernel.py | 52 | ||||
-rw-r--r-- | tests/functional/qemu_test/ports.py | 55 | ||||
-rw-r--r-- | tests/functional/qemu_test/tesseract.py | 20 | ||||
-rw-r--r-- | tests/functional/qemu_test/testcase.py | 272 | ||||
-rw-r--r-- | tests/functional/qemu_test/tuxruntest.py | 136 | ||||
-rw-r--r-- | tests/functional/qemu_test/uncompress.py | 107 | ||||
-rw-r--r-- | tests/functional/qemu_test/utils.py | 65 |
13 files changed, 1061 insertions, 184 deletions
diff --git a/tests/functional/qemu_test/__init__.py b/tests/functional/qemu_test/__init__.py index f33282e..6e666a0 100644 --- a/tests/functional/qemu_test/__init__.py +++ b/tests/functional/qemu_test/__init__.py @@ -7,8 +7,14 @@ from .asset import Asset -from .config import BUILD_DIR -from .cmd import has_cmd, has_cmds, run_cmd, is_readable_executable_file, \ +from .config import BUILD_DIR, dso_suffix +from .cmd import is_readable_executable_file, \ interrupt_interactive_console_until_pattern, wait_for_console_pattern, \ - exec_command, exec_command_and_wait_for_pattern, get_qemu_img + exec_command, exec_command_and_wait_for_pattern, get_qemu_img, which from .testcase import QemuBaseTest, QemuUserTest, QemuSystemTest +from .linuxkernel import LinuxKernelTest +from .decorators import skipIfMissingCommands, skipIfNotMachine, \ + skipFlakyTest, skipUntrustedTest, skipBigDataTest, skipSlowTest, \ + skipIfMissingImports, skipIfOperatingSystem, skipLockedMemoryTest +from .archive import archive_extract +from .uncompress import uncompress diff --git a/tests/functional/qemu_test/archive.py b/tests/functional/qemu_test/archive.py new file mode 100644 index 0000000..c803fda --- /dev/null +++ b/tests/functional/qemu_test/archive.py @@ -0,0 +1,117 @@ +# SPDX-License-Identifier: GPL-2.0-or-later +# +# Utilities for python-based QEMU tests +# +# Copyright 2024 Red Hat, Inc. +# +# Authors: +# Thomas Huth <thuth@redhat.com> + +import os +from subprocess import check_call, run, DEVNULL +import tarfile +from urllib.parse import urlparse +import zipfile + +from .asset import Asset + + +def tar_extract(archive, dest_dir, member=None): + with tarfile.open(archive) as tf: + if hasattr(tarfile, 'data_filter'): + tf.extraction_filter = getattr(tarfile, 'data_filter', + (lambda member, path: member)) + if member: + tf.extract(member=member, path=dest_dir) + else: + tf.extractall(path=dest_dir) + +def cpio_extract(archive, output_path): + cwd = os.getcwd() + os.chdir(output_path) + # Not passing 'check=True' as cpio exits with non-zero + # status if the archive contains any device nodes :-( + if type(archive) == str: + run(['cpio', '-i', '-F', archive], + stdout=DEVNULL, stderr=DEVNULL) + else: + run(['cpio', '-i'], + input=archive.read(), + stdout=DEVNULL, stderr=DEVNULL) + os.chdir(cwd) + +def zip_extract(archive, dest_dir, member=None): + with zipfile.ZipFile(archive, 'r') as zf: + if member: + zf.extract(member=member, path=dest_dir) + else: + zf.extractall(path=dest_dir) + +def deb_extract(archive, dest_dir, member=None): + cwd = os.getcwd() + os.chdir(dest_dir) + try: + proc = run(['ar', 't', archive], + check=True, capture_output=True, encoding='utf8') + file_path = proc.stdout.split()[2] + check_call(['ar', 'x', archive, file_path], + stdout=DEVNULL, stderr=DEVNULL) + tar_extract(file_path, dest_dir, member) + finally: + os.chdir(cwd) + +''' +@params archive: filename, Asset, or file-like object to extract +@params dest_dir: target directory to extract into +@params member: optional member file to limit extraction to + +Extracts @archive into @dest_dir. All files are extracted +unless @member specifies a limit. + +If @format is None, heuristics will be applied to guess the format +from the filename or Asset URL. @format must be non-None if @archive +is a file-like object. +''' +def archive_extract(archive, dest_dir, format=None, member=None): + if format is None: + format = guess_archive_format(archive) + if type(archive) == Asset: + archive = str(archive) + + if format == "tar": + tar_extract(archive, dest_dir, member) + elif format == "zip": + zip_extract(archive, dest_dir, member) + elif format == "cpio": + if member is not None: + raise Exception("Unable to filter cpio extraction") + cpio_extract(archive, dest_dir) + elif format == "deb": + if type(archive) != str: + raise Exception("Unable to use file-like object with deb archives") + deb_extract(archive, dest_dir, "./" + member) + else: + raise Exception(f"Unknown archive format {format}") + +''' +@params archive: filename, or Asset to guess + +Guess the format of @compressed, raising an exception if +no format can be determined +''' +def guess_archive_format(archive): + if type(archive) == Asset: + archive = urlparse(archive.url).path + elif type(archive) != str: + raise Exception(f"Unable to guess archive format for {archive}") + + if ".tar." in archive or archive.endswith("tgz"): + return "tar" + elif archive.endswith(".zip"): + return "zip" + elif archive.endswith(".cpio"): + return "cpio" + elif archive.endswith(".deb") or archive.endswith(".udeb"): + return "deb" + else: + raise Exception(f"Unknown archive format for {archive}") diff --git a/tests/functional/qemu_test/asset.py b/tests/functional/qemu_test/asset.py index d3be2af..704b84d 100644 --- a/tests/functional/qemu_test/asset.py +++ b/tests/functional/qemu_test/asset.py @@ -8,14 +8,23 @@ import hashlib import logging import os -import subprocess +import stat import sys import unittest import urllib.request from time import sleep from pathlib import Path from shutil import copyfileobj +from urllib.error import HTTPError +class AssetError(Exception): + def __init__(self, asset, msg, transient=False): + self.url = asset.url + self.msg = msg + self.transient = transient + + def __str__(self): + return "%s: %s" % (self.url, self.msg) # Instances of this class must be declared as class level variables # starting with a name "ASSET_". This enables the pre-caching logic @@ -39,23 +48,38 @@ class Asset: return "Asset: url=%s hash=%s cache=%s" % ( self.url, self.hash, self.cache_file) + def __str__(self): + return str(self.cache_file) + def _check(self, cache_file): if self.hash is None: return True if len(self.hash) == 64: - sum_prog = 'sha256sum' + hl = hashlib.sha256() elif len(self.hash) == 128: - sum_prog = 'sha512sum' + hl = hashlib.sha512() else: - raise Exception("unknown hash type") + raise AssetError(self, "unknown hash type") - checksum = subprocess.check_output( - [sum_prog, str(cache_file)]).split()[0] - return self.hash == checksum.decode("utf-8") + # Calculate the hash of the file: + with open(cache_file, 'rb') as file: + while True: + chunk = file.read(1 << 20) + if not chunk: + break + hl.update(chunk) + + return self.hash == hl.hexdigest() def valid(self): return self.cache_file.exists() and self._check(self.cache_file) + def fetchable(self): + return not os.environ.get("QEMU_TEST_NO_DOWNLOAD", False) + + def available(self): + return self.valid() or self.fetchable() + def _wait_for_other_download(self, tmp_cache_file): # Another thread already seems to download the asset, so wait until # it is done, while also checking the size to see whether it is stuck @@ -94,8 +118,9 @@ class Asset: self.cache_file, self.url) return str(self.cache_file) - if os.environ.get("QEMU_TEST_NO_DOWNLOAD", False): - raise Exception("Asset cache is invalid and downloads disabled") + if not self.fetchable(): + raise AssetError(self, + "Asset cache is invalid and downloads disabled") self.log.info("Downloading %s to %s...", self.url, self.cache_file) tmp_cache_file = self.cache_file.with_suffix(".download") @@ -105,6 +130,20 @@ class Asset: with tmp_cache_file.open("xb") as dst: with urllib.request.urlopen(self.url) as resp: copyfileobj(resp, dst) + length_hdr = resp.getheader("Content-Length") + + # Verify downloaded file size against length metadata, if + # available. + if length_hdr is not None: + length = int(length_hdr) + fsize = tmp_cache_file.stat().st_size + if fsize != length: + self.log.error("Unable to download %s: " + "connection closed before " + "transfer complete (%d/%d)", + self.url, fsize, length) + tmp_cache_file.unlink() + continue break except FileExistsError: self.log.debug("%s already exists, " @@ -117,10 +156,23 @@ class Asset: tmp_cache_file) tmp_cache_file.unlink() continue + except HTTPError as e: + tmp_cache_file.unlink() + self.log.error("Unable to download %s: HTTP error %d", + self.url, e.code) + # Treat 404 as fatal, since it is highly likely to + # indicate a broken test rather than a transient + # server or networking problem + if e.code == 404: + raise AssetError(self, "Unable to download: " + "HTTP error %d" % e.code) + continue except Exception as e: - self.log.error("Unable to download %s: %s", self.url, e) tmp_cache_file.unlink() - raise + raise AssetError(self, "Unable to download: " % e) + + if not os.path.exists(tmp_cache_file): + raise AssetError(self, "Download retries exceeded", transient=True) try: # Set these just for informational purposes @@ -134,9 +186,10 @@ class Asset: if not self._check(tmp_cache_file): tmp_cache_file.unlink() - raise Exception("Hash of %s does not match %s" % - (self.url, self.hash)) + raise AssetError(self, "Hash does not match %s" % self.hash) tmp_cache_file.replace(self.cache_file) + # Remove write perms to stop tests accidentally modifying them + os.chmod(self.cache_file, stat.S_IRUSR | stat.S_IRGRP) self.log.info("Cached %s at %s" % (self.url, self.cache_file)) return str(self.cache_file) @@ -153,7 +206,13 @@ class Asset: for name, asset in vars(test.__class__).items(): if name.startswith("ASSET_") and type(asset) == Asset: log.info("Attempting to cache '%s'" % asset) - asset.fetch() + try: + asset.fetch() + except AssetError as e: + if not e.transient: + raise + log.error("%s: skipping asset precache" % e) + log.removeHandler(handler) def precache_suite(suite): diff --git a/tests/functional/qemu_test/cmd.py b/tests/functional/qemu_test/cmd.py index 3acd617..dc5f422 100644 --- a/tests/functional/qemu_test/cmd.py +++ b/tests/functional/qemu_test/cmd.py @@ -14,77 +14,93 @@ import logging import os import os.path -import subprocess -from .config import BUILD_DIR - -def has_cmd(name, args=None): - """ - This function is for use in a @skipUnless decorator, e.g.: - - @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true'))) - def test_something_that_needs_sudo(self): - ... - """ - - if args is None: - args = ('which', name) - - try: - _, stderr, exitcode = run_cmd(args) - except Exception as e: - exitcode = -1 - stderr = str(e) - - if exitcode != 0: - cmd_line = ' '.join(args) - err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}' - return (False, err) - else: - return (True, '') - -def has_cmds(*cmds): - """ - This function is for use in a @skipUnless decorator and - allows checking for the availability of multiple commands, e.g.: - - @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')), - 'cmd2', 'cmd3')) - def test_something_that_needs_cmd1_and_cmd2(self): - ... +def which(tool): + """ looks up the full path for @tool, returns None if not found + or if @tool does not have executable permissions. """ + paths=os.getenv('PATH') + for p in paths.split(os.path.pathsep): + p = os.path.join(p, tool) + if os.access(p, os.X_OK): + return p + return None - for cmd in cmds: - if isinstance(cmd, str): - cmd = (cmd,) +def is_readable_executable_file(path): + return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK) - ok, errstr = has_cmd(*cmd) - if not ok: - return (False, errstr) +# @test: functional test to fail if @failure is seen +# @vm: the VM whose console to process +# @success: a non-None string to look for +# @failure: a string to look for that triggers test failure, or None +# +# Read up to 1 line of text from @vm, looking for @success +# and optionally @failure. +# +# If @success or @failure are seen, immediately return True, +# even if end of line is not yet seen. ie remainder of the +# line is left unread. +# +# If end of line is seen, with neither @success or @failure +# return False +# +# If @failure is seen, then mark @test as failed +def _console_read_line_until_match(test, vm, success, failure): + msg = bytes([]) + done = False + while True: + c = vm.console_socket.recv(1) + if c is None: + done = True + test.fail( + f"EOF in console, expected '{success}'") + break + msg += c - return (True, '') + if success in msg: + done = True + break + if failure and failure in msg: + done = True + vm.console_socket.close() + test.fail( + f"'{failure}' found in console, expected '{success}'") -def run_cmd(args): - subp = subprocess.Popen(args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True) - stdout, stderr = subp.communicate() - ret = subp.returncode + if c == b'\n': + break - return (stdout, stderr, ret) + console_logger = logging.getLogger('console') + try: + console_logger.debug(msg.decode().strip()) + except: + console_logger.debug(msg) -def is_readable_executable_file(path): - return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK) + return done def _console_interaction(test, success_message, failure_message, send_string, keep_sending=False, vm=None): assert not keep_sending or send_string + assert success_message or send_string + if vm is None: vm = test.vm - console = vm.console_file - console_logger = logging.getLogger('console') + + test.log.debug( + f"Console interaction: success_msg='{success_message}' " + + f"failure_msg='{failure_message}' send_string='{send_string}'") + + # We'll process console in bytes, to avoid having to + # deal with unicode decode errors from receiving + # partial utf8 byte sequences + success_message_b = None + if success_message is not None: + success_message_b = success_message.encode() + + failure_message_b = None + if failure_message is not None: + failure_message_b = failure_message.encode() + while True: if send_string: vm.console_socket.sendall(send_string.encode()) @@ -92,25 +108,15 @@ def _console_interaction(test, success_message, failure_message, send_string = None # send only once # Only consume console output if waiting for something - if success_message is None and failure_message is None: + if success_message is None: if send_string is None: break continue - try: - msg = console.readline().decode().strip() - except UnicodeDecodeError: - msg = None - if not msg: - continue - console_logger.debug(msg) - if success_message is None or success_message in msg: + if _console_read_line_until_match(test, vm, + success_message_b, + failure_message_b): break - if failure_message and failure_message in msg: - console.close() - fail = 'Failure message found in console: "%s". Expected: "%s"' % \ - (failure_message, success_message) - test.fail(fail) def interrupt_interactive_console_until_pattern(test, success_message, failure_message=None, @@ -135,6 +141,7 @@ def interrupt_interactive_console_until_pattern(test, success_message, :param interrupt_string: a string to send to the console before trying to read a new line """ + assert success_message _console_interaction(test, success_message, failure_message, interrupt_string, True) @@ -149,6 +156,7 @@ def wait_for_console_pattern(test, success_message, failure_message=None, :param success_message: if this message appears, test succeeds :param failure_message: if this message appears, test fails """ + assert success_message _console_interaction(test, success_message, failure_message, None, vm=vm) def exec_command(test, command): @@ -177,6 +185,7 @@ def exec_command_and_wait_for_pattern(test, command, :param success_message: if this message appears, test succeeds :param failure_message: if this message appears, test fails """ + assert success_message _console_interaction(test, success_message, failure_message, command + '\r') def get_qemu_img(test): @@ -184,10 +193,10 @@ def get_qemu_img(test): # If qemu-img has been built, use it, otherwise the system wide one # will be used. - qemu_img = os.path.join(BUILD_DIR, 'qemu-img') + qemu_img = test.build_file('qemu-img') if os.path.exists(qemu_img): return qemu_img - if has_cmd('qemu-img'): - return 'qemu-img' - test.skipTest('Could not find "qemu-img", which is required to ' - 'create temporary images') + qemu_img = which('qemu-img') + if qemu_img is not None: + return qemu_img + test.skipTest(f"qemu-img not found in build dir or '$PATH'") diff --git a/tests/functional/qemu_test/config.py b/tests/functional/qemu_test/config.py index edd75b7..6d4c9c3 100644 --- a/tests/functional/qemu_test/config.py +++ b/tests/functional/qemu_test/config.py @@ -13,6 +13,7 @@ import os from pathlib import Path +import platform def _source_dir(): @@ -34,3 +35,14 @@ def _build_dir(): raise Exception("Cannot identify build dir, set QEMU_BUILD_ROOT") BUILD_DIR = _build_dir() + +def dso_suffix(): + '''Return the dynamic libraries suffix for the current platform''' + + if platform.system() == "Darwin": + return "dylib" + + if platform.system() == "Windows": + return "dll" + + return "so" diff --git a/tests/functional/qemu_test/decorators.py b/tests/functional/qemu_test/decorators.py new file mode 100644 index 0000000..c0d1567 --- /dev/null +++ b/tests/functional/qemu_test/decorators.py @@ -0,0 +1,151 @@ +# SPDX-License-Identifier: GPL-2.0-or-later +# +# Decorators useful in functional tests + +import importlib +import os +import platform +import resource +from unittest import skipIf, skipUnless + +from .cmd import which + +''' +Decorator to skip execution of a test if the list +of command binaries is not available in $PATH. +Example: + + @skipIfMissingCommands("mkisofs", "losetup") +''' +def skipIfMissingCommands(*args): + has_cmds = True + for cmd in args: + if not which(cmd): + has_cmds = False + break + + return skipUnless(has_cmds, 'required command(s) "%s" not installed' % + ", ".join(args)) + +''' +Decorator to skip execution of a test if the current +host operating system does match one of the prohibited +ones. +Example + + @skipIfOperatingSystem("Linux", "Darwin") +''' +def skipIfOperatingSystem(*args): + return skipIf(platform.system() in args, + 'running on an OS (%s) that is not able to run this test' % + ", ".join(args)) + +''' +Decorator to skip execution of a test if the current +host machine does not match one of the permitted +machines. +Example + + @skipIfNotMachine("x86_64", "aarch64") +''' +def skipIfNotMachine(*args): + return skipUnless(platform.machine() in args, + 'not running on one of the required machine(s) "%s"' % + ", ".join(args)) + +''' +Decorator to skip execution of flaky tests, unless +the $QEMU_TEST_FLAKY_TESTS environment variable is set. +A bug URL must be provided that documents the observed +failure behaviour, so it can be tracked & re-evaluated +in future. + +Historical tests may be providing "None" as the bug_url +but this should not be done for new test. + +Example: + + @skipFlakyTest("https://gitlab.com/qemu-project/qemu/-/issues/NNN") +''' +def skipFlakyTest(bug_url): + if bug_url is None: + bug_url = "FIXME: reproduce flaky test and file bug report or remove" + return skipUnless(os.getenv('QEMU_TEST_FLAKY_TESTS'), + f'Test is unstable: {bug_url}') + +''' +Decorator to skip execution of tests which are likely +to execute untrusted commands on the host, or commands +which process untrusted code, unless the +$QEMU_TEST_ALLOW_UNTRUSTED_CODE env var is set. +Example: + + @skipUntrustedTest() +''' +def skipUntrustedTest(): + return skipUnless(os.getenv('QEMU_TEST_ALLOW_UNTRUSTED_CODE'), + 'Test runs untrusted code / processes untrusted data') + +''' +Decorator to skip execution of tests which need large +data storage (over around 500MB-1GB mark) on the host, +unless the $QEMU_TEST_ALLOW_LARGE_STORAGE environment +variable is set + +Example: + + @skipBigDataTest() +''' +def skipBigDataTest(): + return skipUnless(os.getenv('QEMU_TEST_ALLOW_LARGE_STORAGE'), + 'Test requires large host storage space') + +''' +Decorator to skip execution of tests which have a really long +runtime (and might e.g. time out if QEMU has been compiled with +debugging enabled) unless the $QEMU_TEST_ALLOW_SLOW +environment variable is set + +Example: + + @skipSlowTest() +''' +def skipSlowTest(): + return skipUnless(os.getenv('QEMU_TEST_ALLOW_SLOW'), + 'Test has a very long runtime and might time out') + +''' +Decorator to skip execution of a test if the list +of python imports is not available. +Example: + + @skipIfMissingImports("numpy", "cv2") +''' +def skipIfMissingImports(*args): + has_imports = True + for impname in args: + try: + importlib.import_module(impname) + except ImportError: + has_imports = False + break + + return skipUnless(has_imports, 'required import(s) "%s" not installed' % + ", ".join(args)) + +''' +Decorator to skip execution of a test if the system's +locked memory limit is below the required threshold. +Takes required locked memory threshold in kB. +Example: + + @skipLockedMemoryTest(2_097_152) +''' +def skipLockedMemoryTest(locked_memory): + # get memlock hard limit in bytes + _, ulimit_memory = resource.getrlimit(resource.RLIMIT_MEMLOCK) + + return skipUnless( + ulimit_memory == resource.RLIM_INFINITY or ulimit_memory >= locked_memory * 1024, + f'Test required {locked_memory} kB of available locked memory', + ) diff --git a/tests/functional/qemu_test/linuxkernel.py b/tests/functional/qemu_test/linuxkernel.py new file mode 100644 index 0000000..2aca0ee --- /dev/null +++ b/tests/functional/qemu_test/linuxkernel.py @@ -0,0 +1,52 @@ +# Test class for testing the boot process of a Linux kernel +# +# This work is licensed under the terms of the GNU GPL, version 2 or +# later. See the COPYING file in the top-level directory. + +import hashlib +import urllib.request + +from .cmd import wait_for_console_pattern, exec_command_and_wait_for_pattern +from .testcase import QemuSystemTest +from .utils import get_usernet_hostfwd_port + + +class LinuxKernelTest(QemuSystemTest): + KERNEL_COMMON_COMMAND_LINE = 'printk.time=0 ' + + def wait_for_console_pattern(self, success_message, vm=None): + wait_for_console_pattern(self, success_message, + failure_message='Kernel panic - not syncing', + vm=vm) + + def launch_kernel(self, kernel, initrd=None, dtb=None, console_index=0, + wait_for=None): + self.vm.set_console(console_index=console_index) + self.vm.add_args('-kernel', kernel) + if initrd: + self.vm.add_args('-initrd', initrd) + if dtb: + self.vm.add_args('-dtb', dtb) + self.vm.launch() + if wait_for: + self.wait_for_console_pattern(wait_for) + + def check_http_download(self, filename, hashsum, guestport=8080, + pythoncmd='python3 -m http.server'): + exec_command_and_wait_for_pattern(self, + f'{pythoncmd} {guestport} & sleep 1', + f'Serving HTTP on 0.0.0.0 port {guestport}') + hl = hashlib.sha256() + hostport = get_usernet_hostfwd_port(self.vm) + url = f'http://localhost:{hostport}{filename}' + self.log.info(f'Downloading {url} ...') + with urllib.request.urlopen(url) as response: + while True: + chunk = response.read(1 << 20) + if not chunk: + break + hl.update(chunk) + + digest = hl.hexdigest() + self.log.info(f'sha256sum of download is {digest}.') + self.assertEqual(digest, hashsum) diff --git a/tests/functional/qemu_test/ports.py b/tests/functional/qemu_test/ports.py new file mode 100644 index 0000000..631b77a --- /dev/null +++ b/tests/functional/qemu_test/ports.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +# +# Simple functional tests for VNC functionality +# +# Copyright 2018, 2024 Red Hat, Inc. +# +# This work is licensed under the terms of the GNU GPL, version 2 or +# later. See the COPYING file in the top-level directory. + +import fcntl +import os +import socket + +from .config import BUILD_DIR +from typing import List + + +class Ports(): + + PORTS_ADDR = '127.0.0.1' + PORTS_RANGE_SIZE = 1024 + PORTS_START = 49152 + ((os.getpid() * PORTS_RANGE_SIZE) % 16384) + PORTS_END = PORTS_START + PORTS_RANGE_SIZE + + def __enter__(self): + lock_file = os.path.join(BUILD_DIR, "tests", "functional", "port_lock") + self.lock_fh = os.open(lock_file, os.O_CREAT) + fcntl.flock(self.lock_fh, fcntl.LOCK_EX) + return self + + def __exit__(self, exc_type, exc_value, traceback): + fcntl.flock(self.lock_fh, fcntl.LOCK_UN) + os.close(self.lock_fh) + + def check_bind(self, port: int) -> bool: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + try: + sock.bind((self.PORTS_ADDR, port)) + except OSError: + return False + + return True + + def find_free_ports(self, count: int) -> List[int]: + result = [] + for port in range(self.PORTS_START, self.PORTS_END): + if self.check_bind(port): + result.append(port) + if len(result) >= count: + break + assert len(result) == count + return result + + def find_free_port(self) -> int: + return self.find_free_ports(1)[0] diff --git a/tests/functional/qemu_test/tesseract.py b/tests/functional/qemu_test/tesseract.py index c4087b7..ede6c65 100644 --- a/tests/functional/qemu_test/tesseract.py +++ b/tests/functional/qemu_test/tesseract.py @@ -5,29 +5,19 @@ # This work is licensed under the terms of the GNU GPL, version 2 or # later. See the COPYING file in the top-level directory. -import re import logging +from subprocess import run -from . import has_cmd, run_cmd - -def tesseract_available(expected_version): - if not has_cmd('tesseract'): - return False - (stdout, stderr, ret) = run_cmd([ 'tesseract', '--version']) - if ret: - return False - version = stdout.split()[1] - return int(version.split('.')[0]) >= expected_version def tesseract_ocr(image_path, tesseract_args=''): console_logger = logging.getLogger('console') console_logger.debug(image_path) - (stdout, stderr, ret) = run_cmd(['tesseract', image_path, - 'stdout']) - if ret: + proc = run(['tesseract', image_path, 'stdout'], + capture_output=True, encoding='utf8') + if proc.returncode: return None lines = [] - for line in stdout.split('\n'): + for line in proc.stdout.split('\n'): sline = line.strip() if len(sline): console_logger.debug(sline) diff --git a/tests/functional/qemu_test/testcase.py b/tests/functional/qemu_test/testcase.py index aa01462..2082c6f 100644 --- a/tests/functional/qemu_test/testcase.py +++ b/tests/functional/qemu_test/testcase.py @@ -13,49 +13,224 @@ import logging import os -import subprocess +from pathlib import Path import pycotap +import shutil +from subprocess import run import sys +import tempfile import unittest import uuid from qemu.machine import QEMUMachine -from qemu.utils import kvm_available, tcg_available +from qemu.utils import hvf_available, kvm_available, tcg_available +from .archive import archive_extract from .asset import Asset -from .cmd import run_cmd -from .config import BUILD_DIR +from .config import BUILD_DIR, dso_suffix +from .uncompress import uncompress class QemuBaseTest(unittest.TestCase): - qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY') - arch = None - - workdir = None - log = None - logdir = None + ''' + @params compressed: filename, Asset, or file-like object to uncompress + @params format: optional compression format (gzip, lzma) + + Uncompresses @compressed into the scratch directory. + + If @format is None, heuristics will be applied to guess the format + from the filename or Asset URL. @format must be non-None if @uncompressed + is a file-like object. + + Returns the fully qualified path to the uncompressed file + ''' + def uncompress(self, compressed, format=None): + self.log.debug(f"Uncompress {compressed} format={format}") + if type(compressed) == Asset: + compressed.fetch() + + (name, ext) = os.path.splitext(str(compressed)) + uncompressed = self.scratch_file(os.path.basename(name)) + + uncompress(compressed, uncompressed, format) + + return uncompressed + + ''' + @params archive: filename, Asset, or file-like object to extract + @params format: optional archive format (tar, zip, deb, cpio) + @params sub_dir: optional sub-directory to extract into + @params member: optional member file to limit extraction to + + Extracts @archive into the scratch directory, or a directory beneath + named by @sub_dir. All files are extracted unless @member specifies + a limit. + + If @format is None, heuristics will be applied to guess the format + from the filename or Asset URL. @format must be non-None if @archive + is a file-like object. + + If @member is non-None, returns the fully qualified path to @member + ''' + def archive_extract(self, archive, format=None, sub_dir=None, member=None): + self.log.debug(f"Extract {archive} format={format}" + + f"sub_dir={sub_dir} member={member}") + if type(archive) == Asset: + archive.fetch() + if sub_dir is None: + archive_extract(archive, self.scratch_file(), format, member) + else: + archive_extract(archive, self.scratch_file(sub_dir), + format, member) + + if member is not None: + return self.scratch_file(member) + return None + + ''' + Create a temporary directory suitable for storing UNIX + socket paths. + + Returns: a tempfile.TemporaryDirectory instance + ''' + def socket_dir(self): + if self.socketdir is None: + self.socketdir = tempfile.TemporaryDirectory( + prefix="qemu_func_test_sock_") + return self.socketdir + + ''' + @params args list of zero or more subdirectories or file + + Construct a path for accessing a data file located + relative to the source directory that is the root for + functional tests. + + @args may be an empty list to reference the root dir + itself, may be a single element to reference a file in + the root directory, or may be multiple elements to + reference a file nested below. The path components + will be joined using the platform appropriate path + separator. + + Returns: string representing a file path + ''' + def data_file(self, *args): + return str(Path(Path(__file__).parent.parent, *args)) + + ''' + @params args list of zero or more subdirectories or file + + Construct a path for accessing a data file located + relative to the build directory root. + + @args may be an empty list to reference the build dir + itself, may be a single element to reference a file in + the build directory, or may be multiple elements to + reference a file nested below. The path components + will be joined using the platform appropriate path + separator. + + Returns: string representing a file path + ''' + def build_file(self, *args): + return str(Path(BUILD_DIR, *args)) + + ''' + @params args list of zero or more subdirectories or file + + Construct a path for accessing/creating a scratch file + located relative to a temporary directory dedicated to + this test case. The directory and its contents will be + purged upon completion of the test. + + @args may be an empty list to reference the scratch dir + itself, may be a single element to reference a file in + the scratch directory, or may be multiple elements to + reference a file nested below. The path components + will be joined using the platform appropriate path + separator. + + Returns: string representing a file path + ''' + def scratch_file(self, *args): + return str(Path(self.workdir, *args)) + + ''' + @params args list of zero or more subdirectories or file + + Construct a path for accessing/creating a log file + located relative to a temporary directory dedicated to + this test case. The directory and its log files will be + preserved upon completion of the test. + + @args may be an empty list to reference the log dir + itself, may be a single element to reference a file in + the log directory, or may be multiple elements to + reference a file nested below. The path components + will be joined using the platform appropriate path + separator. + + Returns: string representing a file path + ''' + def log_file(self, *args): + return str(Path(self.outputdir, *args)) + + ''' + @params plugin name + + Return the full path to the plugin taking into account any host OS + specific suffixes. + ''' + def plugin_file(self, plugin_name): + sfx = dso_suffix() + return os.path.join('tests', 'tcg', 'plugins', f'{plugin_name}.{sfx}') + + def assets_available(self): + for name, asset in vars(self.__class__).items(): + if name.startswith("ASSET_") and type(asset) == Asset: + if not asset.available(): + self.log.debug(f"Asset {asset.url} not available") + return False + return True - def setUp(self, bin_prefix): + def setUp(self): + self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY') self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set') self.arch = self.qemu_bin.split('-')[-1] + self.socketdir = None - self.workdir = os.path.join(BUILD_DIR, 'tests/functional', self.arch, - self.id()) + self.outputdir = self.build_file('tests', 'functional', + self.arch, self.id()) + self.workdir = os.path.join(self.outputdir, 'scratch') os.makedirs(self.workdir, exist_ok=True) - self.logdir = self.workdir + self.log_filename = self.log_file('base.log') self.log = logging.getLogger('qemu-test') self.log.setLevel(logging.DEBUG) - self._log_fh = logging.FileHandler(os.path.join(self.logdir, - 'base.log'), mode='w') + self._log_fh = logging.FileHandler(self.log_filename, mode='w') self._log_fh.setLevel(logging.DEBUG) fileFormatter = logging.Formatter( '%(asctime)s - %(levelname)s: %(message)s') self._log_fh.setFormatter(fileFormatter) self.log.addHandler(self._log_fh) + # Capture QEMUMachine logging + self.machinelog = logging.getLogger('qemu.machine') + self.machinelog.setLevel(logging.DEBUG) + self.machinelog.addHandler(self._log_fh) + + if not self.assets_available(): + self.skipTest('One or more assets is not available') + def tearDown(self): + if "QEMU_TEST_KEEP_SCRATCH" not in os.environ: + shutil.rmtree(self.workdir) + if self.socketdir is not None: + shutil.rmtree(self.socketdir.name) + self.socketdir = None + self.machinelog.removeHandler(self._log_fh) self.log.removeHandler(self._log_fh) def main(): @@ -68,24 +243,33 @@ class QemuBaseTest(unittest.TestCase): tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError, test_output_log = pycotap.LogMode.LogToError) - unittest.main(module = None, testRunner = tr, argv=["__dummy__", path]) + res = unittest.main(module = None, testRunner = tr, exit = False, + argv=["__dummy__", path]) + for (test, message) in res.result.errors + res.result.failures: + + if hasattr(test, "log_filename"): + print('More information on ' + test.id() + ' could be found here:' + '\n %s' % test.log_filename, file=sys.stderr) + if hasattr(test, 'console_log_name'): + print(' %s' % test.console_log_name, file=sys.stderr) + sys.exit(not res.result.wasSuccessful()) class QemuUserTest(QemuBaseTest): def setUp(self): - super().setUp('qemu-') + super().setUp() self._ldpath = [] def add_ldpath(self, ldpath): self._ldpath.append(os.path.abspath(ldpath)) def run_cmd(self, bin_path, args=[]): - return subprocess.run([self.qemu_bin] - + ["-L %s" % ldpath for ldpath in self._ldpath] - + [bin_path] - + args, - text=True, capture_output=True) + return run([self.qemu_bin] + + ["-L %s" % ldpath for ldpath in self._ldpath] + + [bin_path] + + args, + text=True, capture_output=True) class QemuSystemTest(QemuBaseTest): """Facilitates system emulation tests.""" @@ -97,12 +281,13 @@ class QemuSystemTest(QemuBaseTest): def setUp(self): self._vms = {} - super().setUp('qemu-system-') + super().setUp() console_log = logging.getLogger('console') console_log.setLevel(logging.DEBUG) - self._console_log_fh = logging.FileHandler(os.path.join(self.workdir, - 'console.log'), mode='w') + self.console_log_name = self.log_file('console.log') + self._console_log_fh = logging.FileHandler(self.console_log_name, + mode='w') self._console_log_fh.setLevel(logging.DEBUG) fileFormatter = logging.Formatter('%(asctime)s: %(message)s') self._console_log_fh.setFormatter(fileFormatter) @@ -111,7 +296,9 @@ class QemuSystemTest(QemuBaseTest): def set_machine(self, machinename): # TODO: We should use QMP to get the list of available machines if not self._machinehelp: - self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]; + self._machinehelp = run( + [self.qemu_bin, '-M', 'help'], + capture_output=True, check=True, encoding='utf8').stdout if self._machinehelp.find(machinename) < 0: self.skipTest('no support for machine ' + machinename) self.machine = machinename @@ -130,7 +317,9 @@ class QemuSystemTest(QemuBaseTest): :type accelerator: str """ checker = {'tcg': tcg_available, - 'kvm': kvm_available}.get(accelerator) + 'kvm': kvm_available, + 'hvf': hvf_available, + }.get(accelerator) if checker is None: self.skipTest("Don't know how to check for the presence " "of accelerator %s" % accelerator) @@ -139,22 +328,33 @@ class QemuSystemTest(QemuBaseTest): "available" % accelerator) def require_netdev(self, netdevname): - netdevhelp = run_cmd([self.qemu_bin, - '-M', 'none', '-netdev', 'help'])[0]; - if netdevhelp.find('\n' + netdevname + '\n') < 0: + help = run([self.qemu_bin, + '-M', 'none', '-netdev', 'help'], + capture_output=True, check=True, encoding='utf8').stdout; + if help.find('\n' + netdevname + '\n') < 0: self.skipTest('no support for " + netdevname + " networking') def require_device(self, devicename): - devhelp = run_cmd([self.qemu_bin, - '-M', 'none', '-device', 'help'])[0]; - if devhelp.find(devicename) < 0: + help = run([self.qemu_bin, + '-M', 'none', '-device', 'help'], + capture_output=True, check=True, encoding='utf8').stdout; + if help.find(devicename) < 0: self.skipTest('no support for device ' + devicename) def _new_vm(self, name, *args): - vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir) + vm = QEMUMachine(self.qemu_bin, + name=name, + base_temp_dir=self.workdir, + log_dir=self.log_file()) self.log.debug('QEMUMachine "%s" created', name) self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir) - self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir) + + sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None) + if sockpath is not None: + vm.add_args("-chardev", + f"socket,id=backdoor,path={sockpath},server=on,wait=off", + "-mon", "chardev=backdoor,mode=control") + if args: vm.add_args(*args) return vm diff --git a/tests/functional/qemu_test/tuxruntest.py b/tests/functional/qemu_test/tuxruntest.py new file mode 100644 index 0000000..6c442ff --- /dev/null +++ b/tests/functional/qemu_test/tuxruntest.py @@ -0,0 +1,136 @@ +# Functional test that boots known good tuxboot images the same way +# that tuxrun (www.tuxrun.org) does. This tool is used by things like +# the LKFT project to run regression tests on kernels. +# +# Copyright (c) 2023 Linaro Ltd. +# +# Author: +# Alex Bennée <alex.bennee@linaro.org> +# +# SPDX-License-Identifier: GPL-2.0-or-later + +import os + +from qemu_test import QemuSystemTest +from qemu_test import exec_command_and_wait_for_pattern +from qemu_test import wait_for_console_pattern +from qemu_test import which, get_qemu_img + +class TuxRunBaselineTest(QemuSystemTest): + + KERNEL_COMMON_COMMAND_LINE = 'printk.time=0' + # Tests are ~10-40s, allow for --debug/--enable-gcov overhead + timeout = 100 + + def setUp(self): + super().setUp() + + # We need zstd for all the tuxrun tests + if which('zstd') is None: + self.skipTest("zstd not found in $PATH") + + # Pre-init TuxRun specific settings: Most machines work with + # reasonable defaults but we sometimes need to tweak the + # config. To avoid open coding everything we store all these + # details in the metadata for each test. + + # The tuxboot tag matches the root directory + self.tuxboot = self.arch + + # Most Linux's use ttyS0 for their serial port + self.console = "ttyS0" + + # Does the machine shutdown QEMU nicely on "halt" + self.wait_for_shutdown = True + + self.root = "vda" + + # Occasionally we need extra devices to hook things up + self.extradev = None + + self.qemu_img = get_qemu_img(self) + + def wait_for_console_pattern(self, success_message, vm=None): + wait_for_console_pattern(self, success_message, + failure_message='Kernel panic - not syncing', + vm=vm) + + def fetch_tuxrun_assets(self, kernel_asset, rootfs_asset, dtb_asset=None): + """ + Fetch the TuxBoot assets. + """ + kernel_image = kernel_asset.fetch() + disk_image = self.uncompress(rootfs_asset) + dtb = dtb_asset.fetch() if dtb_asset is not None else None + + return (kernel_image, disk_image, dtb) + + def prepare_run(self, kernel, disk, drive, dtb=None, console_index=0): + """ + Setup to run and add the common parameters to the system + """ + self.vm.set_console(console_index=console_index) + + # all block devices are raw ext4's + blockdev = "driver=raw,file.driver=file," \ + + f"file.filename={disk},node-name=hd0" + + self.kcmd_line = self.KERNEL_COMMON_COMMAND_LINE + self.kcmd_line += f" root=/dev/{self.root}" + self.kcmd_line += f" console={self.console}" + + self.vm.add_args('-kernel', kernel, + '-append', self.kcmd_line, + '-blockdev', blockdev) + + # Sometimes we need extra devices attached + if self.extradev: + self.vm.add_args('-device', self.extradev) + + self.vm.add_args('-device', + f"{drive},drive=hd0") + + # Some machines need an explicit DTB + if dtb: + self.vm.add_args('-dtb', dtb) + + def run_tuxtest_tests(self, haltmsg): + """ + Wait for the system to boot up, wait for the login prompt and + then do a few things on the console. Trigger a shutdown and + wait to exit cleanly. + """ + ps1='root@tuxtest:~#' + self.wait_for_console_pattern(self.kcmd_line) + self.wait_for_console_pattern('tuxtest login:') + exec_command_and_wait_for_pattern(self, 'root', ps1) + exec_command_and_wait_for_pattern(self, 'cat /proc/interrupts', ps1) + exec_command_and_wait_for_pattern(self, 'cat /proc/self/maps', ps1) + exec_command_and_wait_for_pattern(self, 'uname -a', ps1) + exec_command_and_wait_for_pattern(self, 'halt', haltmsg) + + # Wait for VM to shut down gracefully if it can + if self.wait_for_shutdown: + self.vm.wait() + else: + self.vm.shutdown() + + def common_tuxrun(self, + kernel_asset, + rootfs_asset, + dtb_asset=None, + drive="virtio-blk-device", + haltmsg="reboot: System halted", + console_index=0): + """ + Common path for LKFT tests. Unless we need to do something + special with the command line we can process most things using + the tag metadata. + """ + (kernel, disk, dtb) = self.fetch_tuxrun_assets(kernel_asset, rootfs_asset, + dtb_asset) + + self.prepare_run(kernel, disk, drive, dtb, console_index) + self.vm.launch() + self.run_tuxtest_tests(haltmsg) + os.remove(disk) diff --git a/tests/functional/qemu_test/uncompress.py b/tests/functional/qemu_test/uncompress.py new file mode 100644 index 0000000..b7ef8f7 --- /dev/null +++ b/tests/functional/qemu_test/uncompress.py @@ -0,0 +1,107 @@ +# SPDX-License-Identifier: GPL-2.0-or-later +# +# Utilities for python-based QEMU tests +# +# Copyright 2024 Red Hat, Inc. +# +# Authors: +# Thomas Huth <thuth@redhat.com> + +import gzip +import lzma +import os +import stat +import shutil +from urllib.parse import urlparse +from subprocess import run, CalledProcessError + +from .asset import Asset + + +def gzip_uncompress(gz_path, output_path): + if os.path.exists(output_path): + return + with gzip.open(gz_path, 'rb') as gz_in: + try: + with open(output_path, 'wb') as raw_out: + shutil.copyfileobj(gz_in, raw_out) + except: + os.remove(output_path) + raise + +def lzma_uncompress(xz_path, output_path): + if os.path.exists(output_path): + return + with lzma.open(xz_path, 'rb') as lzma_in: + try: + with open(output_path, 'wb') as raw_out: + shutil.copyfileobj(lzma_in, raw_out) + except: + os.remove(output_path) + raise + + +def zstd_uncompress(zstd_path, output_path): + if os.path.exists(output_path): + return + + try: + run(['zstd', "-f", "-d", zstd_path, + "-o", output_path], capture_output=True, check=True) + except CalledProcessError as e: + os.remove(output_path) + raise Exception( + f"Unable to decompress zstd file {zstd_path} with {e}") from e + + # zstd copies source archive permissions for the output + # file, so must make this writable for QEMU + os.chmod(output_path, stat.S_IRUSR | stat.S_IWUSR) + + +''' +@params compressed: filename, Asset, or file-like object to uncompress +@params uncompressed: filename to uncompress into +@params format: optional compression format (gzip, lzma) + +Uncompresses @compressed into @uncompressed + +If @format is None, heuristics will be applied to guess the format +from the filename or Asset URL. @format must be non-None if @uncompressed +is a file-like object. + +Returns the fully qualified path to the uncompessed file +''' +def uncompress(compressed, uncompressed, format=None): + if format is None: + format = guess_uncompress_format(compressed) + + if format == "xz": + lzma_uncompress(str(compressed), uncompressed) + elif format == "gz": + gzip_uncompress(str(compressed), uncompressed) + elif format == "zstd": + zstd_uncompress(str(compressed), uncompressed) + else: + raise Exception(f"Unknown compression format {format}") + +''' +@params compressed: filename, Asset, or file-like object to guess + +Guess the format of @compressed, raising an exception if +no format can be determined +''' +def guess_uncompress_format(compressed): + if type(compressed) == Asset: + compressed = urlparse(compressed.url).path + elif type(compressed) != str: + raise Exception(f"Unable to guess compression cformat for {compressed}") + + (name, ext) = os.path.splitext(compressed) + if ext == ".xz": + return "xz" + elif ext == ".gz": + return "gz" + elif ext in [".zstd", ".zst"]: + return 'zstd' + else: + raise Exception(f"Unknown compression format for {compressed}") diff --git a/tests/functional/qemu_test/utils.py b/tests/functional/qemu_test/utils.py index 2a1cb60..e7c8de8 100644 --- a/tests/functional/qemu_test/utils.py +++ b/tests/functional/qemu_test/utils.py @@ -8,49 +8,32 @@ # This work is licensed under the terms of the GNU GPL, version 2 or # later. See the COPYING file in the top-level directory. -import gzip -import lzma import os -import shutil -import subprocess -import tarfile -def archive_extract(archive, dest_dir, member=None): - with tarfile.open(archive) as tf: - if hasattr(tarfile, 'data_filter'): - tf.extraction_filter = getattr(tarfile, 'data_filter', - (lambda member, path: member)) - if member: - tf.extract(member=member, path=dest_dir) - else: - tf.extractall(path=dest_dir) +from qemu.utils import get_info_usernet_hostfwd_port -def gzip_uncompress(gz_path, output_path): - if os.path.exists(output_path): - return - with gzip.open(gz_path, 'rb') as gz_in: - try: - with open(output_path, 'wb') as raw_out: - shutil.copyfileobj(gz_in, raw_out) - except: - os.remove(output_path) - raise -def lzma_uncompress(xz_path, output_path): - if os.path.exists(output_path): - return - with lzma.open(xz_path, 'rb') as lzma_in: - try: - with open(output_path, 'wb') as raw_out: - shutil.copyfileobj(lzma_in, raw_out) - except: - os.remove(output_path) - raise +def get_usernet_hostfwd_port(vm): + res = vm.cmd('human-monitor-command', command_line='info usernet') + return get_info_usernet_hostfwd_port(res) -def cpio_extract(cpio_handle, output_path): - cwd = os.getcwd() - os.chdir(output_path) - subprocess.run(['cpio', '-i'], - input=cpio_handle.read(), - stderr=subprocess.DEVNULL) - os.chdir(cwd) +""" +Round up to next power of 2 +""" +def pow2ceil(x): + return 1 if x == 0 else 2**(x - 1).bit_length() + +def file_truncate(path, size): + if size != os.path.getsize(path): + with open(path, 'ab+') as fd: + fd.truncate(size) + +""" +Expand file size to next power of 2 +""" +def image_pow2ceil_expand(path): + size = os.path.getsize(path) + size_aligned = pow2ceil(size) + if size != size_aligned: + with open(path, 'ab+') as fd: + fd.truncate(size_aligned) |