#!/usr/bin/env python3
#
# pylint: disable=C0103,E0213,E1135,E1136,E1137,R0902,R0903,R0912,R0913,R0917
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright (C) 2024-2025 Mauro Carvalho Chehab

"""
Helper classes to be used by ghes_inject command classes.
"""

import json
import sys

from base64 import b64encode
from datetime import datetime
from os import path as os_path

try:
    # Make the in-tree "python" directory importable, so the QMP library
    # is found when this script runs from inside the source tree.
    qemu_dir = os_path.abspath(os_path.dirname(os_path.dirname(__file__)))
    sys.path.append(os_path.join(qemu_dir, 'python'))

    from qemu.qmp.legacy import QEMUMonitorProtocol

except ModuleNotFoundError as exc:
    print(f"Module '{exc.name}' not found.")
    print("Try export PYTHONPATH=top-qemu-dir/python or run from top-qemu-dir")
    sys.exit(1)


class util:
    """
    Ancillary functions to deal with bitmaps, parse arguments,
    generate GUID and encode data on a bytearray buffer.

    The functions here take no ``self``: they are meant to be called
    through the class, e.g. ``util.bit(3)``.
    """

    #
    # Helper routines to handle multiple choice arguments
    #
    def get_choice(name, value, choices, suffixes=None, bitmask=True):
        """
        Convert a comma-separated multiple choice argument into an integer.

        name:     argument name, used only on error messages.
        value:    string with one or more comma-separated choices.
        choices:  dict mapping each (lowercase) choice to its integer value.
        suffixes: optional list of suffixes that may be present or absent
                  on each choice.
        bitmask:  when True, the values of all choices are OR-ed together.
                  When False, exactly one choice is accepted.

        Returns the resulting integer, or 0 if value is empty or None.
        Exits the program with an error message on invalid input.
        """

        new_values = 0

        if not value:
            return new_values

        for idx, item in enumerate(value.split(",")):
            key = item.lower()

            # Normalize the choice by dropping any known suffix...
            if suffixes:
                for suffix in suffixes:
                    key = key.removesuffix(suffix)

            # ... and, if the bare name is unknown, retry with each suffix
            if key not in choices and suffixes:
                for suffix in suffixes:
                    if key + suffix in choices:
                        key += suffix
                        break

            if key not in choices:
                sys.exit(f"Error on '{name}': choice '{key}' is invalid.")

            val = choices[key]

            if bitmask:
                new_values |= val
                continue

            # Count the items instead of testing new_values: a first choice
            # whose value is 0 must still forbid a second one.
            if idx:
                sys.exit(f"Error on '{name}': only one value is accepted.")

            new_values = val

        return new_values

    def get_array(name, values, max_val=None):
        """
        Parse lists of comma-separated integers into a flat list.

        name:    argument name, used only on error messages.
        values:  iterable of strings, each with comma-separated integers
                 (any base accepted by int(x, 0)).
        max_val: optional maximum value accepted for each integer.

        Returns a list of non-negative integers. Exits the program with
        an error message on invalid input.
        """

        array = []

        for value in values:
            for item in value.split(","):
                try:
                    val = int(item, 0)
                except ValueError:
                    sys.exit(f"Error on '{name}': {item} is not an integer")

                if val < 0:
                    sys.exit(f"Error on '{name}': {val} is not unsigned")

                if max_val and val > max_val:
                    sys.exit(f"Error on '{name}': {val} is too big")

                array.append(val)

        return array

    def get_mult_array(mult, name, values, allow_zero=False, max_val=None):
        """
        Add numbered hashes from integer lists.

        Each element of values is a string of comma-separated integers.
        The integers from the i-th element are stored as a list at
        mult[i][name].

        mult:       dict indexed by position, updated in place.
        name:       key to store inside each mult[i]; also used on errors.
        values:     list of strings, or None.
        allow_zero: when True, an empty (but not None) values stores an
                    empty list at mult[0][name]; when False, empty values
                    are ignored.
        max_val:    optional maximum value accepted for each integer.

        Exits the program with an error message on invalid input.
        """

        if values is None or (not allow_zero and not values):
            return

        if not values:
            # Only reachable with allow_zero: record an explicit empty list
            mult.setdefault(0, {})[name] = []
            return

        for i, value in enumerate(values):
            for item in value.split(","):
                try:
                    val = int(item, 0)
                except ValueError:
                    sys.exit(f"Error on '{name}': {item} is not an integer")

                if val < 0:
                    sys.exit(f"Error on '{name}': {val} is not unsigned")

                if max_val and val > max_val:
                    sys.exit(f"Error on '{name}': {val} is too big")

                mult.setdefault(i, {}).setdefault(name, []).append(val)

    def get_mult_choices(mult, name, values, choices,
                         suffixes=None, allow_zero=False):
        """
        Add numbered hashes from multiple choice arguments.

        The i-th element of values is parsed with util.get_choice() as a
        bitmask and stored at mult[i][name].

        mult:       dict indexed by position, updated in place.
        name:       key to store inside each mult[i]; also used on errors.
        values:     list of strings, or None.
        choices:    dict mapping each choice to its integer value.
        suffixes:   optional suffixes, as on util.get_choice().
        allow_zero: when False, empty values are ignored; when True, only
                    None is ignored.
        """

        if values is None or (not allow_zero and not values):
            return

        for i, val in enumerate(values):
            new_values = util.get_choice(name, val, choices, suffixes)

            mult.setdefault(i, {})[name] = new_values

    def get_mult_int(mult, name, values, allow_zero=False):
        """
        Add numbered hashes from integer arguments.

        The i-th element of values is parsed as a non-negative integer
        (any base accepted by int(x, 0)) and stored at mult[i][name].

        mult:       dict indexed by position, updated in place.
        name:       key to store inside each mult[i]; also used on errors.
        values:     list of strings, or None.
        allow_zero: when False, empty values are ignored; when True, only
                    None is ignored.

        Exits the program with an error message on invalid input.
        """

        if values is None or (not allow_zero and not values):
            return

        for i, item in enumerate(values):
            try:
                val = int(item, 0)
            except ValueError:
                sys.exit(f"Error on '{name}': {item} is not an integer")

            if val < 0:
                sys.exit(f"Error on '{name}': {val} is not unsigned")

            mult.setdefault(i, {})[name] = val

    #
    # Data encode helper functions
    #
    def bit(b):
        """Simple macro to define a bit on a bitmask"""
        return 1 << b

    def data_add(data, value, num_bytes):
        """
        Append value to the data bytearray, encoded as a little-endian
        integer with num_bytes bytes.
        """

        data.extend(value.to_bytes(num_bytes, byteorder="little"))  # pylint: disable=E1101

    def dump_bytearray(name, data):
        """
        Print an hexdump of a byte array, 16 bytes per row, with the
        row offset on the left and the printable ASCII on the right.
        """

        print(f"{name} ({len(data)} bytes):")

        for ln_start in range(0, len(data), 16):
            chunk = data[ln_start:ln_start + 16]

            hex_cols = "".join(f"{byte:02x} " for byte in chunk)

            # Each byte takes 3 columns: pad a short last row with the
            # same width, so the ASCII column stays aligned.
            hex_cols += "   " * (16 - len(chunk))

            text = "".join(chr(byte) if 32 <= byte < 127 else "."
                           for byte in chunk)

            print(f" {ln_start:08x} {hex_cols} {text}")

        print()

    def time(string):
        """
        Parse a timestamp argument into a datetime object.

        Accepts either the literal "now" or a string on the
        "%Y-%m-%d %H:%M:%S" format. Raises ValueError otherwise, which
        makes it usable as an argparse "type" callback.
        """

        if string == "now":
            return datetime.now()

        # Formats to be used when parsing time stamps
        formats = [
            "%Y-%m-%d %H:%M:%S",
        ]

        for fmt in formats:
            try:
                return datetime.strptime(string, fmt)
            except ValueError:
                pass

        raise ValueError("Invalid time format")


