diff options
Diffstat (limited to 'tests/qemu-iotests/iotests.py')
-rw-r--r-- | tests/qemu-iotests/iotests.py | 83 |
1 files changed, 43 insertions, 40 deletions
diff --git a/tests/qemu-iotests/iotests.py b/tests/qemu-iotests/iotests.py index 5f8c263..6f6363f 100644 --- a/tests/qemu-iotests/iotests.py +++ b/tests/qemu-iotests/iotests.py @@ -16,11 +16,9 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. # -import errno import os import re import subprocess -import string import unittest import sys import struct @@ -35,7 +33,7 @@ import faulthandler sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) from qemu import qtest -assert sys.version_info >= (3,6) +assert sys.version_info >= (3, 6) faulthandler.enable() @@ -141,11 +139,11 @@ def qemu_img_log(*args): return result def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]): - args = [ 'info' ] + args = ['info'] if imgopts: args.append('--image-opts') else: - args += [ '-f', imgfmt ] + args += ['-f', imgfmt] args += extra_args args.append(filename) @@ -224,7 +222,7 @@ class QemuIoInteractive: # quit command is in close(), '\n' is added automatically assert '\n' not in cmd cmd = cmd.strip() - assert cmd != 'q' and cmd != 'quit' + assert cmd not in ('q', 'quit') self._p.stdin.write(cmd + '\n') self._p.stdin.flush() return self._read_output() @@ -246,10 +244,8 @@ def qemu_nbd_early_pipe(*args): sys.stderr.write('qemu-nbd received signal %i: %s\n' % (-exitcode, ' '.join(qemu_nbd_args + ['--fork'] + list(args)))) - if exitcode == 0: - return exitcode, '' - else: - return exitcode, subp.communicate()[0] + + return exitcode, subp.communicate()[0] if exitcode else '' def qemu_nbd_popen(*args): '''Run qemu-nbd in daemon mode and return the parent's exit code''' @@ -313,7 +309,7 @@ def filter_qmp(qmsg, filter_fn): items = qmsg.items() for k, v in items: - if isinstance(v, list) or isinstance(v, dict): + if isinstance(v, (dict, list)): qmsg[k] = filter_qmp(v, filter_fn) else: qmsg[k] = filter_fn(k, v) @@ -324,7 +320,7 @@ def filter_testfiles(msg): return msg.replace(prefix, 'TEST_DIR/PID-') def filter_qmp_testfiles(qmsg): - def _filter(key, value): + def _filter(_key, value): if is_str(value): return filter_testfiles(value) return value @@ -351,7 +347,7 @@ def filter_imgfmt(msg): return msg.replace(imgfmt, 'IMGFMT') def filter_qmp_imgfmt(qmsg): - def _filter(key, value): + def _filter(_key, value): if is_str(value): return filter_imgfmt(value) return value @@ -362,7 +358,7 @@ def log(msg, filters=[], indent=None): If indent is provided, JSON serializable messages are pretty-printed.''' for flt in filters: msg = flt(msg) - if isinstance(msg, dict) or isinstance(msg, list): + if isinstance(msg, (dict, list)): # Python < 3.4 needs to know not to add whitespace when pretty-printing: separators = (', ', ': ') if indent is None else (',', ': ') # Don't sort if it's already sorted @@ -373,14 +369,14 @@ def log(msg, filters=[], indent=None): print(msg) class Timeout: - def __init__(self, seconds, errmsg = "Timeout"): + def __init__(self, seconds, errmsg="Timeout"): self.seconds = seconds self.errmsg = errmsg def __enter__(self): signal.signal(signal.SIGALRM, self.timeout) signal.setitimer(signal.ITIMER_REAL, self.seconds) return self - def __exit__(self, type, value, traceback): + def __exit__(self, exc_type, value, traceback): signal.setitimer(signal.ITIMER_REAL, 0) return False def timeout(self, signum, frame): @@ -389,7 +385,7 @@ class Timeout: def file_pattern(name): return "{0}-{1}".format(os.getpid(), name) -class FilePaths(object): +class FilePaths: """ FilePaths is an auto-generated filename that cleans itself up. @@ -536,11 +532,11 @@ class VM(qtest.QEMUQtestMachine): self.pause_drive(drive, "write_aio") return self.qmp('human-monitor-command', - command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive)) + command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive)) def resume_drive(self, drive): self.qmp('human-monitor-command', - command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive)) + command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive)) def hmp_qemu_io(self, drive, cmd): '''Write to a given drive using an HMP command''' @@ -551,8 +547,8 @@ class VM(qtest.QEMUQtestMachine): if output is None: output = dict() if isinstance(obj, list): - for i in range(len(obj)): - self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.') + for i, item in enumerate(obj): + self.flatten_qmp_object(item, output, basestr + str(i) + '.') elif isinstance(obj, dict): for key in obj: self.flatten_qmp_object(obj[key], output, basestr + key + '.') @@ -710,9 +706,7 @@ class VM(qtest.QEMUQtestMachine): for bitmap in bitmaps[node_name]: if bitmap.get('name', '') == bitmap_name: - if recording is None: - return bitmap - elif bitmap.get('recording') == recording: + if recording is None or bitmap.get('recording') == recording: return bitmap return None @@ -763,12 +757,13 @@ class VM(qtest.QEMUQtestMachine): assert node is not None, 'Cannot follow path %s%s' % (root, path) try: - node_id = next(edge['child'] for edge in graph['edges'] \ - if edge['parent'] == node['id'] and - edge['name'] == child_name) + node_id = next(edge['child'] for edge in graph['edges'] + if (edge['parent'] == node['id'] and + edge['name'] == child_name)) + + node = next(node for node in graph['nodes'] + if node['id'] == node_id) - node = next(node for node in graph['nodes'] \ - if node['id'] == node_id) except StopIteration: node = None @@ -786,6 +781,12 @@ index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') class QMPTestCase(unittest.TestCase): '''Abstract base class for QMP test cases''' + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # Many users of this class set a VM property we rely on heavily + # in the methods below. + self.vm = None + def dictpath(self, d, path): '''Traverse a path in a nested dict''' for component in path.split('/'): @@ -831,7 +832,7 @@ class QMPTestCase(unittest.TestCase): else: self.assertEqual(result, value, '"%s" is "%s", expected "%s"' - % (path, str(result), str(value))) + % (path, str(result), str(value))) def assert_no_active_block_jobs(self): result = self.vm.qmp('query-block-jobs') @@ -841,15 +842,15 @@ class QMPTestCase(unittest.TestCase): """Issue a query-named-block-nodes and assert node_name and/or file_name is present in the result""" def check_equal_or_none(a, b): - return a == None or b == None or a == b + return a is None or b is None or a == b assert node_name or file_name result = self.vm.qmp('query-named-block-nodes') for x in result["return"]: if check_equal_or_none(x.get("node-name"), node_name) and \ check_equal_or_none(x.get("file"), file_name): return - self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \ - (node_name, file_name, result)) + self.fail("Cannot find %s %s in result:\n%s" % + (node_name, file_name, result)) def assert_json_filename_equal(self, json_filename, reference): '''Asserts that the given filename is a json: filename and that its @@ -898,13 +899,13 @@ class QMPTestCase(unittest.TestCase): self.assert_qmp(event, 'data/error', error) self.assert_no_active_block_jobs() return event - elif event['event'] == 'JOB_STATUS_CHANGE': + if event['event'] == 'JOB_STATUS_CHANGE': self.assert_qmp(event, 'data/id', drive) def wait_ready(self, drive='drive0'): - '''Wait until a block job BLOCK_JOB_READY event''' - f = {'data': {'type': 'mirror', 'device': drive } } - event = self.vm.event_wait(name='BLOCK_JOB_READY', match=f) + """Wait until a BLOCK_JOB_READY event, and return the event.""" + f = {'data': {'type': 'mirror', 'device': drive}} + return self.vm.event_wait(name='BLOCK_JOB_READY', match=f) def wait_ready_and_cancel(self, drive='drive0'): self.wait_ready(drive=drive) @@ -933,7 +934,7 @@ class QMPTestCase(unittest.TestCase): for job in result['return']: if job['device'] == job_id: found = True - if job['paused'] == True and job['busy'] == False: + if job['paused'] and not job['busy']: return job break assert found @@ -1030,8 +1031,8 @@ def qemu_pipe(*args): universal_newlines=True) exitcode = subp.wait() if exitcode < 0: - sys.stderr.write('qemu received signal %i: %s\n' % (-exitcode, - ' '.join(args))) + sys.stderr.write('qemu received signal %i: %s\n' % + (-exitcode, ' '.join(args))) return subp.communicate()[0] def supported_formats(read_only=False): @@ -1063,6 +1064,7 @@ def skip_if_unsupported(required_formats=[], read_only=False): if usf_list: test_case.case_skip('{}: formats {} are not whitelisted'.format( test_case, usf_list)) + return None else: return func(test_case, *args, **kwargs) return func_wrapper @@ -1074,6 +1076,7 @@ def skip_if_user_is_root(func): def func_wrapper(*args, **kwargs): if os.getuid() == 0: case_notrun('{}: cannot be run as root'.format(args[0])) + return None else: return func(*args, **kwargs) return func_wrapper |