aboutsummaryrefslogtreecommitdiff
path: root/tests/qemu-iotests/iotests.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/qemu-iotests/iotests.py')
-rw-r--r--tests/qemu-iotests/iotests.py366
1 files changed, 219 insertions, 147 deletions
diff --git a/tests/qemu-iotests/iotests.py b/tests/qemu-iotests/iotests.py
index 5f8c263..6c0e781 100644
--- a/tests/qemu-iotests/iotests.py
+++ b/tests/qemu-iotests/iotests.py
@@ -16,26 +16,39 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
-import errno
+import atexit
+from collections import OrderedDict
+import faulthandler
+import io
+import json
+import logging
import os
import re
+import signal
+import struct
import subprocess
-import string
-import unittest
import sys
-import struct
-import json
-import signal
-import logging
-import atexit
-import io
-from collections import OrderedDict
-import faulthandler
+from typing import (Any, Callable, Dict, Iterable,
+ List, Optional, Sequence, TypeVar)
+import unittest
+# pylint: disable=import-error, wrong-import-position
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu import qtest
-assert sys.version_info >= (3,6)
+assert sys.version_info >= (3, 6)
+
+# Type Aliases
+QMPResponse = Dict[str, Any]
+
+
+# Use this logger for logging messages directly from the iotests module
+logger = logging.getLogger('qemu.iotests')
+logger.addHandler(logging.NullHandler())
+
+# Use this logger for messages that ought to be used for diff output.
+test_logger = logging.getLogger('qemu.iotests.diff_io')
+
faulthandler.enable()
@@ -80,9 +93,11 @@ luks_default_key_secret_opt = 'key-secret=keysec0'
def qemu_img(*args):
'''Run qemu-img and return the exit code'''
devnull = open('/dev/null', 'r+')
- exitcode = subprocess.call(qemu_img_args + list(args), stdin=devnull, stdout=devnull)
+ exitcode = subprocess.call(qemu_img_args + list(args),
+ stdin=devnull, stdout=devnull)
if exitcode < 0:
- sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
+ sys.stderr.write('qemu-img received signal %i: %s\n'
+ % (-exitcode, ' '.join(qemu_img_args + list(args))))
return exitcode
def ordered_qmp(qmsg, conv_keys=True):
@@ -121,7 +136,8 @@ def qemu_img_verbose(*args):
'''Run qemu-img without suppressing its output and return the exit code'''
exitcode = subprocess.call(qemu_img_args + list(args))
if exitcode < 0:
- sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
+ sys.stderr.write('qemu-img received signal %i: %s\n'
+ % (-exitcode, ' '.join(qemu_img_args + list(args))))
return exitcode
def qemu_img_pipe(*args):
@@ -132,7 +148,8 @@ def qemu_img_pipe(*args):
universal_newlines=True)
exitcode = subp.wait()
if exitcode < 0:
- sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
+ sys.stderr.write('qemu-img received signal %i: %s\n'
+ % (-exitcode, ' '.join(qemu_img_args + list(args))))
return subp.communicate()[0]
def qemu_img_log(*args):
@@ -140,12 +157,12 @@ def qemu_img_log(*args):
log(result, filters=[filter_testfiles])
return result
-def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]):
- args = [ 'info' ]
+def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
+ args = ['info']
if imgopts:
args.append('--image-opts')
else:
- args += [ '-f', imgfmt ]
+ args += ['-f', imgfmt]
args += extra_args
args.append(filename)
@@ -162,7 +179,8 @@ def qemu_io(*args):
universal_newlines=True)
exitcode = subp.wait()
if exitcode < 0:
- sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
+ sys.stderr.write('qemu-io received signal %i: %s\n'
+ % (-exitcode, ' '.join(args)))
return subp.communicate()[0]
def qemu_io_log(*args):
@@ -224,7 +242,7 @@ class QemuIoInteractive:
# quit command is in close(), '\n' is added automatically
assert '\n' not in cmd
cmd = cmd.strip()
- assert cmd != 'q' and cmd != 'quit'
+ assert cmd not in ('q', 'quit')
self._p.stdin.write(cmd + '\n')
self._p.stdin.flush()
return self._read_output()
@@ -246,10 +264,8 @@ def qemu_nbd_early_pipe(*args):
sys.stderr.write('qemu-nbd received signal %i: %s\n' %
(-exitcode,
' '.join(qemu_nbd_args + ['--fork'] + list(args))))
- if exitcode == 0:
- return exitcode, ''
- else:
- return exitcode, subp.communicate()[0]
+
+ return exitcode, subp.communicate()[0] if exitcode else ''
def qemu_nbd_popen(*args):
'''Run qemu-nbd in daemon mode and return the parent's exit code'''
@@ -286,10 +302,13 @@ win32_re = re.compile(r"\r")
def filter_win32(msg):
return win32_re.sub("", msg)
-qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
+qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
+ r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
+ r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
msg = filter_win32(msg)
- return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", msg)
+ return qemu_io_re.sub("X ops; XX:XX:XX.X "
+ "(XXX YYY/sec and XXX ops/sec)", msg)
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
@@ -313,7 +332,7 @@ def filter_qmp(qmsg, filter_fn):
items = qmsg.items()
for k, v in items:
- if isinstance(v, list) or isinstance(v, dict):
+ if isinstance(v, (dict, list)):
qmsg[k] = filter_qmp(v, filter_fn)
else:
qmsg[k] = filter_fn(k, v)
@@ -324,7 +343,7 @@ def filter_testfiles(msg):
return msg.replace(prefix, 'TEST_DIR/PID-')
def filter_qmp_testfiles(qmsg):
- def _filter(key, value):
+ def _filter(_key, value):
if is_str(value):
return filter_testfiles(value)
return value
@@ -342,7 +361,9 @@ def filter_img_info(output, filename):
line = filter_testfiles(line)
line = line.replace(imgfmt, 'IMGFMT')
line = re.sub('iters: [0-9]+', 'iters: XXX', line)
- line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line)
+ line = re.sub('uuid: [-a-f0-9]+',
+ 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
+ line)
line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
lines.append(line)
return '\n'.join(lines)
@@ -351,36 +372,40 @@ def filter_imgfmt(msg):
return msg.replace(imgfmt, 'IMGFMT')
def filter_qmp_imgfmt(qmsg):
- def _filter(key, value):
+ def _filter(_key, value):
if is_str(value):
return filter_imgfmt(value)
return value
return filter_qmp(qmsg, _filter)
-def log(msg, filters=[], indent=None):
- '''Logs either a string message or a JSON serializable message (like QMP).
- If indent is provided, JSON serializable messages are pretty-printed.'''
+
+Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
+
+def log(msg: Msg,
+ filters: Iterable[Callable[[Msg], Msg]] = (),
+ indent: Optional[int] = None) -> None:
+ """
+ Logs either a string message or a JSON serializable message (like QMP).
+ If indent is provided, JSON serializable messages are pretty-printed.
+ """
for flt in filters:
msg = flt(msg)
- if isinstance(msg, dict) or isinstance(msg, list):
- # Python < 3.4 needs to know not to add whitespace when pretty-printing:
- separators = (', ', ': ') if indent is None else (',', ': ')
+ if isinstance(msg, (dict, list)):
# Don't sort if it's already sorted
do_sort = not isinstance(msg, OrderedDict)
- print(json.dumps(msg, sort_keys=do_sort,
- indent=indent, separators=separators))
+ test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
else:
- print(msg)
+ test_logger.info(msg)
class Timeout:
- def __init__(self, seconds, errmsg = "Timeout"):
+ def __init__(self, seconds, errmsg="Timeout"):
self.seconds = seconds
self.errmsg = errmsg
def __enter__(self):
signal.signal(signal.SIGALRM, self.timeout)
signal.setitimer(signal.ITIMER_REAL, self.seconds)
return self
- def __exit__(self, type, value, traceback):
+ def __exit__(self, exc_type, value, traceback):
signal.setitimer(signal.ITIMER_REAL, 0)
return False
def timeout(self, signum, frame):
@@ -389,7 +414,7 @@ class Timeout:
def file_pattern(name):
return "{0}-{1}".format(os.getpid(), name)
-class FilePaths(object):
+class FilePaths:
"""
FilePaths is an auto-generated filename that cleans itself up.
@@ -490,21 +515,21 @@ class VM(qtest.QEMUQtestMachine):
self._args.append(opts)
return self
- def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
+ def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
'''Add a virtio-blk drive to the VM'''
options = ['if=%s' % interface,
'id=drive%d' % self._num_drives]
if path is not None:
options.append('file=%s' % path)
- options.append('format=%s' % format)
+ options.append('format=%s' % img_format)
options.append('cache=%s' % cachemode)
options.append('aio=%s' % aiomode)
if opts:
options.append(opts)
- if format == 'luks' and 'key-secret' not in opts:
+ if img_format == 'luks' and 'key-secret' not in opts:
# default luks support
if luks_default_secret_object not in self._args:
self.add_object(luks_default_secret_object)
@@ -529,30 +554,37 @@ class VM(qtest.QEMUQtestMachine):
self._args.append(addr)
return self
- def pause_drive(self, drive, event=None):
- '''Pause drive r/w operations'''
+ def hmp(self, command_line: str, use_log: bool = False) -> QMPResponse:
+ cmd = 'human-monitor-command'
+ kwargs = {'command-line': command_line}
+ if use_log:
+ return self.qmp_log(cmd, **kwargs)
+ else:
+ return self.qmp(cmd, **kwargs)
+
+ def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
+ """Pause drive r/w operations"""
if not event:
self.pause_drive(drive, "read_aio")
self.pause_drive(drive, "write_aio")
return
- self.qmp('human-monitor-command',
- command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive))
+ self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
- def resume_drive(self, drive):
- self.qmp('human-monitor-command',
- command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive))
+ def resume_drive(self, drive: str) -> None:
+ """Resume drive r/w operations"""
+ self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
- def hmp_qemu_io(self, drive, cmd):
- '''Write to a given drive using an HMP command'''
- return self.qmp('human-monitor-command',
- command_line='qemu-io %s "%s"' % (drive, cmd))
+ def hmp_qemu_io(self, drive: str, cmd: str,
+ use_log: bool = False) -> QMPResponse:
+ """Write to a given drive using an HMP command"""
+ return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
def flatten_qmp_object(self, obj, output=None, basestr=''):
if output is None:
output = dict()
if isinstance(obj, list):
- for i in range(len(obj)):
- self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.')
+ for i, item in enumerate(obj):
+ self.flatten_qmp_object(item, output, basestr + str(i) + '.')
elif isinstance(obj, dict):
for key in obj:
self.flatten_qmp_object(obj[key], output, basestr + key + '.')
@@ -573,7 +605,7 @@ class VM(qtest.QEMUQtestMachine):
result.append(filter_qmp_event(ev))
return result
- def qmp_log(self, cmd, filters=[], indent=None, **kwargs):
+ def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
full_cmd = OrderedDict((
("execute", cmd),
("arguments", ordered_qmp(kwargs))
@@ -585,7 +617,7 @@ class VM(qtest.QEMUQtestMachine):
# Returns None on success, and an error string on failure
def run_job(self, job, auto_finalize=True, auto_dismiss=False,
- pre_finalize=None, cancel=False, use_log=True, wait=60.0):
+ pre_finalize=None, cancel=False, wait=60.0):
"""
run_job moves a job from creation through to dismissal.
@@ -598,7 +630,6 @@ class VM(qtest.QEMUQtestMachine):
invoked prior to issuing job-finalize, if any.
:param cancel: Bool. When true, cancels the job after the pre_finalize
callback.
- :param use_log: Bool. When false, does not log QMP messages.
:param wait: Float. Timeout value specifying how long to wait for any
event, in seconds. Defaults to 60.0.
"""
@@ -616,8 +647,7 @@ class VM(qtest.QEMUQtestMachine):
while True:
ev = filter_qmp_event(self.events_wait(events, timeout=wait))
if ev['event'] != 'JOB_STATUS_CHANGE':
- if use_log:
- log(ev)
+ log(ev)
continue
status = ev['data']['status']
if status == 'aborting':
@@ -625,29 +655,18 @@ class VM(qtest.QEMUQtestMachine):
for j in result['return']:
if j['id'] == job:
error = j['error']
- if use_log:
- log('Job failed: %s' % (j['error']))
+ log('Job failed: %s' % (j['error']))
elif status == 'ready':
- if use_log:
- self.qmp_log('job-complete', id=job)
- else:
- self.qmp('job-complete', id=job)
+ self.qmp_log('job-complete', id=job)
elif status == 'pending' and not auto_finalize:
if pre_finalize:
pre_finalize()
- if cancel and use_log:
+ if cancel:
self.qmp_log('job-cancel', id=job)
- elif cancel:
- self.qmp('job-cancel', id=job)
- elif use_log:
- self.qmp_log('job-finalize', id=job)
else:
- self.qmp('job-finalize', id=job)
+ self.qmp_log('job-finalize', id=job)
elif status == 'concluded' and not auto_dismiss:
- if use_log:
- self.qmp_log('job-dismiss', id=job)
- else:
- self.qmp('job-dismiss', id=job)
+ self.qmp_log('job-dismiss', id=job)
elif status == 'null':
return error
@@ -710,9 +729,7 @@ class VM(qtest.QEMUQtestMachine):
for bitmap in bitmaps[node_name]:
if bitmap.get('name', '') == bitmap_name:
- if recording is None:
- return bitmap
- elif bitmap.get('recording') == recording:
+ if recording is None or bitmap.get('recording') == recording:
return bitmap
return None
@@ -763,12 +780,13 @@ class VM(qtest.QEMUQtestMachine):
assert node is not None, 'Cannot follow path %s%s' % (root, path)
try:
- node_id = next(edge['child'] for edge in graph['edges'] \
- if edge['parent'] == node['id'] and
- edge['name'] == child_name)
+ node_id = next(edge['child'] for edge in graph['edges']
+ if (edge['parent'] == node['id'] and
+ edge['name'] == child_name))
+
+ node = next(node for node in graph['nodes']
+ if node['id'] == node_id)
- node = next(node for node in graph['nodes'] \
- if node['id'] == node_id)
except StopIteration:
node = None
@@ -786,6 +804,12 @@ index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
class QMPTestCase(unittest.TestCase):
'''Abstract base class for QMP test cases'''
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ # Many users of this class set a VM property we rely on heavily
+ # in the methods below.
+ self.vm = None
+
def dictpath(self, d, path):
'''Traverse a path in a nested dict'''
for component in path.split('/'):
@@ -795,16 +819,18 @@ class QMPTestCase(unittest.TestCase):
idx = int(idx)
if not isinstance(d, dict) or component not in d:
- self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
+ self.fail(f'failed path traversal for "{path}" in "{d}"')
d = d[component]
if m:
if not isinstance(d, list):
- self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
+ self.fail(f'path component "{component}" in "{path}" '
+ f'is not a list in "{d}"')
try:
d = d[idx]
except IndexError:
- self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
+ self.fail(f'invalid index "{idx}" in path "{path}" '
+ f'in "{d}"')
return d
def assert_qmp_absent(self, d, path):
@@ -831,7 +857,7 @@ class QMPTestCase(unittest.TestCase):
else:
self.assertEqual(result, value,
'"%s" is "%s", expected "%s"'
- % (path, str(result), str(value)))
+ % (path, str(result), str(value)))
def assert_no_active_block_jobs(self):
result = self.vm.qmp('query-block-jobs')
@@ -841,24 +867,27 @@ class QMPTestCase(unittest.TestCase):
"""Issue a query-named-block-nodes and assert node_name and/or
file_name is present in the result"""
def check_equal_or_none(a, b):
- return a == None or b == None or a == b
+ return a is None or b is None or a == b
assert node_name or file_name
result = self.vm.qmp('query-named-block-nodes')
for x in result["return"]:
if check_equal_or_none(x.get("node-name"), node_name) and \
check_equal_or_none(x.get("file"), file_name):
return
- self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \
- (node_name, file_name, result))
+ self.fail("Cannot find %s %s in result:\n%s" %
+ (node_name, file_name, result))
def assert_json_filename_equal(self, json_filename, reference):
'''Asserts that the given filename is a json: filename and that its
content is equal to the given reference object'''
self.assertEqual(json_filename[:5], 'json:')
- self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
- self.vm.flatten_qmp_object(reference))
+ self.assertEqual(
+ self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
+ self.vm.flatten_qmp_object(reference)
+ )
- def cancel_and_wait(self, drive='drive0', force=False, resume=False, wait=60.0):
+ def cancel_and_wait(self, drive='drive0', force=False,
+ resume=False, wait=60.0):
'''Cancel a block job and wait for it to finish, returning the event'''
result = self.vm.qmp('block-job-cancel', device=drive, force=force)
self.assert_qmp(result, 'return', {})
@@ -882,8 +911,8 @@ class QMPTestCase(unittest.TestCase):
self.assert_no_active_block_jobs()
return result
- def wait_until_completed(self, drive='drive0', check_offset=True, wait=60.0,
- error=None):
+ def wait_until_completed(self, drive='drive0', check_offset=True,
+ wait=60.0, error=None):
'''Wait for a block job to finish, returning the event'''
while True:
for event in self.vm.get_qmp_events(wait=wait):
@@ -898,13 +927,13 @@ class QMPTestCase(unittest.TestCase):
self.assert_qmp(event, 'data/error', error)
self.assert_no_active_block_jobs()
return event
- elif event['event'] == 'JOB_STATUS_CHANGE':
+ if event['event'] == 'JOB_STATUS_CHANGE':
self.assert_qmp(event, 'data/id', drive)
def wait_ready(self, drive='drive0'):
- '''Wait until a block job BLOCK_JOB_READY event'''
- f = {'data': {'type': 'mirror', 'device': drive } }
- event = self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
+ """Wait until a BLOCK_JOB_READY event, and return the event."""
+ f = {'data': {'type': 'mirror', 'device': drive}}
+ return self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
def wait_ready_and_cancel(self, drive='drive0'):
self.wait_ready(drive=drive)
@@ -933,7 +962,7 @@ class QMPTestCase(unittest.TestCase):
for job in result['return']:
if job['device'] == job_id:
found = True
- if job['paused'] == True and job['busy'] == False:
+ if job['paused'] and not job['busy']:
return job
break
assert found
@@ -957,7 +986,7 @@ def notrun(reason):
seq = os.path.basename(sys.argv[0])
open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
- print('%s not run: %s' % (seq, reason))
+ logger.warning("%s not run: %s", seq, reason)
sys.exit(0)
def case_notrun(reason):
@@ -972,7 +1001,8 @@ def case_notrun(reason):
open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
' [case not run] ' + reason + '\n')
-def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
+def _verify_image_format(supported_fmts: Sequence[str] = (),
+ unsupported_fmts: Sequence[str] = ()) -> None:
assert not (supported_fmts and unsupported_fmts)
if 'generic' in supported_fmts and \
@@ -986,7 +1016,8 @@ def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
if not_sup or (imgfmt in unsupported_fmts):
notrun('not suitable for this image format: %s' % imgfmt)
-def verify_protocol(supported=[], unsupported=[]):
+def _verify_protocol(supported: Sequence[str] = (),
+ unsupported: Sequence[str] = ()) -> None:
assert not (supported and unsupported)
if 'generic' in supported:
@@ -996,20 +1027,20 @@ def verify_protocol(supported=[], unsupported=[]):
if not_sup or (imgproto in unsupported):
notrun('not suitable for this protocol: %s' % imgproto)
-def verify_platform(supported=None, unsupported=None):
- if unsupported is not None:
- if any((sys.platform.startswith(x) for x in unsupported)):
- notrun('not suitable for this OS: %s' % sys.platform)
+def _verify_platform(supported: Sequence[str] = (),
+ unsupported: Sequence[str] = ()) -> None:
+ if any((sys.platform.startswith(x) for x in unsupported)):
+ notrun('not suitable for this OS: %s' % sys.platform)
- if supported is not None:
+ if supported:
if not any((sys.platform.startswith(x) for x in supported)):
notrun('not suitable for this OS: %s' % sys.platform)
-def verify_cache_mode(supported_cache_modes=[]):
+def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
if supported_cache_modes and (cachemode not in supported_cache_modes):
notrun('not suitable for this cache mode: %s' % cachemode)
-def verify_aio_mode(supported_aio_modes=[]):
+def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()):
if supported_aio_modes and (aiomode not in supported_aio_modes):
notrun('not suitable for this aio mode: %s' % aiomode)
@@ -1022,16 +1053,19 @@ def verify_quorum():
notrun('quorum support missing')
def qemu_pipe(*args):
- '''Run qemu with an option to print something and exit (e.g. a help option),
- and return its output'''
+ """
+ Run qemu with an option to print something and exit (e.g. a help option).
+
+ :return: QEMU's stdout output.
+ """
args = [qemu_prog] + qemu_opts + list(args)
subp = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True)
exitcode = subp.wait()
if exitcode < 0:
- sys.stderr.write('qemu received signal %i: %s\n' % (-exitcode,
- ' '.join(args)))
+ sys.stderr.write('qemu received signal %i: %s\n' %
+ (-exitcode, ' '.join(args)))
return subp.communicate()[0]
def supported_formats(read_only=False):
@@ -1049,7 +1083,7 @@ def supported_formats(read_only=False):
return supported_formats.formats[read_only]
-def skip_if_unsupported(required_formats=[], read_only=False):
+def skip_if_unsupported(required_formats=(), read_only=False):
'''Skip Test Decorator
Runs the test if all the required formats are whitelisted'''
def skip_test_decorator(func):
@@ -1061,8 +1095,9 @@ def skip_if_unsupported(required_formats=[], read_only=False):
usf_list = list(set(fmts) - set(supported_formats(read_only)))
if usf_list:
- test_case.case_skip('{}: formats {} are not whitelisted'.format(
- test_case, usf_list))
+ msg = f'{test_case}: formats {usf_list} are not whitelisted'
+ test_case.case_skip(msg)
+ return None
else:
return func(test_case, *args, **kwargs)
return func_wrapper
@@ -1074,11 +1109,23 @@ def skip_if_user_is_root(func):
def func_wrapper(*args, **kwargs):
if os.getuid() == 0:
case_notrun('{}: cannot be run as root'.format(args[0]))
+ return None
else:
return func(*args, **kwargs)
return func_wrapper
-def execute_unittest(output, verbosity, debug):
+def execute_unittest(debug=False):
+ """Executes unittests within the calling module."""
+
+ verbosity = 2 if debug else 1
+
+ if debug:
+ output = sys.stdout
+ else:
+ # We need to filter out the time taken from the output so that
+ # qemu-iotest can reliably diff the results against master output.
+ output = io.StringIO()
+
runner = unittest.TextTestRunner(stream=output, descriptions=True,
verbosity=verbosity)
try:
@@ -1086,6 +1133,8 @@ def execute_unittest(output, verbosity, debug):
# exception
unittest.main(testRunner=runner)
finally:
+ # We need to filter out the time taken from the output so that
+ # qemu-iotest can reliably diff the results against master output.
if not debug:
out = output.getvalue()
out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
@@ -1097,13 +1146,19 @@ def execute_unittest(output, verbosity, debug):
sys.stderr.write(out)
-def execute_test(test_function=None,
- supported_fmts=[],
- supported_platforms=None,
- supported_cache_modes=[], supported_aio_modes={},
- unsupported_fmts=[], supported_protocols=[],
- unsupported_protocols=[]):
- """Run either unittest or script-style tests."""
+def execute_setup_common(supported_fmts: Sequence[str] = (),
+ supported_platforms: Sequence[str] = (),
+ supported_cache_modes: Sequence[str] = (),
+ supported_aio_modes: Sequence[str] = (),
+ unsupported_fmts: Sequence[str] = (),
+ supported_protocols: Sequence[str] = (),
+ unsupported_protocols: Sequence[str] = ()) -> bool:
+ """
+ Perform necessary setup for either script-style or unittest-style tests.
+
+ :return: Bool; Whether or not debug mode has been requested via the CLI.
+ """
+ # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
# We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
# indicate that we're not being run via "check". There may be
@@ -1113,34 +1168,51 @@ def execute_test(test_function=None,
sys.stderr.write('Please run this test via the "check" script\n')
sys.exit(os.EX_USAGE)
- debug = '-d' in sys.argv
- verbosity = 1
- verify_image_format(supported_fmts, unsupported_fmts)
- verify_protocol(supported_protocols, unsupported_protocols)
- verify_platform(supported=supported_platforms)
- verify_cache_mode(supported_cache_modes)
- verify_aio_mode(supported_aio_modes)
+ _verify_image_format(supported_fmts, unsupported_fmts)
+ _verify_protocol(supported_protocols, unsupported_protocols)
+ _verify_platform(supported=supported_platforms)
+ _verify_cache_mode(supported_cache_modes)
+ _verify_aio_mode(supported_aio_modes)
+ debug = '-d' in sys.argv
if debug:
- output = sys.stdout
- verbosity = 2
sys.argv.remove('-d')
- else:
- # We need to filter out the time taken from the output so that
- # qemu-iotest can reliably diff the results against master output.
- output = io.StringIO()
-
logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
+ logger.debug("iotests debugging messages active")
+
+ return debug
+
+def execute_test(*args, test_function=None, **kwargs):
+ """Run either unittest or script-style tests."""
+ debug = execute_setup_common(*args, **kwargs)
if not test_function:
- execute_unittest(output, verbosity, debug)
+ execute_unittest(debug)
else:
test_function()
+def activate_logging():
+ """Activate iotests.log() output to stdout for script-style tests."""
+ handler = logging.StreamHandler(stream=sys.stdout)
+ formatter = logging.Formatter('%(message)s')
+ handler.setFormatter(formatter)
+ test_logger.addHandler(handler)
+ test_logger.setLevel(logging.INFO)
+ test_logger.propagate = False
+
+# This is called from script-style iotests without a single point of entry
+def script_initialize(*args, **kwargs):
+ """Initialize script-style tests without running any tests."""
+ activate_logging()
+ execute_setup_common(*args, **kwargs)
+
+# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
"""Run script-style tests outside of the unittest framework"""
- execute_test(test_function, *args, **kwargs)
+ activate_logging()
+ execute_test(*args, test_function=test_function, **kwargs)
+# This is called from unittest style iotests
def main(*args, **kwargs):
"""Run tests using the unittest framework"""
- execute_test(None, *args, **kwargs)
+ execute_test(*args, **kwargs)