class guid:
    """
    Simple class to handle GUID fields.
    """

    def __init__(self, time_low, time_mid, time_high, nodes):
        """
        Initialize a GUID value.

        time_low:  first 32 bits of the GUID.
        time_mid:  next 16 bits.
        time_high: next 16 bits.
        nodes:     sequence with the 8 remaining bytes.
        """

        assert len(nodes) == 8

        self.time_low = time_low
        self.time_mid = time_mid
        self.time_high = time_high
        self.nodes = nodes

    @classmethod
    def UUID(cls, guid_str):
        """
        Initialize a GUID using a string on its standard format
        (36 characters: 32 hexadecimal digits plus 4 dashes).

        Raises ValueError if the string is malformed.
        """

        if len(guid_str) != 36:
            print("Size not 36")
            raise ValueError('Invalid GUID size')

        # It is easier to parse without separators. So, drop them
        guid_str = guid_str.replace('-', '')

        if len(guid_str) != 32:
            print("Size not 32", guid_str, len(guid_str))
            raise ValueError('Invalid GUID hex size')

        time_low = int(guid_str[0:8], 16)
        time_mid = int(guid_str[8:12], 16)
        time_high = int(guid_str[12:16], 16)

        # The last 16 hex digits are 8 bytes, kept in string order
        nodes = [int(guid_str[i:i + 2], 16) for i in range(16, 32, 2)]

        return cls(time_low, time_mid, time_high, nodes)

    def __str__(self):
        """Output a GUID value on its default string representation"""

        clock = self.nodes[0] << 8 | self.nodes[1]

        node = 0
        for byte in self.nodes[2:]:
            node = node << 8 | byte

        s = f"{self.time_low:08x}-{self.time_mid:04x}-"
        s += f"{self.time_high:04x}-{clock:04x}-{node:012x}"
        return s

    def to_bytes(self):
        """
        Output a GUID value in bytes: the three time fields encoded as
        little-endian, followed by the 8 node bytes as-is.
        """

        data = bytearray()

        util.data_add(data, self.time_low, 4)
        util.data_add(data, self.time_mid, 2)
        util.data_add(data, self.time_high, 2)
        data.extend(bytearray(self.nodes))

        return data


