aboutsummaryrefslogtreecommitdiff
path: root/lldb/packages/Python
diff options
context:
space:
mode:
authorJohn Harrison <harjohn@google.com>2025-10-29 13:52:42 -0700
committerGitHub <noreply@github.com>2025-10-29 13:52:42 -0700
commita49bfbeb15418ccf51ed1bad8d60d5fe6830353b (patch)
treec150ac2096a0319a2af12cc6b612633f5a505f49 /lldb/packages/Python
parentad29838a44f7184e3887b34a1ed1c732022259cc (diff)
downloadllvm-a49bfbeb15418ccf51ed1bad8d60d5fe6830353b.tar.gz
llvm-a49bfbeb15418ccf51ed1bad8d60d5fe6830353b.tar.bz2
llvm-a49bfbeb15418ccf51ed1bad8d60d5fe6830353b.zip
[lldb-dap] Improving consistency of tests by removing concurrency. (#165496)
We currently use a background thread to read the DAP output. This means the test thread and the background thread can race at times and we may have inconsistent timing due to these races. To improve the consistency I've removed the reader thread and instead switched to using the `selectors` module that wraps `select` in a platform independent way.
Diffstat (limited to 'lldb/packages/Python')
-rw-r--r--lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py206
-rw-r--r--lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py2
2 files changed, 80 insertions, 128 deletions
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
index d892c01f0bc7..8f3652172dfd 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
@@ -10,8 +10,8 @@ import string
import subprocess
import signal
import sys
-import threading
import warnings
+import selectors
import time
from typing import (
Any,
@@ -139,35 +139,6 @@ def dump_memory(base_addr, data, num_per_line, outfile):
outfile.write("\n")
-def read_packet(
- f: IO[bytes], trace_file: Optional[IO[str]] = None
-) -> Optional[ProtocolMessage]:
- """Decode a JSON packet that starts with the content length and is
- followed by the JSON bytes from a file 'f'. Returns None on EOF.
- """
- line = f.readline().decode("utf-8")
- if len(line) == 0:
- return None # EOF.
-
- # Watch for line that starts with the prefix
- prefix = "Content-Length: "
- if line.startswith(prefix):
- # Decode length of JSON bytes
- length = int(line[len(prefix) :])
- # Skip empty line
- separator = f.readline().decode()
- if separator != "":
- Exception("malformed DAP content header, unexpected line: " + separator)
- # Read JSON bytes
- json_str = f.read(length).decode()
- if trace_file:
- trace_file.write("from adapter:\n%s\n" % (json_str))
- # Decode the JSON bytes into a python dictionary
- return json.loads(json_str)
-
- raise Exception("unexpected malformed message from lldb-dap: " + line)
-
-
def packet_type_is(packet, packet_type):
return "type" in packet and packet["type"] == packet_type
@@ -199,16 +170,8 @@ class DebugCommunication(object):
self.log_file = log_file
self.send = send
self.recv = recv
-
- # Packets that have been received and processed but have not yet been
- # requested by a test case.
- self._pending_packets: List[Optional[ProtocolMessage]] = []
- # Received packets that have not yet been processed.
- self._recv_packets: List[Optional[ProtocolMessage]] = []
- # Used as a mutex for _recv_packets and for notify when _recv_packets
- # changes.
- self._recv_condition = threading.Condition()
- self._recv_thread = threading.Thread(target=self._read_packet_thread)
+ self.selector = selectors.DefaultSelector()
+ self.selector.register(recv, selectors.EVENT_READ)
# session state
self.init_commands = init_commands
@@ -234,9 +197,6 @@ class DebugCommunication(object):
# keyed by breakpoint id
self.resolved_breakpoints: dict[str, Breakpoint] = {}
- # trigger enqueue thread
- self._recv_thread.start()
-
@classmethod
def encode_content(cls, s: str) -> bytes:
return ("Content-Length: %u\r\n\r\n%s" % (len(s), s)).encode("utf-8")
@@ -252,17 +212,46 @@ class DebugCommunication(object):
f"seq mismatch in response {command['seq']} != {response['request_seq']}"
)
- def _read_packet_thread(self):
- try:
- while True:
- packet = read_packet(self.recv, trace_file=self.trace_file)
- # `packet` will be `None` on EOF. We want to pass it down to
- # handle_recv_packet anyway so the main thread can handle unexpected
- # termination of lldb-dap and stop waiting for new packets.
- if not self._handle_recv_packet(packet):
- break
- finally:
- dump_dap_log(self.log_file)
+ def _read_packet(
+ self,
+ timeout: float = DEFAULT_TIMEOUT,
+ ) -> Optional[ProtocolMessage]:
+ """Decode a JSON packet that starts with the content length and is
+ followed by the JSON bytes from self.recv. Returns None on EOF.
+ """
+
+ ready = self.selector.select(timeout)
+ if not ready:
+ warnings.warn(
+ "timeout occurred waiting for a packet, check if the test has a"
+ " negative assertion and see if it can be inverted.",
+ stacklevel=4,
+ )
+ return None # timeout
+
+ line = self.recv.readline().decode("utf-8")
+ if len(line) == 0:
+ return None # EOF.
+
+ # Watch for line that starts with the prefix
+ prefix = "Content-Length: "
+ if line.startswith(prefix):
+ # Decode length of JSON bytes
+ length = int(line[len(prefix) :])
+ # Skip empty line
+ separator = self.recv.readline().decode()
+ if separator != "":
+ Exception("malformed DAP content header, unexpected line: " + separator)
+ # Read JSON bytes
+ json_str = self.recv.read(length).decode()
+ if self.trace_file:
+ self.trace_file.write(
+ "%s from adapter:\n%s\n" % (time.time(), json_str)
+ )
+ # Decode the JSON bytes into a python dictionary
+ return json.loads(json_str)
+
+ raise Exception("unexpected malformed message from lldb-dap: " + line)
def get_modules(
self, start_module: Optional[int] = None, module_count: Optional[int] = None
@@ -310,34 +299,6 @@ class DebugCommunication(object):
output += self.get_output(category, clear=clear)
return output
- def _enqueue_recv_packet(self, packet: Optional[ProtocolMessage]):
- with self.recv_condition:
- self.recv_packets.append(packet)
- self.recv_condition.notify()
-
- def _handle_recv_packet(self, packet: Optional[ProtocolMessage]) -> bool:
- """Handles an incoming packet.
-
- Called by the read thread that is waiting for all incoming packets
- to store the incoming packet in "self._recv_packets" in a thread safe
- way. This function will then signal the "self._recv_condition" to
- indicate a new packet is available.
-
- Args:
- packet: A new packet to store.
-
- Returns:
- True if the caller should keep calling this function for more
- packets.
- """
- with self._recv_condition:
- self._recv_packets.append(packet)
- self._recv_condition.notify()
- # packet is None on EOF
- return packet is not None and not (
- packet["type"] == "response" and packet["command"] == "disconnect"
- )
-
def _recv_packet(
self,
*,
@@ -361,46 +322,34 @@ class DebugCommunication(object):
The first matching packet for the given predicate, if specified,
otherwise None.
"""
- assert (
- threading.current_thread != self._recv_thread
- ), "Must not be called from the _recv_thread"
-
- def process_until_match():
- self._process_recv_packets()
- for i, packet in enumerate(self._pending_packets):
- if packet is None:
- # We need to return a truthy value to break out of the
- # wait_for, use `EOFError` as an indicator of EOF.
- return EOFError()
- if predicate and predicate(packet):
- self._pending_packets.pop(i)
- return packet
-
- with self._recv_condition:
- packet = self._recv_condition.wait_for(process_until_match, timeout)
- return None if isinstance(packet, EOFError) else packet
-
- def _process_recv_packets(self) -> None:
+ deadline = time.time() + timeout
+
+ while time.time() < deadline:
+ packet = self._read_packet(timeout=deadline - time.time())
+ if packet is None:
+ return None
+ self._process_recv_packet(packet)
+ if not predicate or predicate(packet):
+ return packet
+
+ def _process_recv_packet(self, packet) -> None:
"""Process received packets, updating the session state."""
- with self._recv_condition:
- for packet in self._recv_packets:
- if packet and ("seq" not in packet or packet["seq"] == 0):
- warnings.warn(
- f"received a malformed packet, expected 'seq != 0' for {packet!r}"
- )
- # Handle events that may modify any stateful properties of
- # the DAP session.
- if packet and packet["type"] == "event":
- self._handle_event(packet)
- elif packet and packet["type"] == "request":
- # Handle reverse requests and keep processing.
- self._handle_reverse_request(packet)
- # Move the packet to the pending queue.
- self._pending_packets.append(packet)
- self._recv_packets.clear()
+ if packet and ("seq" not in packet or packet["seq"] == 0):
+ warnings.warn(
+ f"received a malformed packet, expected 'seq != 0' for {packet!r}"
+ )
+ # Handle events that may modify any stateful properties of
+ # the DAP session.
+ if packet and packet["type"] == "event":
+ self._handle_event(packet)
+ elif packet and packet["type"] == "request":
+ # Handle reverse requests and keep processing.
+ self._handle_reverse_request(packet)
def _handle_event(self, packet: Event) -> None:
"""Handle any events that modify debug session state we track."""
+ self.events.append(packet)
+
event = packet["event"]
body: Optional[Dict] = packet.get("body", None)
@@ -453,6 +402,8 @@ class DebugCommunication(object):
self.invalidated_event = packet
elif event == "memory":
self.memory_event = packet
+ elif event == "module":
+ self.module_events.append(packet)
def _handle_reverse_request(self, request: Request) -> None:
if request in self.reverse_requests:
@@ -521,18 +472,14 @@ class DebugCommunication(object):
Returns the seq number of the request.
"""
- # Set the seq for requests.
- if packet["type"] == "request":
- packet["seq"] = self.sequence
- self.sequence += 1
- else:
- packet["seq"] = 0
+ packet["seq"] = self.sequence
+ self.sequence += 1
# Encode our command dictionary as a JSON string
json_str = json.dumps(packet, separators=(",", ":"))
if self.trace_file:
- self.trace_file.write("to adapter:\n%s\n" % (json_str))
+ self.trace_file.write("%s to adapter:\n%s\n" % (time.time(), json_str))
length = len(json_str)
if length > 0:
@@ -913,6 +860,8 @@ class DebugCommunication(object):
if restartArguments:
command_dict["arguments"] = restartArguments
+ # Clear state, the process is about to restart...
+ self._process_continued(True)
response = self._send_recv(command_dict)
# Caller must still call wait_for_stopped.
return response
@@ -1479,8 +1428,10 @@ class DebugCommunication(object):
def terminate(self):
self.send.close()
- if self._recv_thread.is_alive():
- self._recv_thread.join()
+ self.recv.close()
+ self.selector.close()
+ if self.log_file:
+ dump_dap_log(self.log_file)
def request_setInstructionBreakpoints(self, memory_reference=[]):
breakpoints = []
@@ -1577,6 +1528,7 @@ class DebugAdapterServer(DebugCommunication):
stdout=subprocess.PIPE,
stderr=sys.stderr,
env=adapter_env,
+ bufsize=0,
)
if connection is None:
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py
index 29935bb8046f..fd07324d2ddd 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py
@@ -416,7 +416,7 @@ class DAPTestCaseBase(TestBase):
return self.dap_server.wait_for_stopped()
def continue_to_breakpoint(self, breakpoint_id: str):
- self.continue_to_breakpoints((breakpoint_id))
+ self.continue_to_breakpoints([breakpoint_id])
def continue_to_breakpoints(self, breakpoint_ids):
self.do_continue()