aboutsummaryrefslogtreecommitdiff
path: root/tests/avocado/avocado_qemu
diff options
context:
space:
mode:
Diffstat (limited to 'tests/avocado/avocado_qemu')
-rw-r--r--tests/avocado/avocado_qemu/__init__.py424
-rw-r--r--tests/avocado/avocado_qemu/linuxtest.py253
2 files changed, 0 insertions, 677 deletions
diff --git a/tests/avocado/avocado_qemu/__init__.py b/tests/avocado/avocado_qemu/__init__.py
deleted file mode 100644
index 93c3460..0000000
--- a/tests/avocado/avocado_qemu/__init__.py
+++ /dev/null
@@ -1,424 +0,0 @@
-# Test class and utilities for functional tests
-#
-# Copyright (c) 2018 Red Hat, Inc.
-#
-# Author:
-# Cleber Rosa <crosa@redhat.com>
-#
-# This work is licensed under the terms of the GNU GPL, version 2 or
-# later. See the COPYING file in the top-level directory.
-
-import logging
-import os
-import subprocess
-import sys
-import tempfile
-import time
-import uuid
-
-import avocado
-from avocado.utils import ssh
-from avocado.utils.path import find_command
-
-from qemu.machine import QEMUMachine
-from qemu.utils import (get_info_usernet_hostfwd_port, kvm_available,
- tcg_available)
-
-
-#: The QEMU build root directory. It may also be the source directory
-#: if building from the source dir, but it's safer to use BUILD_DIR for
-#: that purpose. Be aware that if this code is moved outside of a source
-#: and build tree, it will not be accurate.
-BUILD_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
-
-
-def has_cmd(name, args=None):
- """
- This function is for use in a @avocado.skipUnless decorator, e.g.:
-
- @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
- def test_something_that_needs_sudo(self):
- ...
- """
-
- if args is None:
- args = ('which', name)
-
- try:
- _, stderr, exitcode = run_cmd(args)
- except Exception as e:
- exitcode = -1
- stderr = str(e)
-
- if exitcode != 0:
- cmd_line = ' '.join(args)
- err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}'
- return (False, err)
- else:
- return (True, '')
-
-def has_cmds(*cmds):
- """
- This function is for use in a @avocado.skipUnless decorator and
- allows checking for the availability of multiple commands, e.g.:
-
- @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
- 'cmd2', 'cmd3'))
- def test_something_that_needs_cmd1_and_cmd2(self):
- ...
- """
-
- for cmd in cmds:
- if isinstance(cmd, str):
- cmd = (cmd,)
-
- ok, errstr = has_cmd(*cmd)
- if not ok:
- return (False, errstr)
-
- return (True, '')
-
-def run_cmd(args):
- subp = subprocess.Popen(args,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- universal_newlines=True)
- stdout, stderr = subp.communicate()
- ret = subp.returncode
-
- return (stdout, stderr, ret)
-
-def is_readable_executable_file(path):
- return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)
-
-
-def pick_default_qemu_bin(bin_prefix='qemu-system-', arch=None):
- """
- Picks the path of a QEMU binary, starting either in the current working
- directory or in the source tree root directory.
-
- :param arch: the arch to use when looking for a QEMU binary (the target
- will match the arch given). If None (the default), arch
- will be the current host system arch (as given by
- :func:`os.uname`).
- :type arch: str
- :returns: the path to the default QEMU binary or None if one could not
- be found
- :rtype: str or None
- """
- if arch is None:
- arch = os.uname()[4]
- # qemu binary path does not match arch for powerpc, handle it
- if 'ppc64le' in arch:
- arch = 'ppc64'
- qemu_bin_name = bin_prefix + arch
- qemu_bin_paths = [
- os.path.join(".", qemu_bin_name),
- os.path.join(BUILD_DIR, qemu_bin_name),
- os.path.join(BUILD_DIR, "build", qemu_bin_name),
- ]
- for path in qemu_bin_paths:
- if is_readable_executable_file(path):
- return path
- return None
-
-
-def _console_interaction(test, success_message, failure_message,
- send_string, keep_sending=False, vm=None):
- assert not keep_sending or send_string
- if vm is None:
- vm = test.vm
- console = vm.console_file
- console_logger = logging.getLogger('console')
- while True:
- if send_string:
- vm.console_socket.sendall(send_string.encode())
- if not keep_sending:
- send_string = None # send only once
-
- # Only consume console output if waiting for something
- if success_message is None and failure_message is None:
- if send_string is None:
- break
- continue
-
- try:
- msg = console.readline().decode().strip()
- except UnicodeDecodeError:
- msg = None
- if not msg:
- continue
- console_logger.debug(msg)
- if success_message is None or success_message in msg:
- break
- if failure_message and failure_message in msg:
- console.close()
- fail = 'Failure message found in console: "%s". Expected: "%s"' % \
- (failure_message, success_message)
- test.fail(fail)
-
-def interrupt_interactive_console_until_pattern(test, success_message,
- failure_message=None,
- interrupt_string='\r'):
- """
- Keep sending a string to interrupt a console prompt, while logging the
- console output. Typical use case is to break a boot loader prompt, such:
-
- Press a key within 5 seconds to interrupt boot process.
- 5
- 4
- 3
- 2
- 1
- Booting default image...
-
- :param test: an Avocado test containing a VM that will have its console
- read and probed for a success or failure message
- :type test: :class:`avocado_qemu.QemuSystemTest`
- :param success_message: if this message appears, test succeeds
- :param failure_message: if this message appears, test fails
- :param interrupt_string: a string to send to the console before trying
- to read a new line
- """
- _console_interaction(test, success_message, failure_message,
- interrupt_string, True)
-
-def wait_for_console_pattern(test, success_message, failure_message=None,
- vm=None):
- """
- Waits for messages to appear on the console, while logging the content
-
- :param test: an Avocado test containing a VM that will have its console
- read and probed for a success or failure message
- :type test: :class:`avocado_qemu.QemuSystemTest`
- :param success_message: if this message appears, test succeeds
- :param failure_message: if this message appears, test fails
- """
- _console_interaction(test, success_message, failure_message, None, vm=vm)
-
-def exec_command(test, command):
- """
- Send a command to a console (appending CRLF characters), while logging
- the content.
-
- :param test: an Avocado test containing a VM.
- :type test: :class:`avocado_qemu.QemuSystemTest`
- :param command: the command to send
- :type command: str
- """
- _console_interaction(test, None, None, command + '\r')
-
-def exec_command_and_wait_for_pattern(test, command,
- success_message, failure_message=None):
- """
- Send a command to a console (appending CRLF characters), then wait
- for success_message to appear on the console, while logging the.
- content. Mark the test as failed if failure_message is found instead.
-
- :param test: an Avocado test containing a VM that will have its console
- read and probed for a success or failure message
- :type test: :class:`avocado_qemu.QemuSystemTest`
- :param command: the command to send
- :param success_message: if this message appears, test succeeds
- :param failure_message: if this message appears, test fails
- """
- _console_interaction(test, success_message, failure_message, command + '\r')
-
-class QemuBaseTest(avocado.Test):
-
- # default timeout for all tests, can be overridden
- timeout = 120
-
- def _get_unique_tag_val(self, tag_name):
- """
- Gets a tag value, if unique for a key
- """
- vals = self.tags.get(tag_name, [])
- if len(vals) == 1:
- return vals.pop()
- return None
-
- def setUp(self, bin_prefix):
- self.arch = self.params.get('arch',
- default=self._get_unique_tag_val('arch'))
-
- self.cpu = self.params.get('cpu',
- default=self._get_unique_tag_val('cpu'))
-
- default_qemu_bin = pick_default_qemu_bin(bin_prefix, arch=self.arch)
- self.qemu_bin = self.params.get('qemu_bin',
- default=default_qemu_bin)
- if self.qemu_bin is None:
- self.cancel("No QEMU binary defined or found in the build tree")
-
- def fetch_asset(self, name,
- asset_hash, algorithm=None,
- locations=None, expire=None,
- find_only=False, cancel_on_missing=True):
- return super().fetch_asset(name,
- asset_hash=asset_hash,
- algorithm=algorithm,
- locations=locations,
- expire=expire,
- find_only=find_only,
- cancel_on_missing=cancel_on_missing)
-
-
-class QemuSystemTest(QemuBaseTest):
- """Facilitates system emulation tests."""
-
- def setUp(self):
- self._vms = {}
-
- super().setUp('qemu-system-')
-
- accel_required = self._get_unique_tag_val('accel')
- if accel_required:
- self.require_accelerator(accel_required)
-
- self.machine = self.params.get('machine',
- default=self._get_unique_tag_val('machine'))
-
- def require_accelerator(self, accelerator):
- """
- Requires an accelerator to be available for the test to continue
-
- It takes into account the currently set qemu binary.
-
- If the check fails, the test is canceled. If the check itself
- for the given accelerator is not available, the test is also
- canceled.
-
- :param accelerator: name of the accelerator, such as "kvm" or "tcg"
- :type accelerator: str
- """
- checker = {'tcg': tcg_available,
- 'kvm': kvm_available}.get(accelerator)
- if checker is None:
- self.cancel("Don't know how to check for the presence "
- "of accelerator %s" % accelerator)
- if not checker(qemu_bin=self.qemu_bin):
- self.cancel("%s accelerator does not seem to be "
- "available" % accelerator)
-
- def require_netdev(self, netdevname):
- netdevhelp = run_cmd([self.qemu_bin,
- '-M', 'none', '-netdev', 'help'])[0];
- if netdevhelp.find('\n' + netdevname + '\n') < 0:
- self.cancel('no support for user networking')
-
- def _new_vm(self, name, *args):
- self._sd = tempfile.TemporaryDirectory(prefix="qemu_")
- vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir,
- log_dir=self.logdir)
- self.log.debug('QEMUMachine "%s" created', name)
- self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
- self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
- if args:
- vm.add_args(*args)
- return vm
-
- def get_qemu_img(self):
- self.log.debug('Looking for and selecting a qemu-img binary')
-
- # If qemu-img has been built, use it, otherwise the system wide one
- # will be used.
- qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
- if not os.path.exists(qemu_img):
- qemu_img = find_command('qemu-img', False)
- if qemu_img is False:
- self.cancel('Could not find "qemu-img"')
-
- return qemu_img
-
- @property
- def vm(self):
- return self.get_vm(name='default')
-
- def get_vm(self, *args, name=None):
- if not name:
- name = str(uuid.uuid4())
- if self._vms.get(name) is None:
- self._vms[name] = self._new_vm(name, *args)
- if self.cpu is not None:
- self._vms[name].add_args('-cpu', self.cpu)
- if self.machine is not None:
- self._vms[name].set_machine(self.machine)
- return self._vms[name]
-
- def set_vm_arg(self, arg, value):
- """
- Set an argument to list of extra arguments to be given to the QEMU
- binary. If the argument already exists then its value is replaced.
-
- :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
- :type arg: str
- :param value: the argument value, such as "host" in "-cpu host"
- :type value: str
- """
- if not arg or not value:
- return
- if arg not in self.vm.args:
- self.vm.args.extend([arg, value])
- else:
- idx = self.vm.args.index(arg) + 1
- if idx < len(self.vm.args):
- self.vm.args[idx] = value
- else:
- self.vm.args.append(value)
-
- def tearDown(self):
- for vm in self._vms.values():
- vm.shutdown()
- self._sd = None
- super().tearDown()
-
-
-class LinuxSSHMixIn:
- """Contains utility methods for interacting with a guest via SSH."""
-
- def ssh_connect(self, username, credential, credential_is_key=True):
- self.ssh_logger = logging.getLogger('ssh')
- res = self.vm.cmd('human-monitor-command',
- command_line='info usernet')
- port = get_info_usernet_hostfwd_port(res)
- self.assertIsNotNone(port)
- self.assertGreater(port, 0)
- self.log.debug('sshd listening on port: %d', port)
- if credential_is_key:
- self.ssh_session = ssh.Session('127.0.0.1', port=port,
- user=username, key=credential)
- else:
- self.ssh_session = ssh.Session('127.0.0.1', port=port,
- user=username, password=credential)
- for i in range(10):
- try:
- self.ssh_session.connect()
- return
- except:
- time.sleep(i)
- self.fail('ssh connection timeout')
-
- def ssh_command(self, command):
- self.ssh_logger.info(command)
- result = self.ssh_session.cmd(command)
- stdout_lines = [line.rstrip() for line
- in result.stdout_text.splitlines()]
- for line in stdout_lines:
- self.ssh_logger.info(line)
- stderr_lines = [line.rstrip() for line
- in result.stderr_text.splitlines()]
- for line in stderr_lines:
- self.ssh_logger.warning(line)
-
- self.assertEqual(result.exit_status, 0,
- f'Guest command failed: {command}')
- return stdout_lines, stderr_lines
-
- def ssh_command_output_contains(self, cmd, exp):
- stdout, _ = self.ssh_command(cmd)
- for line in stdout:
- if exp in line:
- break
- else:
- self.fail('"%s" output does not contain "%s"' % (cmd, exp))
diff --git a/tests/avocado/avocado_qemu/linuxtest.py b/tests/avocado/avocado_qemu/linuxtest.py
deleted file mode 100644
index 66fb9f1..0000000
--- a/tests/avocado/avocado_qemu/linuxtest.py
+++ /dev/null
@@ -1,253 +0,0 @@
-# Test class and utilities for functional Linux-based tests
-#
-# Copyright (c) 2018 Red Hat, Inc.
-#
-# Author:
-# Cleber Rosa <crosa@redhat.com>
-#
-# This work is licensed under the terms of the GNU GPL, version 2 or
-# later. See the COPYING file in the top-level directory.
-
-import os
-import shutil
-
-from avocado.utils import cloudinit, datadrainer, process, vmimage
-
-from avocado_qemu import LinuxSSHMixIn
-from avocado_qemu import QemuSystemTest
-
-if os.path.islink(os.path.dirname(os.path.dirname(__file__))):
- # The link to the avocado tests dir in the source code directory
- lnk = os.path.dirname(os.path.dirname(__file__))
- #: The QEMU root source directory
- SOURCE_DIR = os.path.dirname(os.path.dirname(os.readlink(lnk)))
-else:
- SOURCE_DIR = BUILD_DIR
-
-class LinuxDistro:
- """Represents a Linux distribution
-
- Holds information of known distros.
- """
- #: A collection of known distros and their respective image checksum
- KNOWN_DISTROS = {
- 'fedora': {
- '31': {
- 'x86_64':
- {'checksum': ('e3c1b309d9203604922d6e255c2c5d09'
- '8a309c2d46215d8fc026954f3c5c27a0'),
- 'pxeboot_url': ('https://archives.fedoraproject.org/'
- 'pub/archive/fedora/linux/releases/31/'
- 'Everything/x86_64/os/images/pxeboot/'),
- 'kernel_params': ('root=UUID=b1438b9b-2cab-4065-a99a-'
- '08a96687f73c ro no_timer_check '
- 'net.ifnames=0 console=tty1 '
- 'console=ttyS0,115200n8'),
- },
- 'aarch64':
- {'checksum': ('1e18d9c0cf734940c4b5d5ec592facae'
- 'd2af0ad0329383d5639c997fdf16fe49'),
- 'pxeboot_url': 'https://archives.fedoraproject.org/'
- 'pub/archive/fedora/linux/releases/31/'
- 'Everything/aarch64/os/images/pxeboot/',
- 'kernel_params': ('root=UUID=b6950a44-9f3c-4076-a9c2-'
- '355e8475b0a7 ro earlyprintk=pl011,0x9000000'
- ' ignore_loglevel no_timer_check'
- ' printk.time=1 rd_NO_PLYMOUTH'
- ' console=ttyAMA0'),
- },
- 'ppc64':
- {'checksum': ('7c3528b85a3df4b2306e892199a9e1e4'
- '3f991c506f2cc390dc4efa2026ad2f58')},
- 's390x':
- {'checksum': ('4caaab5a434fd4d1079149a072fdc789'
- '1e354f834d355069ca982fdcaf5a122d')},
- },
- '32': {
- 'aarch64':
- {'checksum': ('b367755c664a2d7a26955bbfff985855'
- 'adfa2ca15e908baf15b4b176d68d3967'),
- 'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
- 'releases/32/Server/aarch64/os/images/'
- 'pxeboot/'),
- 'kernel_params': ('root=UUID=3df75b65-be8d-4db4-8655-'
- '14d95c0e90c5 ro no_timer_check net.ifnames=0'
- ' console=tty1 console=ttyS0,115200n8'),
- },
- },
- '33': {
- 'aarch64':
- {'checksum': ('e7f75cdfd523fe5ac2ca9eeece68edc1'
- 'a81f386a17f969c1d1c7c87031008a6b'),
- 'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
- 'releases/33/Server/aarch64/os/images/'
- 'pxeboot/'),
- 'kernel_params': ('root=UUID=d20b3ffa-6397-4a63-a734-'
- '1126a0208f8a ro no_timer_check net.ifnames=0'
- ' console=tty1 console=ttyS0,115200n8'
- ' console=tty0'),
- },
- },
- }
- }
-
- def __init__(self, name, version, arch):
- self.name = name
- self.version = version
- self.arch = arch
- try:
- info = self.KNOWN_DISTROS.get(name).get(version).get(arch)
- except AttributeError:
- # Unknown distro
- info = None
- self._info = info or {}
-
- @property
- def checksum(self):
- """Gets the cloud-image file checksum"""
- return self._info.get('checksum', None)
-
- @checksum.setter
- def checksum(self, value):
- self._info['checksum'] = value
-
- @property
- def pxeboot_url(self):
- """Gets the repository url where pxeboot files can be found"""
- return self._info.get('pxeboot_url', None)
-
- @property
- def default_kernel_params(self):
- """Gets the default kernel parameters"""
- return self._info.get('kernel_params', None)
-
-
-class LinuxTest(LinuxSSHMixIn, QemuSystemTest):
- """Facilitates having a cloud-image Linux based available.
-
- For tests that intend to interact with guests, this is a better choice
- to start with than the more vanilla `QemuSystemTest` class.
- """
-
- distro = None
- username = 'root'
- password = 'password'
- smp = '2'
- memory = '1024'
-
- def _set_distro(self):
- distro_name = self.params.get(
- 'distro',
- default=self._get_unique_tag_val('distro'))
- if not distro_name:
- distro_name = 'fedora'
-
- distro_version = self.params.get(
- 'distro_version',
- default=self._get_unique_tag_val('distro_version'))
- if not distro_version:
- distro_version = '31'
-
- self.distro = LinuxDistro(distro_name, distro_version, self.arch)
-
- # The distro checksum behaves differently than distro name and
- # version. First, it does not respect a tag with the same
- # name, given that it's not expected to be used for filtering
- # (distro name versions are the natural choice). Second, the
- # order of precedence is: parameter, attribute and then value
- # from KNOWN_DISTROS.
- distro_checksum = self.params.get('distro_checksum',
- default=None)
- if distro_checksum:
- self.distro.checksum = distro_checksum
-
- def setUp(self, ssh_pubkey=None, network_device_type='virtio-net'):
- super().setUp()
- self.require_netdev('user')
- self._set_distro()
- self.vm.add_args('-smp', self.smp)
- self.vm.add_args('-m', self.memory)
- # The following network device allows for SSH connections
- self.vm.add_args('-netdev', 'user,id=vnet,hostfwd=:127.0.0.1:0-:22',
- '-device', '%s,netdev=vnet' % network_device_type)
- self.set_up_boot()
- if ssh_pubkey is None:
- ssh_pubkey, self.ssh_key = self.set_up_existing_ssh_keys()
- self.set_up_cloudinit(ssh_pubkey)
-
- def set_up_existing_ssh_keys(self):
- ssh_public_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa.pub')
- source_private_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa')
- ssh_dir = os.path.join(self.workdir, '.ssh')
- os.mkdir(ssh_dir, mode=0o700)
- ssh_private_key = os.path.join(ssh_dir,
- os.path.basename(source_private_key))
- shutil.copyfile(source_private_key, ssh_private_key)
- os.chmod(ssh_private_key, 0o600)
- return (ssh_public_key, ssh_private_key)
-
- def download_boot(self):
- # Set the qemu-img binary.
- # If none is available, the test will cancel.
- vmimage.QEMU_IMG = super().get_qemu_img()
-
- self.log.info('Downloading/preparing boot image')
- # Fedora 31 only provides ppc64le images
- image_arch = self.arch
- if self.distro.name == 'fedora':
- if image_arch == 'ppc64':
- image_arch = 'ppc64le'
-
- try:
- boot = vmimage.get(
- self.distro.name, arch=image_arch, version=self.distro.version,
- checksum=self.distro.checksum,
- algorithm='sha256',
- cache_dir=self.cache_dirs[0],
- snapshot_dir=self.workdir)
- except:
- self.cancel('Failed to download/prepare boot image')
- return boot.path
-
- def prepare_cloudinit(self, ssh_pubkey=None):
- self.log.info('Preparing cloudinit image')
- try:
- cloudinit_iso = os.path.join(self.workdir, 'cloudinit.iso')
- pubkey_content = None
- if ssh_pubkey:
- with open(ssh_pubkey) as pubkey:
- pubkey_content = pubkey.read()
- cloudinit.iso(cloudinit_iso, self.name,
- username=self.username,
- password=self.password,
- # QEMU's hard coded usermode router address
- phone_home_host='10.0.2.2',
- phone_home_port=self.phone_server.server_port,
- authorized_key=pubkey_content)
- except Exception:
- self.cancel('Failed to prepare the cloudinit image')
- return cloudinit_iso
-
- def set_up_boot(self):
- path = self.download_boot()
- self.vm.add_args('-drive', 'file=%s' % path)
-
- def set_up_cloudinit(self, ssh_pubkey=None):
- self.phone_server = cloudinit.PhoneHomeServer(('0.0.0.0', 0),
- self.name)
- cloudinit_iso = self.prepare_cloudinit(ssh_pubkey)
- self.vm.add_args('-drive', 'file=%s,format=raw' % cloudinit_iso)
-
- def launch_and_wait(self, set_up_ssh_connection=True):
- self.vm.set_console()
- self.vm.launch()
- console_drainer = datadrainer.LineLogger(self.vm.console_socket.fileno(),
- logger=self.log.getChild('console'))
- console_drainer.start()
- self.log.info('VM launched, waiting for boot confirmation from guest')
- while not self.phone_server.instance_phoned_back:
- self.phone_server.handle_request()
-
- if set_up_ssh_connection:
- self.log.info('Setting up the SSH connection')
- self.ssh_connect(self.username, self.ssh_key)