diff options
author | John Harrison <harjohn@google.com> | 2025-05-16 08:47:01 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-05-16 08:47:01 -0700 |
commit | 087a5d2ec7897cd99d3787820711fec76a8e1792 (patch) | |
tree | 4bec8a6486a79d36dcc2edbdd5e13fdb5f3c9437 /lldb/packages/Python/lldbsuite | |
parent | 6ebb84869e6cf1a643b053e53931c6075b2cc8ae (diff) | |
download | llvm-087a5d2ec7897cd99d3787820711fec76a8e1792.zip llvm-087a5d2ec7897cd99d3787820711fec76a8e1792.tar.gz llvm-087a5d2ec7897cd99d3787820711fec76a8e1792.tar.bz2 |
[lldb-dap] Adding additional asserts to unit tests. (#140107)
Adding an assert that the 'continue' request succeeds caused a number of
tests to fail. This showed a number of tests that were not specifying if
they should be stopped or not at key points in the test. This is likely
contributing to these tests being flaky since the debugger is not in the
expected state.
Additionally, I spent a little time trying to improve the readability of
the dap_server.py and lldbdap_testcase.py.
Diffstat (limited to 'lldb/packages/Python/lldbsuite')
-rw-r--r-- | lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py | 294 | ||||
-rw-r--r-- | lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py | 246 |
2 files changed, 231 insertions, 309 deletions
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py index 73f7b0e..d3589e7 100644 --- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py +++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py @@ -12,6 +12,13 @@ import signal import sys import threading import time +from typing import Any, Optional, Union, BinaryIO, TextIO + +## DAP type references +Event = dict[str, Any] +Request = dict[str, Any] +Response = dict[str, Any] +ProtocolMessage = Union[Event, Request, Response] def dump_memory(base_addr, data, num_per_line, outfile): @@ -98,55 +105,40 @@ def dump_dap_log(log_file): print("========= END =========", file=sys.stderr) -def read_packet_thread(vs_comm, log_file): - done = False - try: - while not done: - packet = read_packet(vs_comm.recv, trace_file=vs_comm.trace_file) - # `packet` will be `None` on EOF. We want to pass it down to - # handle_recv_packet anyway so the main thread can handle unexpected - # termination of lldb-dap and stop waiting for new packets. - done = not vs_comm.handle_recv_packet(packet) - finally: - # Wait for the process to fully exit before dumping the log file to - # ensure we have the entire log contents. - if vs_comm.process is not None: - try: - # Do not wait forever, some logs are better than none. - vs_comm.process.wait(timeout=20) - except subprocess.TimeoutExpired: - pass - dump_dap_log(log_file) - - class DebugCommunication(object): - def __init__(self, recv, send, init_commands, log_file=None): - self.trace_file = None + def __init__( + self, + recv: BinaryIO, + send: BinaryIO, + init_commands: list[str], + log_file: Optional[TextIO] = None, + ): + # For debugging test failures, try setting `trace_file = sys.stderr`. + self.trace_file: Optional[TextIO] = None + self.log_file = log_file self.send = send self.recv = recv - self.recv_packets = [] + self.recv_packets: list[Optional[ProtocolMessage]] = [] self.recv_condition = threading.Condition() - self.recv_thread = threading.Thread( - target=read_packet_thread, args=(self, log_file) - ) + self.recv_thread = threading.Thread(target=self._read_packet_thread) self.process_event_body = None - self.exit_status = None + self.exit_status: Optional[int] = None self.initialize_body = None - self.thread_stop_reasons = {} - self.progress_events = [] + self.progress_events: list[Event] = [] self.reverse_requests = [] self.sequence = 1 self.threads = None + self.thread_stop_reasons = {} self.recv_thread.start() self.output_condition = threading.Condition() - self.output = {} + self.output: dict[str, list[str]] = {} self.configuration_done_sent = False self.frame_scopes = {} self.init_commands = init_commands self.disassembled_instructions = {} @classmethod - def encode_content(cls, s): + def encode_content(cls, s: str) -> bytes: return ("Content-Length: %u\r\n\r\n%s" % (len(s), s)).encode("utf-8") @classmethod @@ -156,6 +148,18 @@ class DebugCommunication(object): if command["seq"] != response["request_seq"]: raise ValueError("seq mismatch in response") + def _read_packet_thread(self): + done = False + try: + while not done: + packet = read_packet(self.recv, trace_file=self.trace_file) + # `packet` will be `None` on EOF. We want to pass it down to + # handle_recv_packet anyway so the main thread can handle unexpected + # termination of lldb-dap and stop waiting for new packets. + done = not self._handle_recv_packet(packet) + finally: + dump_dap_log(self.log_file) + def get_modules(self): module_list = self.request_modules()["body"]["modules"] modules = {} @@ -190,13 +194,13 @@ class DebugCommunication(object): break return collected_output if collected_output else None - def enqueue_recv_packet(self, packet): + def _enqueue_recv_packet(self, packet: Optional[ProtocolMessage]): self.recv_condition.acquire() self.recv_packets.append(packet) self.recv_condition.notify() self.recv_condition.release() - def handle_recv_packet(self, packet): + def _handle_recv_packet(self, packet: Optional[ProtocolMessage]) -> bool: """Called by the read thread that is waiting for all incoming packets to store the incoming packet in "self.recv_packets" in a thread safe way. This function will then signal the "self.recv_condition" to @@ -205,7 +209,7 @@ class DebugCommunication(object): """ # If EOF, notify the read thread by enqueuing a None. if not packet: - self.enqueue_recv_packet(None) + self._enqueue_recv_packet(None) return False # Check the packet to see if is an event packet @@ -235,6 +239,18 @@ class DebugCommunication(object): # When a new process is attached or launched, remember the # details that are available in the body of the event self.process_event_body = body + elif event == "exited": + # Process exited, mark the status to indicate the process is not + # alive. + self.exit_status = body["exitCode"] + elif event == "continued": + # When the process continues, clear the known threads and + # thread_stop_reasons. + all_threads_continued = body.get("allThreadsContinued", True) + tid = body["threadId"] + if tid in self.thread_stop_reasons: + del self.thread_stop_reasons[tid] + self._process_continued(all_threads_continued) elif event == "stopped": # Each thread that stops with a reason will send a # 'stopped' event. We need to remember the thread stop @@ -252,10 +268,16 @@ class DebugCommunication(object): elif packet_type == "response": if packet["command"] == "disconnect": keepGoing = False - self.enqueue_recv_packet(packet) + self._enqueue_recv_packet(packet) return keepGoing - def send_packet(self, command_dict, set_sequence=True): + def _process_continued(self, all_threads_continued: bool): + self.threads = None + self.frame_scopes = {} + if all_threads_continued: + self.thread_stop_reasons = {} + + def send_packet(self, command_dict: Request, set_sequence=True): """Take the "command_dict" python dictionary and encode it as a JSON string and send the contents as a packet to the VSCode debug adapter""" @@ -273,7 +295,12 @@ class DebugCommunication(object): self.send.write(self.encode_content(json_str)) self.send.flush() - def recv_packet(self, filter_type=None, filter_event=None, timeout=None): + def recv_packet( + self, + filter_type: Optional[str] = None, + filter_event: Optional[Union[str, list[str]]] = None, + timeout: Optional[float] = None, + ) -> Optional[ProtocolMessage]: """Get a JSON packet from the VSCode debug adapter. This function assumes a thread that reads packets is running and will deliver any received packets by calling handle_recv_packet(...). This @@ -309,8 +336,6 @@ class DebugCommunication(object): finally: self.recv_condition.release() - return None - def send_recv(self, command): """Send a command python dictionary as JSON and receive the JSON response. Validates that the response is the correct sequence and @@ -360,47 +385,36 @@ class DebugCommunication(object): return None - def wait_for_event(self, filter=None, timeout=None): - while True: - return self.recv_packet( - filter_type="event", filter_event=filter, timeout=timeout - ) - return None - - def wait_for_events(self, events, timeout=None): - """Wait for a list of events in `events` in any order. - Return the events not hit before the timeout expired""" - events = events[:] # Make a copy to avoid modifying the input - while events: - event_dict = self.wait_for_event(filter=events, timeout=timeout) - if event_dict is None: - break - events.remove(event_dict["event"]) - return events + def wait_for_event( + self, filter: Union[str, list[str]], timeout: Optional[float] = None + ) -> Optional[Event]: + """Wait for the first event that matches the filter.""" + return self.recv_packet( + filter_type="event", filter_event=filter, timeout=timeout + ) - def wait_for_stopped(self, timeout=None): + def wait_for_stopped( + self, timeout: Optional[float] = None + ) -> Optional[list[Event]]: stopped_events = [] stopped_event = self.wait_for_event( filter=["stopped", "exited"], timeout=timeout ) - exited = False while stopped_event: stopped_events.append(stopped_event) # If we exited, then we are done if stopped_event["event"] == "exited": - self.exit_status = stopped_event["body"]["exitCode"] - exited = True break # Otherwise we stopped and there might be one or more 'stopped' # events for each thread that stopped with a reason, so keep # checking for more 'stopped' events and return all of them - stopped_event = self.wait_for_event(filter="stopped", timeout=0.25) - if exited: - self.threads = [] + stopped_event = self.wait_for_event( + filter=["stopped", "exited"], timeout=0.25 + ) return stopped_events - def wait_for_breakpoint_events(self, timeout=None): - breakpoint_events = [] + def wait_for_breakpoint_events(self, timeout: Optional[float] = None): + breakpoint_events: list[Event] = [] while True: event = self.wait_for_event("breakpoint", timeout=timeout) if not event: @@ -408,14 +422,14 @@ class DebugCommunication(object): breakpoint_events.append(event) return breakpoint_events - def wait_for_exited(self): - event_dict = self.wait_for_event("exited") + def wait_for_exited(self, timeout: Optional[float] = None): + event_dict = self.wait_for_event("exited", timeout=timeout) if event_dict is None: raise ValueError("didn't get exited event") return event_dict - def wait_for_terminated(self): - event_dict = self.wait_for_event("terminated") + def wait_for_terminated(self, timeout: Optional[float] = None): + event_dict = self.wait_for_event("terminated", timeout) if event_dict is None: raise ValueError("didn't get terminated event") return event_dict @@ -576,32 +590,30 @@ class DebugCommunication(object): def request_attach( self, - program=None, - pid=None, - waitFor=None, - trace=None, - initCommands=None, - preRunCommands=None, - stopCommands=None, - exitCommands=None, - attachCommands=None, - terminateCommands=None, - coreFile=None, + *, + program: Optional[str] = None, + pid: Optional[int] = None, + waitFor=False, + initCommands: Optional[list[str]] = None, + preRunCommands: Optional[list[str]] = None, + attachCommands: Optional[list[str]] = None, + postRunCommands: Optional[list[str]] = None, + stopCommands: Optional[list[str]] = None, + exitCommands: Optional[list[str]] = None, + terminateCommands: Optional[list[str]] = None, + coreFile: Optional[str] = None, stopOnAttach=True, - postRunCommands=None, - sourceMap=None, - gdbRemotePort=None, - gdbRemoteHostname=None, + sourceMap: Optional[Union[list[tuple[str, str]], dict[str, str]]] = None, + gdbRemotePort: Optional[int] = None, + gdbRemoteHostname: Optional[str] = None, ): args_dict = {} if pid is not None: args_dict["pid"] = pid if program is not None: args_dict["program"] = program - if waitFor is not None: + if waitFor: args_dict["waitFor"] = waitFor - if trace: - args_dict["trace"] = trace args_dict["initCommands"] = self.init_commands if initCommands: args_dict["initCommands"].extend(initCommands) @@ -671,7 +683,7 @@ class DebugCommunication(object): self.threads = None self.frame_scopes = {} - def request_continue(self, threadId=None): + def request_continue(self, threadId=None, singleThread=False): if self.exit_status is not None: raise ValueError("request_continue called after process exited") # If we have launched or attached, then the first continue is done by @@ -681,13 +693,18 @@ class DebugCommunication(object): args_dict = {} if threadId is None: threadId = self.get_thread_id() - args_dict["threadId"] = threadId + if threadId: + args_dict["threadId"] = threadId + if singleThread: + args_dict["singleThread"] = True command_dict = { "command": "continue", "type": "request", "arguments": args_dict, } response = self.send_recv(command_dict) + if response["success"]: + self._process_continued(response["body"]["allThreadsContinued"]) # Caller must still call wait_for_stopped. return response @@ -775,7 +792,7 @@ class DebugCommunication(object): } return self.send_recv(command_dict) - def request_initialize(self, sourceInitFile): + def request_initialize(self, sourceInitFile=False): command_dict = { "command": "initialize", "type": "request", @@ -802,32 +819,32 @@ class DebugCommunication(object): def request_launch( self, - program, - args=None, - cwd=None, - env=None, - stopOnEntry=False, + program: str, + *, + args: Optional[list[str]] = None, + cwd: Optional[str] = None, + env: Optional[dict[str, str]] = None, + stopOnEntry=True, disableASLR=True, disableSTDIO=False, shellExpandArguments=False, - trace=False, - initCommands=None, - preRunCommands=None, - stopCommands=None, - exitCommands=None, - terminateCommands=None, - sourcePath=None, - debuggerRoot=None, - launchCommands=None, - sourceMap=None, runInTerminal=False, - postRunCommands=None, enableAutoVariableSummaries=False, displayExtendedBacktrace=False, enableSyntheticChildDebugging=False, - commandEscapePrefix=None, - customFrameFormat=None, - customThreadFormat=None, + initCommands: Optional[list[str]] = None, + preRunCommands: Optional[list[str]] = None, + launchCommands: Optional[list[str]] = None, + postRunCommands: Optional[list[str]] = None, + stopCommands: Optional[list[str]] = None, + exitCommands: Optional[list[str]] = None, + terminateCommands: Optional[list[str]] = None, + sourceMap: Optional[Union[list[tuple[str, str]], dict[str, str]]] = None, + sourcePath: Optional[str] = None, + debuggerRoot: Optional[str] = None, + commandEscapePrefix: Optional[str] = None, + customFrameFormat: Optional[str] = None, + customThreadFormat: Optional[str] = None, ): args_dict = {"program": program} if args: @@ -842,8 +859,6 @@ class DebugCommunication(object): args_dict["disableSTDIO"] = disableSTDIO if shellExpandArguments: args_dict["shellExpandArguments"] = shellExpandArguments - if trace: - args_dict["trace"] = trace args_dict["initCommands"] = self.init_commands if initCommands: args_dict["initCommands"].extend(initCommands) @@ -1190,7 +1205,8 @@ class DebugCommunication(object): def terminate(self): self.send.close() - # self.recv.close() + if self.recv_thread.is_alive(): + self.recv_thread.join() def request_setInstructionBreakpoints(self, memory_reference=[]): breakpoints = [] @@ -1211,11 +1227,11 @@ class DebugCommunication(object): class DebugAdapterServer(DebugCommunication): def __init__( self, - executable=None, - connection=None, - init_commands=[], - log_file=None, - env=None, + executable: Optional[str] = None, + connection: Optional[str] = None, + init_commands: list[str] = [], + log_file: Optional[TextIO] = None, + env: Optional[dict[str, str]] = None, ): self.process = None self.connection = None @@ -1247,7 +1263,14 @@ class DebugAdapterServer(DebugCommunication): ) @classmethod - def launch(cls, /, executable, env=None, log_file=None, connection=None): + def launch( + cls, + *, + executable: str, + env: Optional[dict[str, str]] = None, + log_file: Optional[TextIO] = None, + connection: Optional[str] = None, + ) -> tuple[subprocess.Popen, Optional[str]]: adapter_env = os.environ.copy() if env is not None: adapter_env.update(env) @@ -1289,26 +1312,29 @@ class DebugAdapterServer(DebugCommunication): return (process, connection) - def get_pid(self): + def get_pid(self) -> int: if self.process: return self.process.pid return -1 def terminate(self): - super(DebugAdapterServer, self).terminate() - if self.process is not None: - process = self.process - self.process = None - try: - # When we close stdin it should signal the lldb-dap that no - # new messages will arrive and it should shutdown on its own. - process.stdin.close() - process.wait(timeout=20) - except subprocess.TimeoutExpired: - process.kill() - process.wait() - if process.returncode != 0: - raise DebugAdapterProcessError(process.returncode) + try: + if self.process is not None: + process = self.process + self.process = None + try: + # When we close stdin it should signal the lldb-dap that no + # new messages will arrive and it should shutdown on its + # own. + process.stdin.close() + process.wait(timeout=20) + except subprocess.TimeoutExpired: + process.kill() + process.wait() + if process.returncode != 0: + raise DebugAdapterProcessError(process.returncode) + finally: + super(DebugAdapterServer, self).terminate() class DebugAdapterError(Exception): diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py index c5a7eb7..d7cf8e2 100644 --- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py +++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py @@ -1,5 +1,6 @@ import os import time +from typing import Optional import uuid import dap_server @@ -11,10 +12,14 @@ import lldbgdbserverutils class DAPTestCaseBase(TestBase): # set timeout based on whether ASAN was enabled or not. Increase # timeout by a factor of 10 if ASAN is enabled. - timeoutval = 10 * (10 if ("ASAN_OPTIONS" in os.environ) else 1) + DEFAULT_TIMEOUT = 10 * (10 if ("ASAN_OPTIONS" in os.environ) else 1) NO_DEBUG_INFO_TESTCASE = True - def create_debug_adapter(self, lldbDAPEnv=None, connection=None): + def create_debug_adapter( + self, + lldbDAPEnv: Optional[dict[str, str]] = None, + connection: Optional[str] = None, + ): """Create the Visual Studio Code debug adapter""" self.assertTrue( is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable" @@ -28,7 +33,11 @@ class DAPTestCaseBase(TestBase): env=lldbDAPEnv, ) - def build_and_create_debug_adapter(self, lldbDAPEnv=None, dictionary=None): + def build_and_create_debug_adapter( + self, + lldbDAPEnv: Optional[dict[str, str]] = None, + dictionary: Optional[dict] = None, + ): self.build(dictionary=dictionary) self.create_debug_adapter(lldbDAPEnv) @@ -78,13 +87,13 @@ class DAPTestCaseBase(TestBase): time.sleep(0.5) return False - def verify_breakpoint_hit(self, breakpoint_ids): + def verify_breakpoint_hit(self, breakpoint_ids, timeout=DEFAULT_TIMEOUT): """Wait for the process we are debugging to stop, and verify we hit any breakpoint location in the "breakpoint_ids" array. "breakpoint_ids" should be a list of breakpoint ID strings (["1", "2"]). The return value from self.set_source_breakpoints() or self.set_function_breakpoints() can be passed to this function""" - stopped_events = self.dap_server.wait_for_stopped() + stopped_events = self.dap_server.wait_for_stopped(timeout) for stopped_event in stopped_events: if "body" in stopped_event: body = stopped_event["body"] @@ -110,16 +119,15 @@ class DAPTestCaseBase(TestBase): match_desc = "breakpoint %s." % (breakpoint_id) if match_desc in description: return - self.assertTrue(False, "breakpoint not hit") + self.assertTrue(False, f"breakpoint not hit, stopped_events={stopped_events}") - def verify_stop_exception_info(self, expected_description, timeout=timeoutval): + def verify_stop_exception_info(self, expected_description, timeout=DEFAULT_TIMEOUT): """Wait for the process we are debugging to stop, and verify the stop reason is 'exception' and that the description matches 'expected_description' """ - stopped_events = self.dap_server.wait_for_stopped(timeout=timeout) + stopped_events = self.dap_server.wait_for_stopped(timeout) for stopped_event in stopped_events: - print("stopped_event", stopped_event) if "body" in stopped_event: body = stopped_event["body"] if "reason" not in body: @@ -263,46 +271,61 @@ class DAPTestCaseBase(TestBase): return self.dap_server.request_setVariable(2, name, str(value), id=id) def stepIn( - self, threadId=None, targetId=None, waitForStop=True, granularity="statement" + self, + threadId=None, + targetId=None, + waitForStop=True, + granularity="statement", + timeout=DEFAULT_TIMEOUT, ): response = self.dap_server.request_stepIn( threadId=threadId, targetId=targetId, granularity=granularity ) self.assertTrue(response["success"]) if waitForStop: - return self.dap_server.wait_for_stopped() + return self.dap_server.wait_for_stopped(timeout) return None - def stepOver(self, threadId=None, waitForStop=True, granularity="statement"): + def stepOver( + self, + threadId=None, + waitForStop=True, + granularity="statement", + timeout=DEFAULT_TIMEOUT, + ): self.dap_server.request_next(threadId=threadId, granularity=granularity) if waitForStop: - return self.dap_server.wait_for_stopped() + return self.dap_server.wait_for_stopped(timeout) return None - def stepOut(self, threadId=None, waitForStop=True): + def stepOut(self, threadId=None, waitForStop=True, timeout=DEFAULT_TIMEOUT): self.dap_server.request_stepOut(threadId=threadId) if waitForStop: - return self.dap_server.wait_for_stopped() + return self.dap_server.wait_for_stopped(timeout) return None - def continue_to_next_stop(self): - self.dap_server.request_continue() - return self.dap_server.wait_for_stopped() + def do_continue(self): # `continue` is a keyword. + resp = self.dap_server.request_continue() + self.assertTrue(resp["success"], f"continue request failed: {resp}") + + def continue_to_next_stop(self, timeout=DEFAULT_TIMEOUT): + self.do_continue() + return self.dap_server.wait_for_stopped(timeout) - def continue_to_breakpoints(self, breakpoint_ids): - self.dap_server.request_continue() - self.verify_breakpoint_hit(breakpoint_ids) + def continue_to_breakpoints(self, breakpoint_ids, timeout=DEFAULT_TIMEOUT): + self.do_continue() + self.verify_breakpoint_hit(breakpoint_ids, timeout) - def continue_to_exception_breakpoint(self, filter_label): - self.dap_server.request_continue() + def continue_to_exception_breakpoint(self, filter_label, timeout=DEFAULT_TIMEOUT): + self.do_continue() self.assertTrue( - self.verify_stop_exception_info(filter_label), + self.verify_stop_exception_info(filter_label, timeout), 'verify we got "%s"' % (filter_label), ) - def continue_to_exit(self, exitCode=0): - self.dap_server.request_continue() - stopped_events = self.dap_server.wait_for_stopped() + def continue_to_exit(self, exitCode=0, timeout=DEFAULT_TIMEOUT): + self.do_continue() + stopped_events = self.dap_server.wait_for_stopped(timeout) self.assertEqual( len(stopped_events), 1, "stopped_events = {}".format(stopped_events) ) @@ -330,27 +353,15 @@ class DAPTestCaseBase(TestBase): def attach( self, - program=None, - pid=None, - waitFor=None, - trace=None, - initCommands=None, - preRunCommands=None, - stopCommands=None, - exitCommands=None, - attachCommands=None, - coreFile=None, + *, stopOnAttach=True, disconnectAutomatically=True, - terminateCommands=None, - postRunCommands=None, - sourceMap=None, sourceInitFile=False, expectFailure=False, - gdbRemotePort=None, - gdbRemoteHostname=None, sourceBreakpoints=None, functionBreakpoints=None, + timeout=DEFAULT_TIMEOUT, + **kwargs, ): """Build the default Makefile target, create the DAP debug adapter, and attach to the process. @@ -367,7 +378,7 @@ class DAPTestCaseBase(TestBase): self.addTearDownHook(cleanup) # Initialize and launch the program self.dap_server.request_initialize(sourceInitFile) - self.dap_server.wait_for_event("initialized") + self.dap_server.wait_for_event("initialized", timeout) # Set source breakpoints as part of the launch sequence. if sourceBreakpoints: @@ -389,64 +400,28 @@ class DAPTestCaseBase(TestBase): ) self.dap_server.request_configurationDone() - response = self.dap_server.request_attach( - program=program, - pid=pid, - waitFor=waitFor, - trace=trace, - initCommands=initCommands, - preRunCommands=preRunCommands, - stopCommands=stopCommands, - exitCommands=exitCommands, - attachCommands=attachCommands, - terminateCommands=terminateCommands, - coreFile=coreFile, - stopOnAttach=stopOnAttach, - postRunCommands=postRunCommands, - sourceMap=sourceMap, - gdbRemotePort=gdbRemotePort, - gdbRemoteHostname=gdbRemoteHostname, - ) + response = self.dap_server.request_attach(stopOnAttach=stopOnAttach, **kwargs) if expectFailure: return response if not (response and response["success"]): self.assertTrue( response["success"], "attach failed (%s)" % (response["message"]) ) + if stopOnAttach: + self.dap_server.wait_for_stopped(timeout) def launch( self, program=None, - args=None, - cwd=None, - env=None, - stopOnEntry=False, - disableASLR=False, - disableSTDIO=False, - shellExpandArguments=False, - trace=False, - initCommands=None, - preRunCommands=None, - stopCommands=None, - exitCommands=None, - terminateCommands=None, - sourcePath=None, - debuggerRoot=None, + *, sourceInitFile=False, - launchCommands=None, - sourceMap=None, disconnectAutomatically=True, - runInTerminal=False, - expectFailure=False, - postRunCommands=None, - enableAutoVariableSummaries=False, - displayExtendedBacktrace=False, - enableSyntheticChildDebugging=False, - commandEscapePrefix=None, - customFrameFormat=None, - customThreadFormat=None, sourceBreakpoints=None, functionBreakpoints=None, + expectFailure=False, + stopOnEntry=True, + timeout=DEFAULT_TIMEOUT, + **kwargs, ): """Sending launch request to dap""" @@ -462,7 +437,7 @@ class DAPTestCaseBase(TestBase): # Initialize and launch the program self.dap_server.request_initialize(sourceInitFile) - self.dap_server.wait_for_event("initialized") + self.dap_server.wait_for_event("initialized", timeout) # Set source breakpoints as part of the launch sequence. if sourceBreakpoints: @@ -487,76 +462,28 @@ class DAPTestCaseBase(TestBase): response = self.dap_server.request_launch( program, - args=args, - cwd=cwd, - env=env, stopOnEntry=stopOnEntry, - disableASLR=disableASLR, - disableSTDIO=disableSTDIO, - shellExpandArguments=shellExpandArguments, - trace=trace, - initCommands=initCommands, - preRunCommands=preRunCommands, - stopCommands=stopCommands, - exitCommands=exitCommands, - terminateCommands=terminateCommands, - sourcePath=sourcePath, - debuggerRoot=debuggerRoot, - launchCommands=launchCommands, - sourceMap=sourceMap, - runInTerminal=runInTerminal, - postRunCommands=postRunCommands, - enableAutoVariableSummaries=enableAutoVariableSummaries, - displayExtendedBacktrace=displayExtendedBacktrace, - enableSyntheticChildDebugging=enableSyntheticChildDebugging, - commandEscapePrefix=commandEscapePrefix, - customFrameFormat=customFrameFormat, - customThreadFormat=customThreadFormat, + **kwargs, ) if expectFailure: return response - if not (response and response["success"]): self.assertTrue( response["success"], "launch failed (%s)" % (response["body"]["error"]["format"]), ) + if stopOnEntry: + self.dap_server.wait_for_stopped(timeout) + return response def build_and_launch( self, program, - args=None, - cwd=None, - env=None, - stopOnEntry=False, - disableASLR=False, - disableSTDIO=False, - shellExpandArguments=False, - trace=False, - initCommands=None, - preRunCommands=None, - stopCommands=None, - exitCommands=None, - terminateCommands=None, - sourcePath=None, - debuggerRoot=None, - sourceInitFile=False, - runInTerminal=False, - disconnectAutomatically=True, - postRunCommands=None, - lldbDAPEnv=None, - enableAutoVariableSummaries=False, - displayExtendedBacktrace=False, - enableSyntheticChildDebugging=False, - commandEscapePrefix=None, - customFrameFormat=None, - customThreadFormat=None, - launchCommands=None, - expectFailure=False, - sourceBreakpoints=None, - functionBreakpoints=None, + *, + lldbDAPEnv: Optional[dict[str, str]] = None, + **kwargs, ): """Build the default Makefile target, create the DAP debug adapter, and launch the process. @@ -564,38 +491,7 @@ class DAPTestCaseBase(TestBase): self.build_and_create_debug_adapter(lldbDAPEnv) self.assertTrue(os.path.exists(program), "executable must exist") - return self.launch( - program, - args, - cwd, - env, - stopOnEntry, - disableASLR, - disableSTDIO, - shellExpandArguments, - trace, - initCommands, - preRunCommands, - stopCommands, - exitCommands, - terminateCommands, - sourcePath, - debuggerRoot, - sourceInitFile, - runInTerminal=runInTerminal, - disconnectAutomatically=disconnectAutomatically, - postRunCommands=postRunCommands, - enableAutoVariableSummaries=enableAutoVariableSummaries, - enableSyntheticChildDebugging=enableSyntheticChildDebugging, - displayExtendedBacktrace=displayExtendedBacktrace, - commandEscapePrefix=commandEscapePrefix, - customFrameFormat=customFrameFormat, - customThreadFormat=customThreadFormat, - launchCommands=launchCommands, - expectFailure=expectFailure, - sourceBreakpoints=sourceBreakpoints, - functionBreakpoints=functionBreakpoints, - ) + return self.launch(program, **kwargs) def getBuiltinDebugServerTool(self): # Tries to find simulation/lldb-server/gdbserver tool path. |