diff options
Diffstat (limited to 'lldb/packages/Python/lldbsuite/test/lldbutil.py')
-rw-r--r-- | lldb/packages/Python/lldbsuite/test/lldbutil.py | 834 |
1 files changed, 473 insertions, 361 deletions
diff --git a/lldb/packages/Python/lldbsuite/test/lldbutil.py b/lldb/packages/Python/lldbsuite/test/lldbutil.py index 309b51b..3491370 100644 --- a/lldb/packages/Python/lldbsuite/test/lldbutil.py +++ b/lldb/packages/Python/lldbsuite/test/lldbutil.py @@ -28,6 +28,7 @@ SIMULATOR_RETRY = 3 # Utilities for locating/checking executable programs # =================================================== + def is_exe(fpath): """Returns True if fpath is an executable.""" return os.path.isfile(fpath) and os.access(fpath, os.X_OK) @@ -46,6 +47,7 @@ def which(program): return exe_file return None + def mkdir_p(path): try: os.makedirs(path) @@ -53,13 +55,14 @@ def mkdir_p(path): if e.errno != errno.EEXIST: raise if not os.path.isdir(path): - raise OSError(errno.ENOTDIR, "%s is not a directory"%path) + raise OSError(errno.ENOTDIR, "%s is not a directory" % path) # ============================ # Dealing with SDK and triples # ============================ + def get_xcode_sdk(os, env): # Respect --apple-sdk <path> if it's specified. If the SDK is simply # mounted from some disk image, and not actually installed, this is the @@ -84,18 +87,27 @@ def get_xcode_sdk(os, env): def get_xcode_sdk_version(sdk): - return subprocess.check_output( - ['xcrun', '--sdk', sdk, '--show-sdk-version']).rstrip().decode('utf-8') + return ( + subprocess.check_output(["xcrun", "--sdk", sdk, "--show-sdk-version"]) + .rstrip() + .decode("utf-8") + ) def get_xcode_sdk_root(sdk): - return subprocess.check_output(['xcrun', '--sdk', sdk, '--show-sdk-path' - ]).rstrip().decode('utf-8') + return ( + subprocess.check_output(["xcrun", "--sdk", sdk, "--show-sdk-path"]) + .rstrip() + .decode("utf-8") + ) def get_xcode_clang(sdk): - return subprocess.check_output(['xcrun', '-sdk', sdk, '-f', 'clang' - ]).rstrip().decode("utf-8") + return ( + subprocess.check_output(["xcrun", "-sdk", sdk, "-f", "clang"]) + .rstrip() + .decode("utf-8") + ) # =================================================== @@ -114,6 +126,7 @@ def disassemble(target, function_or_symbol): print(i, file=buf) return buf.getvalue() + # ========================================================== # Integer (byte size 1, 2, 4, and 8) to bytearray conversion # ========================================================== @@ -133,11 +146,11 @@ def int_to_bytearray(val, bytesize): # Little endian followed by a format character. template = "<%c" if bytesize == 2: - fmt = template % 'h' + fmt = template % "h" elif bytesize == 4: - fmt = template % 'i' + fmt = template % "i" elif bytesize == 4: - fmt = template % 'q' + fmt = template % "q" else: return None @@ -159,11 +172,11 @@ def bytearray_to_int(bytes, bytesize): # Little endian followed by a format character. template = "<%c" if bytesize == 2: - fmt = template % 'h' + fmt = template % "h" elif bytesize == 4: - fmt = template % 'i' + fmt = template % "i" elif bytesize == 4: - fmt = template % 'q' + fmt = template % "q" else: return None @@ -184,7 +197,7 @@ def get_description(obj, option=None): o lldb.eDescriptionLevelFull o lldb.eDescriptionLevelVerbose """ - method = getattr(obj, 'GetDescription') + method = getattr(obj, "GetDescription") if not method: return None tuple = (lldb.SBTarget, lldb.SBBreakpointLocation, lldb.SBWatchpoint) @@ -206,6 +219,7 @@ def get_description(obj, option=None): # Convert some enum value to its string counterpart # ================================================= + def _enum_names(prefix: str) -> Dict[int, str]: """Generate a mapping of enum value to name, for the enum prefix.""" suffix_start = len(prefix) @@ -218,6 +232,7 @@ def _enum_names(prefix: str) -> Dict[int, str]: _STATE_NAMES = _enum_names(prefix="eState") + def state_type_to_str(enum: int) -> str: """Returns the stateType string given an enum.""" name = _STATE_NAMES.get(enum) @@ -228,6 +243,7 @@ def state_type_to_str(enum: int) -> str: _STOP_REASON_NAMES = _enum_names(prefix="eStopReason") + def stop_reason_to_str(enum: int) -> str: """Returns the stopReason string given an enum.""" name = _STOP_REASON_NAMES.get(enum) @@ -238,6 +254,7 @@ def stop_reason_to_str(enum: int) -> str: _SYMBOL_TYPE_NAMES = _enum_names(prefix="eSymbolType") + def symbol_type_to_str(enum: int) -> str: """Returns the symbolType string given an enum.""" name = _SYMBOL_TYPE_NAMES.get(enum) @@ -248,6 +265,7 @@ def symbol_type_to_str(enum: int) -> str: _VALUE_TYPE_NAMES = _enum_names(prefix="eValueType") + def value_type_to_str(enum: int) -> str: """Returns the valueType string given an enum.""" name = _VALUE_TYPE_NAMES.get(enum) @@ -260,46 +278,53 @@ def value_type_to_str(enum: int) -> str: # Get stopped threads due to each stop reason. # ================================================== -def sort_stopped_threads(process, - breakpoint_threads=None, - crashed_threads=None, - watchpoint_threads=None, - signal_threads=None, - exiting_threads=None, - other_threads=None): - """ Fills array *_threads with threads stopped for the corresponding stop - reason. + +def sort_stopped_threads( + process, + breakpoint_threads=None, + crashed_threads=None, + watchpoint_threads=None, + signal_threads=None, + exiting_threads=None, + other_threads=None, +): + """Fills array *_threads with threads stopped for the corresponding stop + reason. """ - for lst in [breakpoint_threads, - watchpoint_threads, - signal_threads, - exiting_threads, - other_threads]: + for lst in [ + breakpoint_threads, + watchpoint_threads, + signal_threads, + exiting_threads, + other_threads, + ]: if lst is not None: lst[:] = [] for thread in process: dispatched = False - for (reason, list) in [(lldb.eStopReasonBreakpoint, breakpoint_threads), - (lldb.eStopReasonException, crashed_threads), - (lldb.eStopReasonWatchpoint, watchpoint_threads), - (lldb.eStopReasonSignal, signal_threads), - (lldb.eStopReasonThreadExiting, exiting_threads), - (None, other_threads)]: + for reason, list in [ + (lldb.eStopReasonBreakpoint, breakpoint_threads), + (lldb.eStopReasonException, crashed_threads), + (lldb.eStopReasonWatchpoint, watchpoint_threads), + (lldb.eStopReasonSignal, signal_threads), + (lldb.eStopReasonThreadExiting, exiting_threads), + (None, other_threads), + ]: if not dispatched and list is not None: if thread.GetStopReason() == reason or reason is None: list.append(thread) dispatched = True + # ================================================== # Utility functions for setting breakpoints # ================================================== + def run_break_set_by_script( - test, - class_name, - extra_options=None, - num_expected_locations=1): + test, class_name, extra_options=None, num_expected_locations=1 +): """Set a scripted breakpoint. Check that it got the right number of locations.""" test.assertTrue(class_name is not None, "Must pass in a class name.") command = "breakpoint set -P " + class_name @@ -310,14 +335,16 @@ def run_break_set_by_script( check_breakpoint_result(test, break_results, num_locations=num_expected_locations) return get_bpno_from_match(break_results) + def run_break_set_by_file_and_line( - test, - file_name, - line_number, - extra_options=None, - num_expected_locations=1, - loc_exact=False, - module_name=None): + test, + file_name, + line_number, + extra_options=None, + num_expected_locations=1, + loc_exact=False, + module_name=None, +): """Set a breakpoint by file and line, returning the breakpoint number. If extra_options is not None, then we append it to the breakpoint set command. @@ -325,10 +352,11 @@ def run_break_set_by_file_and_line( If num_expected_locations is -1, we check that we got AT LEAST one location. If num_expected_locations is -2, we don't check the actual number at all. Otherwise, we check that num_expected_locations equals the number of locations. - If loc_exact is true, we check that there is one location, and that location must be at the input file and line number.""" + If loc_exact is true, we check that there is one location, and that location must be at the input file and line number. + """ if file_name is None: - command = 'breakpoint set -l %d' % (line_number) + command = "breakpoint set -l %d" % (line_number) else: command = 'breakpoint set -f "%s" -l %d' % (file_name, line_number) @@ -347,26 +375,28 @@ def run_break_set_by_file_and_line( num_locations=num_expected_locations, file_name=file_name, line_number=line_number, - module_name=module_name) + module_name=module_name, + ) else: check_breakpoint_result( - test, - break_results, - num_locations=num_expected_locations) + test, break_results, num_locations=num_expected_locations + ) return get_bpno_from_match(break_results) def run_break_set_by_symbol( - test, - symbol, - extra_options=None, - num_expected_locations=-1, - sym_exact=False, - module_name=None): + test, + symbol, + extra_options=None, + num_expected_locations=-1, + sym_exact=False, + module_name=None, +): """Set a breakpoint by symbol name. Common options are the same as run_break_set_by_file_and_line. - If sym_exact is true, then the output symbol must match the input exactly, otherwise we do a substring match.""" + If sym_exact is true, then the output symbol must match the input exactly, otherwise we do a substring match. + """ command = 'breakpoint set -n "%s"' % (symbol) if module_name: @@ -383,22 +413,19 @@ def run_break_set_by_symbol( break_results, num_locations=num_expected_locations, symbol_name=symbol, - module_name=module_name) + module_name=module_name, + ) else: check_breakpoint_result( - test, - break_results, - num_locations=num_expected_locations) + test, break_results, num_locations=num_expected_locations + ) return get_bpno_from_match(break_results) def run_break_set_by_selector( - test, - selector, - extra_options=None, - num_expected_locations=-1, - module_name=None): + test, selector, extra_options=None, num_expected_locations=-1, module_name=None +): """Set a breakpoint by selector. Common options are the same as run_break_set_by_file_and_line.""" command = 'breakpoint set -S "%s"' % (selector) @@ -418,21 +445,19 @@ def run_break_set_by_selector( num_locations=num_expected_locations, symbol_name=selector, symbol_match_exact=False, - module_name=module_name) + module_name=module_name, + ) else: check_breakpoint_result( - test, - break_results, - num_locations=num_expected_locations) + test, break_results, num_locations=num_expected_locations + ) return get_bpno_from_match(break_results) def run_break_set_by_regexp( - test, - regexp, - extra_options=None, - num_expected_locations=-1): + test, regexp, extra_options=None, num_expected_locations=-1 +): """Set a breakpoint by regular expression match on symbol name. Common options are the same as run_break_set_by_file_and_line.""" command = 'breakpoint set -r "%s"' % (regexp) @@ -441,19 +466,14 @@ def run_break_set_by_regexp( break_results = run_break_set_command(test, command) - check_breakpoint_result( - test, - break_results, - num_locations=num_expected_locations) + check_breakpoint_result(test, break_results, num_locations=num_expected_locations) return get_bpno_from_match(break_results) def run_break_set_by_source_regexp( - test, - regexp, - extra_options=None, - num_expected_locations=-1): + test, regexp, extra_options=None, num_expected_locations=-1 +): """Set a breakpoint by source regular expression. Common options are the same as run_break_set_by_file_and_line.""" command = 'breakpoint set -p "%s"' % (regexp) if extra_options: @@ -461,22 +481,21 @@ def run_break_set_by_source_regexp( break_results = run_break_set_command(test, command) - check_breakpoint_result( - test, - break_results, - num_locations=num_expected_locations) + check_breakpoint_result(test, break_results, num_locations=num_expected_locations) return get_bpno_from_match(break_results) + def run_break_set_by_file_colon_line( - test, - specifier, - path, - line_number, - column_number = 0, - extra_options=None, - num_expected_locations=-1): - command = 'breakpoint set -y "%s"'%(specifier) + test, + specifier, + path, + line_number, + column_number=0, + extra_options=None, + num_expected_locations=-1, +): + command = 'breakpoint set -y "%s"' % (specifier) if extra_options: command += " " + extra_options @@ -485,13 +504,15 @@ def run_break_set_by_file_colon_line( check_breakpoint_result( test, break_results, - num_locations = num_expected_locations, - file_name = path, - line_number = line_number, - column_number = column_number) + num_locations=num_expected_locations, + file_name=path, + line_number=line_number, + column_number=column_number, + ) return get_bpno_from_match(break_results) + def run_break_set_command(test, command): """Run the command passed in - it must be some break set variant - and analyze the result. Returns a dictionary of information gleaned from the command-line results. @@ -515,146 +536,147 @@ def run_break_set_command(test, command): r"^Breakpoint (?P<bpno>[0-9]+): (?P<num_locations>[0-9]+) locations\.$", r"^Breakpoint (?P<bpno>[0-9]+): (?P<num_locations>no) locations \(pending\)\.", r"^Breakpoint (?P<bpno>[0-9]+): where = (?P<module>.*)`(?P<symbol>[+\-]{0,1}[^+]+)( \+ (?P<offset>[0-9]+)){0,1}( \[inlined\] (?P<inline_symbol>.*)){0,1} at (?P<file>[^:]+):(?P<line_no>[0-9]+)(?P<column>(:[0-9]+)?), address = (?P<address>0x[0-9a-fA-F]+)$", - r"^Breakpoint (?P<bpno>[0-9]+): where = (?P<module>.*)`(?P<symbol>.*)( \+ (?P<offset>[0-9]+)){0,1}, address = (?P<address>0x[0-9a-fA-F]+)$"] + r"^Breakpoint (?P<bpno>[0-9]+): where = (?P<module>.*)`(?P<symbol>.*)( \+ (?P<offset>[0-9]+)){0,1}, address = (?P<address>0x[0-9a-fA-F]+)$", + ] match_object = test.match(command, patterns) break_results = match_object.groupdict() # We always insert the breakpoint number, setting it to -1 if we couldn't find it # Also, make sure it gets stored as an integer. - if not 'bpno' in break_results: - break_results['bpno'] = -1 + if not "bpno" in break_results: + break_results["bpno"] = -1 else: - break_results['bpno'] = int(break_results['bpno']) + break_results["bpno"] = int(break_results["bpno"]) # We always insert the number of locations # If ONE location is set for the breakpoint, then the output doesn't mention locations, but it has to be 1... # We also make sure it is an integer. - if not 'num_locations' in break_results: + if not "num_locations" in break_results: num_locations = 1 else: - num_locations = break_results['num_locations'] - if num_locations == 'no': + num_locations = break_results["num_locations"] + if num_locations == "no": num_locations = 0 else: - num_locations = int(break_results['num_locations']) + num_locations = int(break_results["num_locations"]) - break_results['num_locations'] = num_locations + break_results["num_locations"] = num_locations - if 'line_no' in break_results: - break_results['line_no'] = int(break_results['line_no']) + if "line_no" in break_results: + break_results["line_no"] = int(break_results["line_no"]) return break_results def get_bpno_from_match(break_results): - return int(break_results['bpno']) + return int(break_results["bpno"]) def check_breakpoint_result( - test, - break_results, - file_name=None, - line_number=-1, - column_number=0, - symbol_name=None, - symbol_match_exact=True, - module_name=None, - offset=-1, - num_locations=-1): - - out_num_locations = break_results['num_locations'] + test, + break_results, + file_name=None, + line_number=-1, + column_number=0, + symbol_name=None, + symbol_match_exact=True, + module_name=None, + offset=-1, + num_locations=-1, +): + out_num_locations = break_results["num_locations"] if num_locations == -1: - test.assertTrue(out_num_locations > 0, - "Expecting one or more locations, got none.") + test.assertTrue( + out_num_locations > 0, "Expecting one or more locations, got none." + ) elif num_locations != -2: test.assertTrue( num_locations == out_num_locations, - "Expecting %d locations, got %d." % - (num_locations, - out_num_locations)) + "Expecting %d locations, got %d." % (num_locations, out_num_locations), + ) if file_name: out_file_name = "" - if 'file' in break_results: - out_file_name = break_results['file'] + if "file" in break_results: + out_file_name = break_results["file"] test.assertTrue( file_name.endswith(out_file_name), - "Breakpoint file name '%s' doesn't match resultant name '%s'." % - (file_name, - out_file_name)) + "Breakpoint file name '%s' doesn't match resultant name '%s'." + % (file_name, out_file_name), + ) if line_number != -1: out_line_number = -1 - if 'line_no' in break_results: - out_line_number = break_results['line_no'] + if "line_no" in break_results: + out_line_number = break_results["line_no"] test.assertTrue( line_number == out_line_number, - "Breakpoint line number %s doesn't match resultant line %s." % - (line_number, - out_line_number)) + "Breakpoint line number %s doesn't match resultant line %s." + % (line_number, out_line_number), + ) if column_number != 0: out_column_number = 0 - if 'column' in break_results: - out_column_number = break_results['column'] + if "column" in break_results: + out_column_number = break_results["column"] test.assertTrue( column_number == out_column_number, - "Breakpoint column number %s doesn't match resultant column %s." % - (column_number, - out_column_number)) + "Breakpoint column number %s doesn't match resultant column %s." + % (column_number, out_column_number), + ) if symbol_name: out_symbol_name = "" # Look first for the inlined symbol name, otherwise use the symbol # name: - if 'inline_symbol' in break_results and break_results['inline_symbol']: - out_symbol_name = break_results['inline_symbol'] - elif 'symbol' in break_results: - out_symbol_name = break_results['symbol'] + if "inline_symbol" in break_results and break_results["inline_symbol"]: + out_symbol_name = break_results["inline_symbol"] + elif "symbol" in break_results: + out_symbol_name = break_results["symbol"] if symbol_match_exact: test.assertTrue( symbol_name == out_symbol_name, - "Symbol name '%s' doesn't match resultant symbol '%s'." % - (symbol_name, - out_symbol_name)) + "Symbol name '%s' doesn't match resultant symbol '%s'." + % (symbol_name, out_symbol_name), + ) else: test.assertTrue( - out_symbol_name.find(symbol_name) != - - 1, - "Symbol name '%s' isn't in resultant symbol '%s'." % - (symbol_name, - out_symbol_name)) + out_symbol_name.find(symbol_name) != -1, + "Symbol name '%s' isn't in resultant symbol '%s'." + % (symbol_name, out_symbol_name), + ) if module_name: out_module_name = None - if 'module' in break_results: - out_module_name = break_results['module'] + if "module" in break_results: + out_module_name = break_results["module"] test.assertTrue( - module_name.find(out_module_name) != - - 1, - "Symbol module name '%s' isn't in expected module name '%s'." % - (out_module_name, - module_name)) + module_name.find(out_module_name) != -1, + "Symbol module name '%s' isn't in expected module name '%s'." + % (out_module_name, module_name), + ) + def check_breakpoint( - test, - bpno, - expected_locations = None, - expected_resolved_count = None, - expected_hit_count = None, - location_id = None, - expected_location_resolved = True, - expected_location_hit_count = None): + test, + bpno, + expected_locations=None, + expected_resolved_count=None, + expected_hit_count=None, + location_id=None, + expected_location_resolved=True, + expected_location_hit_count=None, +): """ Test breakpoint or breakpoint location. - Breakpoint resolved count is always checked. If not specified the assumption is that all locations - should be resolved. + Breakpoint resolved count is always checked. If not specified the assumption is that all locations + should be resolved. To test a breakpoint location, breakpoint number (bpno) and location_id must be set. In this case the resolved count for a breakpoint is not tested by default. The location is expected to be resolved, unless expected_location_resolved is set to False. @@ -697,7 +719,6 @@ def check_breakpoint( test.assertEquals(expected_location_hit_count, loc_bkpt.GetHitCount()) - # ================================================== # Utility functions related to Threads and Processes # ================================================== @@ -745,7 +766,7 @@ def get_stopped_thread(process, reason): def get_threads_stopped_at_breakpoint_id(process, bpid): - """ For a stopped process returns the thread stopped at the breakpoint passed in bkpt""" + """For a stopped process returns the thread stopped at the breakpoint passed in bkpt""" stopped_threads = [] threads = [] @@ -767,8 +788,7 @@ def get_threads_stopped_at_breakpoint(process, bkpt): return get_threads_stopped_at_breakpoint_id(process, bkpt.GetID()) -def get_one_thread_stopped_at_breakpoint_id( - process, bpid, require_exactly_one=True): +def get_one_thread_stopped_at_breakpoint_id(process, bpid, require_exactly_one=True): threads = get_threads_stopped_at_breakpoint_id(process, bpid) if len(threads) == 0: return None @@ -778,21 +798,26 @@ def get_one_thread_stopped_at_breakpoint_id( return threads[0] -def get_one_thread_stopped_at_breakpoint( - process, bkpt, require_exactly_one=True): +def get_one_thread_stopped_at_breakpoint(process, bkpt, require_exactly_one=True): return get_one_thread_stopped_at_breakpoint_id( - process, bkpt.GetID(), require_exactly_one) + process, bkpt.GetID(), require_exactly_one + ) def is_thread_crashed(test, thread): """In the test suite we dereference a null pointer to simulate a crash. The way this is reported depends on the platform.""" if test.platformIsDarwin(): - return thread.GetStopReason( - ) == lldb.eStopReasonException and "EXC_BAD_ACCESS" in thread.GetStopDescription(100) + return ( + thread.GetStopReason() == lldb.eStopReasonException + and "EXC_BAD_ACCESS" in thread.GetStopDescription(100) + ) elif test.getPlatform() == "linux": - return thread.GetStopReason() == lldb.eStopReasonSignal and thread.GetStopReasonDataAtIndex( - 0) == thread.GetProcess().GetUnixSignals().GetSignalNumberFromName("SIGSEGV") + return ( + thread.GetStopReason() == lldb.eStopReasonSignal + and thread.GetStopReasonDataAtIndex(0) + == thread.GetProcess().GetUnixSignals().GetSignalNumberFromName("SIGSEGV") + ) elif test.getPlatform() == "windows": return "Exception 0xc0000005" in thread.GetStopDescription(200) else: @@ -808,24 +833,29 @@ def get_crashed_threads(test, process): threads.append(thread) return threads + # Helper functions for run_to_{source,name}_breakpoint: -def run_to_breakpoint_make_target(test, exe_name = "a.out", in_cwd = True): + +def run_to_breakpoint_make_target(test, exe_name="a.out", in_cwd=True): exe = test.getBuildArtifact(exe_name) if in_cwd else exe_name # Create the target target = test.dbg.CreateTarget(exe) - test.assertTrue(target, "Target: %s is not valid."%(exe_name)) + test.assertTrue(target, "Target: %s is not valid." % (exe_name)) # Set environment variables for the inferior. if lldbtest_config.inferior_env: - test.runCmd('settings set target.env-vars {}'.format( - lldbtest_config.inferior_env)) + test.runCmd( + "settings set target.env-vars {}".format(lldbtest_config.inferior_env) + ) return target -def run_to_breakpoint_do_run(test, target, bkpt, launch_info = None, - only_one_thread = True, extra_images = None): + +def run_to_breakpoint_do_run( + test, target, bkpt, launch_info=None, only_one_thread=True, extra_images=None +): # Launch the process, and do not stop at the entry point. if not launch_info: launch_info = target.GetLaunchInfo() @@ -840,20 +870,26 @@ def run_to_breakpoint_do_run(test, target, bkpt, launch_info = None, # Unfortunate workaround for the iPhone simulator. retry = SIMULATOR_RETRY - while (retry and error.Fail() and error.GetCString() and - "Unable to boot the Simulator" in error.GetCString()): + while ( + retry + and error.Fail() + and error.GetCString() + and "Unable to boot the Simulator" in error.GetCString() + ): retry -= 1 - print("** Simulator is unresponsive. Retrying %d more time(s)"%retry) + print("** Simulator is unresponsive. Retrying %d more time(s)" % retry) import time + time.sleep(60) error = lldb.SBError() process = target.Launch(launch_info, error) - test.assertTrue(process, - "Could not create a valid process for %s: %s" % - (target.GetExecutable().GetFilename(), error.GetCString())) - test.assertFalse(error.Fail(), - "Process launch failed: %s" % (error.GetCString())) + test.assertTrue( + process, + "Could not create a valid process for %s: %s" + % (target.GetExecutable().GetFilename(), error.GetCString()), + ) + test.assertFalse(error.Fail(), "Process launch failed: %s" % (error.GetCString())) def processStateInfo(process): info = "state: {}".format(state_type_to_str(process.state)) @@ -870,115 +906,149 @@ def run_to_breakpoint_do_run(test, target, bkpt, launch_info = None, return info if process.state != lldb.eStateStopped: - test.fail("Test process is not stopped at breakpoint: {}".format(processStateInfo(process))) + test.fail( + "Test process is not stopped at breakpoint: {}".format( + processStateInfo(process) + ) + ) # Frame #0 should be at our breakpoint. - threads = get_threads_stopped_at_breakpoint( - process, bkpt) + threads = get_threads_stopped_at_breakpoint(process, bkpt) num_threads = len(threads) if only_one_thread: - test.assertEqual(num_threads, 1, "Expected 1 thread to stop at breakpoint, %d did."%(num_threads)) + test.assertEqual( + num_threads, + 1, + "Expected 1 thread to stop at breakpoint, %d did." % (num_threads), + ) else: test.assertGreater(num_threads, 0, "No threads stopped at breakpoint") thread = threads[0] return (target, process, thread, bkpt) -def run_to_name_breakpoint (test, bkpt_name, launch_info = None, - exe_name = "a.out", - bkpt_module = None, - in_cwd = True, - only_one_thread = True, - extra_images = None): + +def run_to_name_breakpoint( + test, + bkpt_name, + launch_info=None, + exe_name="a.out", + bkpt_module=None, + in_cwd=True, + only_one_thread=True, + extra_images=None, +): """Start up a target, using exe_name as the executable, and run it to - a breakpoint set by name on bkpt_name restricted to bkpt_module. + a breakpoint set by name on bkpt_name restricted to bkpt_module. - If you want to pass in launch arguments or environment - variables, you can optionally pass in an SBLaunchInfo. If you - do that, remember to set the working directory as well. + If you want to pass in launch arguments or environment + variables, you can optionally pass in an SBLaunchInfo. If you + do that, remember to set the working directory as well. - If your executable isn't called a.out, you can pass that in. - And if your executable isn't in the CWD, pass in the absolute - path to the executable in exe_name, and set in_cwd to False. + If your executable isn't called a.out, you can pass that in. + And if your executable isn't in the CWD, pass in the absolute + path to the executable in exe_name, and set in_cwd to False. - If you need to restrict the breakpoint to a particular module, - pass the module name (a string not a FileSpec) in bkpt_module. If - nothing is passed in setting will be unrestricted. + If you need to restrict the breakpoint to a particular module, + pass the module name (a string not a FileSpec) in bkpt_module. If + nothing is passed in setting will be unrestricted. - If the target isn't valid, the breakpoint isn't found, or hit, the - function will cause a testsuite failure. + If the target isn't valid, the breakpoint isn't found, or hit, the + function will cause a testsuite failure. - If successful it returns a tuple with the target process and - thread that hit the breakpoint, and the breakpoint that we set - for you. + If successful it returns a tuple with the target process and + thread that hit the breakpoint, and the breakpoint that we set + for you. - If only_one_thread is true, we require that there be only one - thread stopped at the breakpoint. Otherwise we only require one - or more threads stop there. If there are more than one, we return - the first thread that stopped. + If only_one_thread is true, we require that there be only one + thread stopped at the breakpoint. Otherwise we only require one + or more threads stop there. If there are more than one, we return + the first thread that stopped. """ target = run_to_breakpoint_make_target(test, exe_name, in_cwd) breakpoint = target.BreakpointCreateByName(bkpt_name, bkpt_module) - - test.assertTrue(breakpoint.GetNumLocations() > 0, - "No locations found for name breakpoint: '%s'."%(bkpt_name)) - return run_to_breakpoint_do_run(test, target, breakpoint, launch_info, - only_one_thread, extra_images) - -def run_to_source_breakpoint(test, bkpt_pattern, source_spec, - launch_info = None, exe_name = "a.out", - bkpt_module = None, - in_cwd = True, - only_one_thread = True, - extra_images = None, - has_locations_before_run = True): + test.assertTrue( + breakpoint.GetNumLocations() > 0, + "No locations found for name breakpoint: '%s'." % (bkpt_name), + ) + return run_to_breakpoint_do_run( + test, target, breakpoint, launch_info, only_one_thread, extra_images + ) + + +def run_to_source_breakpoint( + test, + bkpt_pattern, + source_spec, + launch_info=None, + exe_name="a.out", + bkpt_module=None, + in_cwd=True, + only_one_thread=True, + extra_images=None, + has_locations_before_run=True, +): """Start up a target, using exe_name as the executable, and run it to - a breakpoint set by source regex bkpt_pattern. + a breakpoint set by source regex bkpt_pattern. - The rest of the behavior is the same as run_to_name_breakpoint. + The rest of the behavior is the same as run_to_name_breakpoint. """ target = run_to_breakpoint_make_target(test, exe_name, in_cwd) # Set the breakpoints breakpoint = target.BreakpointCreateBySourceRegex( - bkpt_pattern, source_spec, bkpt_module) + bkpt_pattern, source_spec, bkpt_module + ) if has_locations_before_run: - test.assertTrue(breakpoint.GetNumLocations() > 0, - 'No locations found for source breakpoint: "%s", file: "%s", dir: "%s"' - %(bkpt_pattern, source_spec.GetFilename(), source_spec.GetDirectory())) - return run_to_breakpoint_do_run(test, target, breakpoint, launch_info, - only_one_thread, extra_images) - -def run_to_line_breakpoint(test, source_spec, line_number, column = 0, - launch_info = None, exe_name = "a.out", - bkpt_module = None, - in_cwd = True, - only_one_thread = True, - extra_images = None): + test.assertTrue( + breakpoint.GetNumLocations() > 0, + 'No locations found for source breakpoint: "%s", file: "%s", dir: "%s"' + % (bkpt_pattern, source_spec.GetFilename(), source_spec.GetDirectory()), + ) + return run_to_breakpoint_do_run( + test, target, breakpoint, launch_info, only_one_thread, extra_images + ) + + +def run_to_line_breakpoint( + test, + source_spec, + line_number, + column=0, + launch_info=None, + exe_name="a.out", + bkpt_module=None, + in_cwd=True, + only_one_thread=True, + extra_images=None, +): """Start up a target, using exe_name as the executable, and run it to - a breakpoint set by (source_spec, line_number(, column)). + a breakpoint set by (source_spec, line_number(, column)). - The rest of the behavior is the same as run_to_name_breakpoint. + The rest of the behavior is the same as run_to_name_breakpoint. """ target = run_to_breakpoint_make_target(test, exe_name, in_cwd) # Set the breakpoints breakpoint = target.BreakpointCreateByLocation( - source_spec, line_number, column, 0, lldb.SBFileSpecList()) - test.assertTrue(breakpoint.GetNumLocations() > 0, + source_spec, line_number, column, 0, lldb.SBFileSpecList() + ) + test.assertTrue( + breakpoint.GetNumLocations() > 0, 'No locations found for line breakpoint: "%s:%d(:%d)", dir: "%s"' - %(source_spec.GetFilename(), line_number, column, - source_spec.GetDirectory())) - return run_to_breakpoint_do_run(test, target, breakpoint, launch_info, - only_one_thread, extra_images) + % (source_spec.GetFilename(), line_number, column, source_spec.GetDirectory()), + ) + return run_to_breakpoint_do_run( + test, target, breakpoint, launch_info, only_one_thread, extra_images + ) def continue_to_breakpoint(process, bkpt): - """ Continues the process, if it stops, returns the threads stopped at bkpt; otherwise, returns None""" + """Continues the process, if it stops, returns the threads stopped at bkpt; otherwise, returns None""" process.Continue() if process.GetState() != lldb.eStateStopped: return None @@ -992,10 +1062,13 @@ def continue_to_source_breakpoint(test, process, bkpt_pattern, source_spec): Otherwise the same as `continue_to_breakpoint` """ breakpoint = process.target.BreakpointCreateBySourceRegex( - bkpt_pattern, source_spec, None) - test.assertTrue(breakpoint.GetNumLocations() > 0, - 'No locations found for source breakpoint: "%s", file: "%s", dir: "%s"' - %(bkpt_pattern, source_spec.GetFilename(), source_spec.GetDirectory())) + bkpt_pattern, source_spec, None + ) + test.assertTrue( + breakpoint.GetNumLocations() > 0, + 'No locations found for source breakpoint: "%s", file: "%s", dir: "%s"' + % (bkpt_pattern, source_spec.GetFilename(), source_spec.GetDirectory()), + ) stopped_threads = continue_to_breakpoint(process, breakpoint) process.target.BreakpointDelete(breakpoint.GetID()) return stopped_threads @@ -1019,6 +1092,7 @@ def get_function_names(thread): """ Returns a sequence of function names from the stack frames of this thread. """ + def GetFuncName(i): return thread.GetFrameAtIndex(i).GetFunctionName() @@ -1029,6 +1103,7 @@ def get_symbol_names(thread): """ Returns a sequence of symbols for this thread. """ + def GetSymbol(i): return thread.GetFrameAtIndex(i).GetSymbol().GetName() @@ -1039,6 +1114,7 @@ def get_pc_addresses(thread): """ Returns a sequence of pc addresses for this thread. """ + def GetPCAddress(i): return thread.GetFrameAtIndex(i).GetPCAddress() @@ -1049,9 +1125,9 @@ def get_filenames(thread): """ Returns a sequence of file names from the stack frames of this thread. """ + def GetFilename(i): - return thread.GetFrameAtIndex( - i).GetLineEntry().GetFileSpec().GetFilename() + return thread.GetFrameAtIndex(i).GetLineEntry().GetFileSpec().GetFilename() return list(map(GetFilename, list(range(thread.GetNumFrames())))) @@ -1060,6 +1136,7 @@ def get_line_numbers(thread): """ Returns a sequence of line numbers from the stack frames of this thread. """ + def GetLineNumber(i): return thread.GetFrameAtIndex(i).GetLineEntry().GetLine() @@ -1070,9 +1147,9 @@ def get_module_names(thread): """ Returns a sequence of module names from the stack frames of this thread. """ + def GetModuleName(i): - return thread.GetFrameAtIndex( - i).GetModule().GetFileSpec().GetFilename() + return thread.GetFrameAtIndex(i).GetModule().GetFileSpec().GetFilename() return list(map(GetModuleName, list(range(thread.GetNumFrames())))) @@ -1081,6 +1158,7 @@ def get_stack_frames(thread): """ Returns a sequence of stack frames for this thread. """ + def GetStackFrame(i): return thread.GetFrameAtIndex(i) @@ -1108,10 +1186,11 @@ def print_stacktrace(thread, string_buffer=False): desc = "" print( "Stack trace for thread id={0:#x} name={1} queue={2} ".format( - thread.GetThreadID(), - thread.GetName(), - thread.GetQueueName()) + desc, - file=output) + thread.GetThreadID(), thread.GetName(), thread.GetQueueName() + ) + + desc, + file=output, + ) for i in range(depth): frame = thread.GetFrameAtIndex(i) @@ -1128,22 +1207,25 @@ def print_stacktrace(thread, string_buffer=False): addr=load_addr, mod=mods[i], symbol=symbols[i], - offset=symbol_offset), - file=output) + offset=symbol_offset, + ), + file=output, + ) else: print( " frame #{num}: {addr:#016x} {mod}`{func} at {file}:{line} {args}".format( num=i, addr=load_addr, mod=mods[i], - func='%s [inlined]' % - funcs[i] if frame.IsInlined() else funcs[i], + func="%s [inlined]" % funcs[i] if frame.IsInlined() else funcs[i], file=files[i], line=lines[i], - args=get_args_as_string( - frame, - showFuncName=False) if not frame.IsInlined() else '()'), - file=output) + args=get_args_as_string(frame, showFuncName=False) + if not frame.IsInlined() + else "()", + ), + file=output, + ) if string_buffer: return output.getvalue() @@ -1165,35 +1247,40 @@ def print_stacktraces(process, string_buffer=False): def expect_state_changes(test, listener, process, states, timeout=30): """Listens for state changed events on the listener and makes sure they match what we - expect. Stop-and-restart events (where GetRestartedFromEvent() returns true) are ignored.""" + expect. Stop-and-restart events (where GetRestartedFromEvent() returns true) are ignored. + """ for expected_state in states: + def get_next_event(): event = lldb.SBEvent() if not listener.WaitForEventForBroadcasterWithType( - timeout, - process.GetBroadcaster(), - lldb.SBProcess.eBroadcastBitStateChanged, - event): + timeout, + process.GetBroadcaster(), + lldb.SBProcess.eBroadcastBitStateChanged, + event, + ): test.fail( - "Timed out while waiting for a transition to state %s" % - lldb.SBDebugger.StateAsCString(expected_state)) + "Timed out while waiting for a transition to state %s" + % lldb.SBDebugger.StateAsCString(expected_state) + ) return event event = get_next_event() - while (lldb.SBProcess.GetStateFromEvent(event) == lldb.eStateStopped and - lldb.SBProcess.GetRestartedFromEvent(event)): + while lldb.SBProcess.GetStateFromEvent( + event + ) == lldb.eStateStopped and lldb.SBProcess.GetRestartedFromEvent(event): # Ignore restarted event and the subsequent running event. event = get_next_event() test.assertEqual( lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning, - "Restarted event followed by a running event") + "Restarted event followed by a running event", + ) event = get_next_event() - test.assertEqual( - lldb.SBProcess.GetStateFromEvent(event), - expected_state) + test.assertEqual(lldb.SBProcess.GetStateFromEvent(event), expected_state) + def start_listening_from(broadcaster, event_mask): """Creates a listener for a specific event mask and add it to the source broadcaster.""" @@ -1202,6 +1289,7 @@ def start_listening_from(broadcaster, event_mask): broadcaster.AddListener(listener, event_mask) return listener + def fetch_next_event(test, listener, broadcaster, match_class=False, timeout=10): """Fetch one event from the listener and return it if it matches the provided broadcaster. If `match_class` is set to True, this will match an event with an entire broadcaster class. @@ -1219,8 +1307,10 @@ def fetch_next_event(test, listener, broadcaster, match_class=False, timeout=10) stream = lldb.SBStream() event.GetDescription(stream) - test.fail("received event '%s' from unexpected broadcaster '%s'." % - (stream.GetData(), event.GetBroadcaster().GetName())) + test.fail( + "received event '%s' from unexpected broadcaster '%s'." + % (stream.GetData(), event.GetBroadcaster().GetName()) + ) test.fail("couldn't fetch an event before reaching the timeout.") @@ -1257,9 +1347,7 @@ def get_args_as_string(frame, showFuncName=True): vars = frame.GetVariables(True, False, False, True) # type of SBValueList args = [] # list of strings for var in vars: - args.append("(%s)%s=%s" % (var.GetTypeName(), - var.GetName(), - var.GetValue())) + args.append("(%s)%s=%s" % (var.GetTypeName(), var.GetName(), var.GetValue())) if frame.GetFunction(): name = frame.GetFunction().GetName() elif frame.GetSymbol(): @@ -1280,18 +1368,20 @@ def print_registers(frame, string_buffer=False): print("Register sets for " + str(frame), file=output) registerSet = frame.GetRegisters() # Return type of SBValueList. - print("Frame registers (size of register set = %d):" % - registerSet.GetSize(), file=output) + print( + "Frame registers (size of register set = %d):" % registerSet.GetSize(), + file=output, + ) for value in registerSet: - #print(value, file=output) - print("%s (number of children = %d):" % - (value.GetName(), value.GetNumChildren()), file=output) + # print(value, file=output) + print( + "%s (number of children = %d):" % (value.GetName(), value.GetNumChildren()), + file=output, + ) for child in value: print( - "Name: %s, Value: %s" % - (child.GetName(), - child.GetValue()), - file=output) + "Name: %s, Value: %s" % (child.GetName(), child.GetValue()), file=output + ) if string_buffer: return output.getvalue() @@ -1351,6 +1441,7 @@ def get_ESRs(frame): """ return get_registers(frame, "exception state") + # ====================================== # Utility classes/functions for SBValues # ====================================== @@ -1371,11 +1462,15 @@ class BasicFormatter(object): val = value.GetValue() if val is None and value.GetNumChildren() > 0: val = "%s (location)" % value.GetLocation() - print("{indentation}({type}) {name} = {value}".format( - indentation=' ' * indent, - type=value.GetTypeName(), - name=value.GetName(), - value=val), file=output) + print( + "{indentation}({type}) {name} = {value}".format( + indentation=" " * indent, + type=value.GetTypeName(), + name=value.GetName(), + value=val, + ), + file=output, + ) return output.getvalue() @@ -1397,8 +1492,7 @@ class ChildVisitingFormatter(BasicFormatter): BasicFormatter.format(self, value, buffer=output) for child in value: - BasicFormatter.format( - self, child, buffer=output, indent=self.cindent) + BasicFormatter.format(self, child, buffer=output, indent=self.cindent) return output.getvalue() @@ -1426,18 +1520,17 @@ class RecursiveDecentFormatter(BasicFormatter): new_indent = self.lindent + self.cindent for child in value: if child.GetSummary() is not None: - BasicFormatter.format( - self, child, buffer=output, indent=new_indent) + BasicFormatter.format(self, child, buffer=output, indent=new_indent) else: if child.GetNumChildren() > 0: rdf = RecursiveDecentFormatter(indent_level=new_indent) rdf.format(child, buffer=output) else: - BasicFormatter.format( - self, child, buffer=output, indent=new_indent) + BasicFormatter.format(self, child, buffer=output, indent=new_indent) return output.getvalue() + # =========================================================== # Utility functions for path manipulation on remote platforms # =========================================================== @@ -1445,9 +1538,9 @@ class RecursiveDecentFormatter(BasicFormatter): def join_remote_paths(*paths): # TODO: update with actual platform name for remote windows once it exists - if lldb.remote_platform.GetName() == 'remote-windows': - return os.path.join(*paths).replace(os.path.sep, '\\') - return os.path.join(*paths).replace(os.path.sep, '/') + if lldb.remote_platform.GetName() == "remote-windows": + return os.path.join(*paths).replace(os.path.sep, "\\") + return os.path.join(*paths).replace(os.path.sep, "/") def append_to_process_working_directory(test, *paths): @@ -1456,6 +1549,7 @@ def append_to_process_working_directory(test, *paths): return join_remote_paths(remote.GetWorkingDirectory(), *paths) return os.path.join(test.getBuildDir(), *paths) + # ================================================== # Utility functions to get the correct signal number # ================================================== @@ -1474,45 +1568,51 @@ def get_signal_number(signal_name): # No remote platform; fall back to using local python signals. return getattr(signal, signal_name) -def get_actions_for_signal(testcase, signal_name, from_target=False, expected_absent=False): + +def get_actions_for_signal( + testcase, signal_name, from_target=False, expected_absent=False +): """Returns a triple of (pass, stop, notify)""" return_obj = lldb.SBCommandReturnObject() command = "process handle {0}".format(signal_name) if from_target: command += " -t" - testcase.dbg.GetCommandInterpreter().HandleCommand( - command, return_obj) + testcase.dbg.GetCommandInterpreter().HandleCommand(command, return_obj) match = re.match( - 'NAME *PASS *STOP *NOTIFY.*(false|true|not set) *(false|true|not set) *(false|true|not set)', + "NAME *PASS *STOP *NOTIFY.*(false|true|not set) *(false|true|not set) *(false|true|not set)", return_obj.GetOutput(), - re.IGNORECASE | re.DOTALL) + re.IGNORECASE | re.DOTALL, + ) if match and expected_absent: testcase.fail('Signal "{0}" was supposed to be absent'.format(signal_name)) if not match: if expected_absent: return (None, None, None) - testcase.fail('Unable to retrieve default signal disposition.') + testcase.fail("Unable to retrieve default signal disposition.") return (match.group(1), match.group(2), match.group(3)) +def set_actions_for_signal( + testcase, signal_name, pass_action, stop_action, notify_action, expect_success=True +): + return_obj = lldb.SBCommandReturnObject() + command = "process handle {0}".format(signal_name) + if pass_action != None: + command += " -p {0}".format(pass_action) + if stop_action != None: + command += " -s {0}".format(stop_action) + if notify_action != None: + command += " -n {0}".format(notify_action) -def set_actions_for_signal(testcase, signal_name, pass_action, stop_action, notify_action, expect_success=True): - return_obj = lldb.SBCommandReturnObject() - command = "process handle {0}".format(signal_name) - if pass_action != None: - command += " -p {0}".format(pass_action) - if stop_action != None: - command += " -s {0}".format(stop_action) - if notify_action != None: - command +=" -n {0}".format(notify_action) - - testcase.dbg.GetCommandInterpreter().HandleCommand(command, return_obj) - testcase.assertEqual(expect_success, - return_obj.Succeeded(), - "Setting signal handling for {0} worked as expected".format(signal_name)) + testcase.dbg.GetCommandInterpreter().HandleCommand(command, return_obj) + testcase.assertEqual( + expect_success, + return_obj.Succeeded(), + "Setting signal handling for {0} worked as expected".format(signal_name), + ) -class PrintableRegex(object): +class PrintableRegex(object): def __init__(self, text): self.regex = re.compile(text) self.text = text @@ -1542,36 +1642,41 @@ def skip_if_library_missing(test, target, library): if isinstance(library, str): if library == filename: return False - elif hasattr(library, 'match'): + elif hasattr(library, "match"): if library.match(filename): return False return True def find_library_callable(test): return find_library(target, library) + return skip_if_callable( test, find_library_callable, - "could not find library matching '%s' in target %s" % - (library, - target)) + "could not find library matching '%s' in target %s" % (library, target), + ) def read_file_on_target(test, remote): if lldb.remote_platform: local = test.getBuildArtifact("file_from_target") - error = lldb.remote_platform.Get(lldb.SBFileSpec(remote, False), - lldb.SBFileSpec(local, True)) - test.assertTrue(error.Success(), "Reading file {0} failed: {1}".format(remote, error)) + error = lldb.remote_platform.Get( + lldb.SBFileSpec(remote, False), lldb.SBFileSpec(local, True) + ) + test.assertTrue( + error.Success(), "Reading file {0} failed: {1}".format(remote, error) + ) else: local = remote - with open(local, 'r') as f: + with open(local, "r") as f: return f.read() + def read_file_from_process_wd(test, name): path = append_to_process_working_directory(test, name) return read_file_on_target(test, path) + def wait_for_file_on_target(testcase, file_path, max_attempts=6): for i in range(max_attempts): err, retcode, msg = testcase.run_platform_command("ls %s" % file_path) @@ -1580,14 +1685,16 @@ def wait_for_file_on_target(testcase, file_path, max_attempts=6): if i < max_attempts: # Exponential backoff! import time + time.sleep(pow(2, i) * 0.25) else: testcase.fail( - "File %s not found even after %d attempts." % - (file_path, max_attempts)) + "File %s not found even after %d attempts." % (file_path, max_attempts) + ) return read_file_on_target(testcase, file_path) + def packetlog_get_process_info(log): """parse a gdb-remote packet log file and extract the response to qProcessInfo""" process_info = dict() @@ -1596,32 +1703,37 @@ def packetlog_get_process_info(log): expect_process_info_response = False for line in logfile: if expect_process_info_response: - for pair in line.split(';'): - keyval = pair.split(':') + for pair in line.split(";"): + keyval = pair.split(":") if len(keyval) == 2: process_info[keyval[0]] = keyval[1] break - if 'send packet: $qProcessInfo#' in line: + if "send packet: $qProcessInfo#" in line: expect_process_info_response = True return process_info + def packetlog_get_dylib_info(log): """parse a gdb-remote packet log file and extract the *last* complete (=> fetch_all_solibs=true) response to jGetLoadedDynamicLibrariesInfos""" import json + dylib_info = None with open(log, "r") as logfile: dylib_info = None expect_dylib_info_response = False for line in logfile: if expect_dylib_info_response: - while line[0] != '$': + while line[0] != "$": line = line[1:] line = line[1:] # Unescape '}'. - dylib_info = json.loads(line.replace('}]','}')[:-4]) + dylib_info = json.loads(line.replace("}]", "}")[:-4]) expect_dylib_info_response = False - if 'send packet: $jGetLoadedDynamicLibrariesInfos:{"fetch_all_solibs":true}' in line: + if ( + 'send packet: $jGetLoadedDynamicLibrariesInfos:{"fetch_all_solibs":true}' + in line + ): expect_dylib_info_response = True return dylib_info |