class qmp:
    """
    Opens a connection and send/receive QMP commands.
    """

    def send_cmd(self, command, args=None, may_open=False, return_error=True):
        """
        Send a command to QMP, optionally opening a connection.

        command:      QMP command name.
        args:         optional dict with the command arguments.
        may_open:     when True, connect first if not connected yet.
        return_error: when True, print the error returned by QMP, if any.

        Returns:
          - False if not connected and may_open is False;
          - None if the command failed or QMP returned an error;
          - "OK" if QMP returned an empty dict;
          - the QMP "return" value otherwise.
        """

        if may_open:
            self._connect()
        elif not self.connected:
            return False

        msg = {'execute': command}
        if args:
            msg['arguments'] = args

        try:
            obj = self.qmp_monitor.cmd_obj(msg)
        # Can we use some other exception class here?
        except Exception as e:                         # pylint: disable=W0718
            print(f"Command: {command}")
            print(f"Failed to inject error: {e}.")
            return None

        if "return" in obj:
            ret = obj["return"]

            # An empty dict would be falsy for callers: report it as "OK"
            if isinstance(ret, dict) and not ret:
                return "OK"

            return ret

        if isinstance(obj.get("error"), dict):
            error = obj["error"]
            if return_error:
                print(f"Command: {msg}")
                print(f'{error["class"]}: {error["desc"]}')
        else:
            # Neither a "return" nor a well-formed "error": show it as-is
            print(json.dumps(obj))

        return None

    def _close(self):
        """Shutdown and close the socket, if opened"""

        if not self.connected:
            return

        self.qmp_monitor.close()
        self.connected = False

    def _connect(self):
        """
        Connect to a QMP TCP/IP port, if not connected yet.

        Returns True on success. Exits the program if the connection
        can't be established.
        """

        if self.connected:
            return True

        try:
            self.qmp_monitor.connect(negotiate=True)
        except ConnectionError:
            sys.exit(f"Can't connect to QMP host {self.host}:{self.port}")

        self.connected = True

        return True

    BLOCK_STATUS_BITS = {
        "uncorrectable": util.bit(0),
        "correctable": util.bit(1),
        "multi-uncorrectable": util.bit(2),
        "multi-correctable": util.bit(3),
    }

    ERROR_SEVERITY = {
        "recoverable": 0,
        "fatal": 1,
        "corrected": 2,
        "none": 3,
    }

    VALIDATION_BITS = {
        "fru-id": util.bit(0),
        "fru-text": util.bit(1),
        "timestamp": util.bit(2),
    }

    GEDB_FLAGS_BITS = {
        "recovered": util.bit(0),
        "prev-error": util.bit(1),
        "simulated": util.bit(2),
    }

    # Size of the Generic Error Data Entry header built by send_cper():
    # 16 (GUID) + 4 + 2 + 1 + 1 + 4 + 16 (FRU id) + 20 (FRU text) + 8
    GENERIC_DATA_SIZE = 72

    def argparse(parser):
        """
        Prepare a parser group to query generic error data.

        This takes no self: call it as qmp.argparse(parser). The parsed
        arguments are meant to be handed later to set_args().
        """

        block_status_bits = ",".join(qmp.BLOCK_STATUS_BITS.keys())
        error_severity_enum = ",".join(qmp.ERROR_SEVERITY.keys())
        validation_bits = ",".join(qmp.VALIDATION_BITS.keys())
        gedb_flags_bits = ",".join(qmp.GEDB_FLAGS_BITS.keys())

        g_gen = parser.add_argument_group("Generic Error Data")  # pylint: disable=E1101
        g_gen.add_argument("--block-status",
                           help=f"block status bits: {block_status_bits}")
        g_gen.add_argument("--raw-data", nargs="+",
                           help="Raw data inside the Error Status Block")
        g_gen.add_argument("--error-severity", "--severity",
                           help=f"error severity: {error_severity_enum}")
        g_gen.add_argument("--gen-err-valid-bits",
                           "--generic-error-validation-bits",
                           help=f"validation bits: {validation_bits}")
        g_gen.add_argument("--fru-id", type=guid.UUID,
                           help="GUID representing a physical device")
        g_gen.add_argument("--fru-text",
                           help="ASCII string identifying the FRU hardware")
        g_gen.add_argument("--timestamp", type=util.time,
                           help="Time when the error info was collected")
        g_gen.add_argument("--precise", "--precise-timestamp",
                           action='store_true',
                           help="Marks the timestamp as precise if --timestamp is used")
        g_gen.add_argument("--gedb-flags",
                           help=f"General Error Data Block flags: {gedb_flags_bits}")

    def set_args(self, args):
        """
        Set the arguments optionally defined via self.argparse().

        Fields not present on args keep the defaults set by __init__().
        Unless --gen-err-valid-bits is given, the validation bit of each
        provided field (FRU id, FRU text, timestamp) is set automatically;
        when it is given, it replaces the validation bits altogether.
        """

        def bcd(value):
            """Encode a two-digit decimal number as one BCD byte"""
            return (value // 10) << 4 | (value % 10)

        if args.block_status:
            self.block_status = util.get_choice(name="block-status",
                                                value=args.block_status,
                                                choices=self.BLOCK_STATUS_BITS,
                                                bitmask=False)
        if args.raw_data:
            self.raw_data = util.get_array("raw-data", args.raw_data,
                                           max_val=255)
            print(self.raw_data)

        if args.error_severity:
            self.error_severity = util.get_choice(name="error-severity",
                                                  value=args.error_severity,
                                                  choices=self.ERROR_SEVERITY,
                                                  bitmask=False)

        if args.fru_id:
            self.fru_id = args.fru_id.to_bytes()
            if not args.gen_err_valid_bits:
                self.validation_bits |= self.VALIDATION_BITS["fru-id"]

        if args.fru_text:
            text = bytearray(args.fru_text.encode('ascii'))
            if len(text) > 20:
                sys.exit("FRU text is too big to fit")

            self.fru_text = text
            if not args.gen_err_valid_bits:
                self.validation_bits |= self.VALIDATION_BITS["fru-text"]

        if args.timestamp:
            stamp = args.timestamp
            century = stamp.year // 100

            # One BCD byte per field, with a "precise" flag as 4th byte
            bcd_time = bytearray()
            util.data_add(bcd_time, bcd(stamp.second), 1)
            util.data_add(bcd_time, bcd(stamp.minute), 1)
            util.data_add(bcd_time, bcd(stamp.hour), 1)
            util.data_add(bcd_time, 1 if args.precise else 0, 1)
            util.data_add(bcd_time, bcd(stamp.day), 1)
            util.data_add(bcd_time, bcd(stamp.month), 1)
            util.data_add(bcd_time, bcd(stamp.year % 100), 1)
            util.data_add(bcd_time, bcd(century % 100), 1)

            self.timestamp = bcd_time
            if not args.gen_err_valid_bits:
                self.validation_bits |= self.VALIDATION_BITS["timestamp"]

        if args.gen_err_valid_bits:
            self.validation_bits = util.get_choice(name="validation",
                                                   value=args.gen_err_valid_bits,
                                                   choices=self.VALIDATION_BITS)

        # --gedb-flags is declared by argparse(): honor it. getattr() keeps
        # working with namespaces that were not built via argparse().
        gedb_flags = getattr(args, "gedb_flags", None)
        if gedb_flags:
            self.flags = util.get_choice(name="gedb-flags",
                                         value=gedb_flags,
                                         choices=self.GEDB_FLAGS_BITS)

    def __init__(self, host, port, debug=False):
        """
        Initialize variables used by the QMP send logic.

        host:  QMP host name or address.
        port:  QMP TCP port.
        debug: when True, send_cper() dumps the data it is about to send.

        No connection is opened here: it is opened on demand.
        """

        self.connected = False
        self.host = host
        self.port = port
        self.debug = debug

        # ACPI 6.1: 18.3.2.7.1 Generic Error Data: Generic Error Status Block
        self.block_status = self.BLOCK_STATUS_BITS["uncorrectable"]
        self.raw_data = []
        self.error_severity = self.ERROR_SEVERITY["recoverable"]

        # ACPI 6.1: 18.3.2.7.1 Generic Error Data: Generic Error Data Entry
        self.validation_bits = 0
        self.flags = 0
        self.fru_id = bytearray(16)
        self.fru_text = bytearray(20)
        self.timestamp = bytearray(8)

        self.qmp_monitor = QEMUMonitorProtocol(address=(self.host, self.port))

    #
    # Socket QMP send command
    #
    def send_cper_raw(self, cper_data):
        """
        Send a raw CPER data to QEMU through QMP TCP socket.

        cper_data is sent base64-encoded as the 'cper' argument of the
        inject-ghes-v2-error command.
        """

        data = b64encode(bytes(cper_data)).decode('ascii')

        cmd_arg = {
            'cper': data
        }

        self._connect()

        if self.send_cmd("inject-ghes-v2-error", cmd_arg):
            print("Error injected.")

    def send_cper(self, notif_type, payload):
        """
        Send commands to QEMU through QMP TCP socket.

        Builds a Generic Error Status Block followed by one Generic Error
        Data Entry, the raw data and the payload, then sends it with
        send_cper_raw().

        notif_type: guid object with the section type of the payload.
        payload:    bytes-like object with the error section data.
        """

        # Fill CPER record header

        # NOTE: bits 4 to 13 of block status contain the number of
        # data entries in the data section. This is currently unsupported.

        cper_length = len(payload)
        data_length = cper_length + len(self.raw_data) + self.GENERIC_DATA_SIZE

        # Generic Error Data Entry
        gede = bytearray()

        gede.extend(notif_type.to_bytes())
        util.data_add(gede, self.error_severity, 4)
        util.data_add(gede, 0x300, 2)
        util.data_add(gede, self.validation_bits, 1)
        util.data_add(gede, self.flags, 1)
        util.data_add(gede, cper_length, 4)
        gede.extend(self.fru_id)
        gede.extend(self.fru_text)
        gede.extend(self.timestamp)

        # Generic Error Status Block
        gebs = bytearray()

        # NOTE(review): gebs is still empty at this point, so the offset is
        # 0 on both branches. Confirm the intended raw data offset against
        # the ACPI layout before changing it.
        if self.raw_data:
            raw_data_offset = len(gebs)
        else:
            raw_data_offset = 0

        util.data_add(gebs, self.block_status, 4)
        util.data_add(gebs, raw_data_offset, 4)
        util.data_add(gebs, len(self.raw_data), 4)
        util.data_add(gebs, data_length, 4)
        util.data_add(gebs, self.error_severity, 4)

        cper_data = bytearray()
        cper_data.extend(gebs)
        cper_data.extend(gede)
        cper_data.extend(bytearray(self.raw_data))
        cper_data.extend(bytearray(payload))

        if self.debug:
            print(f"GUID: {notif_type}")

            util.dump_bytearray("Generic Error Status Block", gebs)
            util.dump_bytearray("Generic Error Data Entry", gede)

            if self.raw_data:
                util.dump_bytearray("Raw data", bytearray(self.raw_data))

            util.dump_bytearray("Payload", payload)

        self.send_cper_raw(cper_data)

    def search_qom(self, path, prop, regex):
        """
        Return a list of devices that match path array like:

            /machine/unattached/device
            /machine/peripheral-anon/device
            ...

        path:  QOM path prefix; "[0]", "[1]", ... is appended to it.
        prop:  property to read from each object with qom-get.
        regex: compiled regular expression; an object is selected when
               its property is a string matched by regex.search().

        The search stops at the first index for which qom-get fails or
        returns an empty value.
        """

        found = []

        # Safeguard against endless loops: query indexes 0 to 10000
        for i in range(10001):
            dev = f"{path}[{i}]"
            args = {
                'path': dev,
                'property': prop
            }
            ret = self.send_cmd("qom-get", args, may_open=True,
                                return_error=False)
            if not ret:
                break

            if isinstance(ret, str) and regex.search(ret):
                found.append(dev)
        else:
            # Only reached when the loop was not stopped by a break
            print("Too many objects returned by qom-get!")

        return found


class cper_guid:
    """
    Contains CPER GUID, as per:
    https://uefi.org/specs/UEFI/2.10/Apx_N_Common_Platform_Error_Record.html
    """

    CPER_PROC_GENERIC = guid(0x9876CCAD, 0x47B4, 0x4bdb,
                             [0xB6, 0x5E, 0x16, 0xF1,
                              0x93, 0xC4, 0xF3, 0xDB])

    CPER_PROC_X86 = guid(0xDC3EA0B0, 0xA144, 0x4797,
                         [0xB9, 0x5B, 0x53, 0xFA,
                          0x24, 0x2B, 0x6E, 0x1D])

    CPER_PROC_ITANIUM = guid(0xe429faf1, 0x3cb7, 0x11d4,
                             [0xbc, 0xa7, 0x00, 0x80,
                              0xc7, 0x3c, 0x88, 0x81])

    CPER_PROC_ARM = guid(0xE19E3D16, 0xBC11, 0x11E4,
                         [0x9C, 0xAA, 0xC2, 0x05,
                          0x1D, 0x5D, 0x46, 0xB0])

    CPER_PLATFORM_MEM = guid(0xA5BC1114, 0x6F64, 0x4EDE,
                             [0xB8, 0x63, 0x3E, 0x83,
                              0xED, 0x7C, 0x83, 0xB1])

    CPER_PLATFORM_MEM2 = guid(0x61EC04FC, 0x48E6, 0xD813,
                              [0x25, 0xC9, 0x8D, 0xAA,
                               0x44, 0x75, 0x0B, 0x12])

    CPER_PCIE = guid(0xD995E954, 0xBBC1, 0x430F,
                     [0xAD, 0x91, 0xB4, 0x4D,
                      0xCB, 0x3C, 0x6F, 0x35])

    CPER_PCI_BUS = guid(0xC5753963, 0x3B84, 0x4095,
                        [0xBF, 0x78, 0xED, 0xDA,
                         0xD3, 0xF9, 0xC9, 0xDD])

    CPER_PCI_DEV = guid(0xEB5E4685, 0xCA66, 0x4769,
                        [0xB6, 0xA2, 0x26, 0x06,
                         0x8B, 0x00, 0x13, 0x26])

    CPER_FW_ERROR = guid(0x81212A96, 0x09ED, 0x4996,
                         [0x94, 0x71, 0x8D, 0x72,
                          0x9C, 0x8E, 0x69, 0xED])

    CPER_DMA_GENERIC = guid(0x5B51FEF7, 0xC79D, 0x4434,
                            [0x8F, 0x1B, 0xAA, 0x62,
                             0xDE, 0x3E, 0x2C, 0x64])

    CPER_DMA_VT = guid(0x71761D37, 0x32B2, 0x45cd,
                       [0xA7, 0xD0, 0xB0, 0xFE,
                        0xDD, 0x93, 0xE8, 0xCF])

    CPER_DMA_IOMMU = guid(0x036F84E1, 0x7F37, 0x428c,
                          [0xA7, 0x9E, 0x57, 0x5F,
                           0xDF, 0xAA, 0x84, 0xEC])

    CPER_CCIX_PER = guid(0x91335EF6, 0xEBFB, 0x4478,
                         [0xA6, 0xA6, 0x88, 0xB7,
                          0x28, 0xCF, 0x75, 0xD7])

    CPER_CXL_PROT_ERR = guid(0x80B9EFB4, 0x52B5, 0x4DE3,
                             [0xA7, 0x77, 0x68, 0x78,
                              0x4B, 0x77, 0x10, 0x48])