aboutsummaryrefslogtreecommitdiff
path: root/lldb/packages/Python/lldbsuite/test/gdbclientutils.py
diff options
context:
space:
mode:
Diffstat (limited to 'lldb/packages/Python/lldbsuite/test/gdbclientutils.py')
-rw-r--r--lldb/packages/Python/lldbsuite/test/gdbclientutils.py613
1 files changed, 613 insertions, 0 deletions
diff --git a/lldb/packages/Python/lldbsuite/test/gdbclientutils.py b/lldb/packages/Python/lldbsuite/test/gdbclientutils.py
new file mode 100644
index 0000000..183e6a0
--- /dev/null
+++ b/lldb/packages/Python/lldbsuite/test/gdbclientutils.py
@@ -0,0 +1,613 @@
+import ctypes
+import errno
+import io
+import threading
+import socket
+import traceback
+from lldbsuite.support import seven
+
+def checksum(message):
+ """
+ Calculate the GDB server protocol checksum of the message.
+
+ The GDB server protocol uses a simple modulo 256 sum.
+ """
+ check = 0
+ for c in message:
+ check += ord(c)
+ return check % 256
+
+
+def frame_packet(message):
+ """
+ Create a framed packet that's ready to send over the GDB connection
+ channel.
+
+ Framing includes surrounding the message between $ and #, and appending
+ a two character hex checksum.
+ """
+ return "$%s#%02x" % (message, checksum(message))
+
+
+def escape_binary(message):
+ """
+ Escape the binary message using the process described in the GDB server
+ protocol documentation.
+
+ Most bytes are sent through as-is, but $, #, and { are escaped by writing
+ a { followed by the original byte mod 0x20.
+ """
+ out = ""
+ for c in message:
+ d = ord(c)
+ if d in (0x23, 0x24, 0x7d):
+ out += chr(0x7d)
+ out += chr(d ^ 0x20)
+ else:
+ out += c
+ return out
+
+
+def hex_encode_bytes(message):
+ """
+ Encode the binary message by converting each byte into a two-character
+ hex string.
+ """
+ out = ""
+ for c in message:
+ out += "%02x" % ord(c)
+ return out
+
+
+def hex_decode_bytes(hex_bytes):
+ """
+ Decode the hex string into a binary message by converting each two-character
+ hex string into a single output byte.
+ """
+ out = ""
+ hex_len = len(hex_bytes)
+ while i < hex_len - 1:
+ out += chr(int(hex_bytes[i:i + 2]), 16)
+ i += 2
+ return out
+
+
+class MockGDBServerResponder:
+ """
+ A base class for handling client packets and issuing server responses for
+ GDB tests.
+
+ This handles many typical situations, while still allowing subclasses to
+ completely customize their responses.
+
+ Most subclasses will be interested in overriding the other() method, which
+ handles any packet not recognized in the common packet handling code.
+ """
+
+ registerCount = 40
+ packetLog = None
+
+ def __init__(self):
+ self.packetLog = []
+
+ def respond(self, packet):
+ """
+ Return the unframed packet data that the server should issue in response
+ to the given packet received from the client.
+ """
+ self.packetLog.append(packet)
+ if packet is MockGDBServer.PACKET_INTERRUPT:
+ return self.interrupt()
+ if packet == "c":
+ return self.cont()
+ if packet.startswith("vCont;c"):
+ return self.vCont(packet)
+ if packet[0] == "A":
+ return self.A(packet)
+ if packet[0] == "D":
+ return self.D(packet)
+ if packet[0] == "g":
+ return self.readRegisters()
+ if packet[0] == "G":
+ # Gxxxxxxxxxxx
+ # Gxxxxxxxxxxx;thread:1234;
+ return self.writeRegisters(packet[1:].split(';')[0])
+ if packet[0] == "p":
+ regnum = packet[1:].split(';')[0]
+ return self.readRegister(int(regnum, 16))
+ if packet[0] == "P":
+ register, value = packet[1:].split("=")
+ return self.writeRegister(int(register, 16), value)
+ if packet[0] == "m":
+ addr, length = [int(x, 16) for x in packet[1:].split(',')]
+ return self.readMemory(addr, length)
+ if packet[0] == "M":
+ location, encoded_data = packet[1:].split(":")
+ addr, length = [int(x, 16) for x in location.split(',')]
+ return self.writeMemory(addr, encoded_data)
+ if packet[0:7] == "qSymbol":
+ return self.qSymbol(packet[8:])
+ if packet[0:10] == "qSupported":
+ return self.qSupported(packet[11:].split(";"))
+ if packet == "qfThreadInfo":
+ return self.qfThreadInfo()
+ if packet == "qsThreadInfo":
+ return self.qsThreadInfo()
+ if packet == "qC":
+ return self.qC()
+ if packet == "QEnableErrorStrings":
+ return self.QEnableErrorStrings()
+ if packet == "?":
+ return self.haltReason()
+ if packet == "s":
+ return self.haltReason()
+ if packet[0] == "H":
+ tid = packet[2:]
+ if "." in tid:
+ assert tid.startswith("p")
+ # TODO: do we want to do anything with PID?
+ tid = tid.split(".", 1)[1]
+ return self.selectThread(packet[1], int(tid, 16))
+ if packet[0:6] == "qXfer:":
+ obj, read, annex, location = packet[6:].split(":")
+ offset, length = [int(x, 16) for x in location.split(',')]
+ data, has_more = self.qXferRead(obj, annex, offset, length)
+ if data is not None:
+ return self._qXferResponse(data, has_more)
+ return ""
+ if packet.startswith("vAttach;"):
+ pid = packet.partition(';')[2]
+ return self.vAttach(int(pid, 16))
+ if packet[0] == "Z":
+ return self.setBreakpoint(packet)
+ if packet.startswith("qThreadStopInfo"):
+ threadnum = int (packet[15:], 16)
+ return self.threadStopInfo(threadnum)
+ if packet == "QThreadSuffixSupported":
+ return self.QThreadSuffixSupported()
+ if packet == "QListThreadsInStopReply":
+ return self.QListThreadsInStopReply()
+ if packet.startswith("qMemoryRegionInfo:"):
+ return self.qMemoryRegionInfo(int(packet.split(':')[1], 16))
+ if packet == "qQueryGDBServer":
+ return self.qQueryGDBServer()
+ if packet == "qHostInfo":
+ return self.qHostInfo()
+ if packet == "qGetWorkingDir":
+ return self.qGetWorkingDir()
+ if packet == "qOffsets":
+ return self.qOffsets();
+ if packet == "qsProcessInfo":
+ return self.qsProcessInfo()
+ if packet.startswith("qfProcessInfo"):
+ return self.qfProcessInfo(packet)
+ if packet.startswith("qPathComplete:"):
+ return self.qPathComplete()
+ if packet.startswith("vFile:"):
+ return self.vFile(packet)
+ if packet.startswith("vRun;"):
+ return self.vRun(packet)
+ if packet.startswith("qLaunchSuccess"):
+ return self.qLaunchSuccess()
+ if packet.startswith("QEnvironment:"):
+ return self.QEnvironment(packet)
+ if packet.startswith("QEnvironmentHexEncoded:"):
+ return self.QEnvironmentHexEncoded(packet)
+ if packet.startswith("qRegisterInfo"):
+ regnum = int(packet[len("qRegisterInfo"):], 16)
+ return self.qRegisterInfo(regnum)
+ if packet == "k":
+ return self.k()
+
+ return self.other(packet)
+
+ def qsProcessInfo(self):
+ return "E04"
+
+ def qfProcessInfo(self, packet):
+ return "E04"
+
+ def qGetWorkingDir(self):
+ return "2f"
+
+ def qOffsets(self):
+ return ""
+
+ def qHostInfo(self):
+ return "ptrsize:8;endian:little;"
+
+ def qQueryGDBServer(self):
+ return "E04"
+
+ def interrupt(self):
+ raise self.UnexpectedPacketException()
+
+ def cont(self):
+ raise self.UnexpectedPacketException()
+
+ def vCont(self, packet):
+ raise self.UnexpectedPacketException()
+
+ def A(self, packet):
+ return ""
+
+ def D(self, packet):
+ return "OK"
+
+ def readRegisters(self):
+ return "00000000" * self.registerCount
+
+ def readRegister(self, register):
+ return "00000000"
+
+ def writeRegisters(self, registers_hex):
+ return "OK"
+
+ def writeRegister(self, register, value_hex):
+ return "OK"
+
+ def readMemory(self, addr, length):
+ return "00" * length
+
+ def writeMemory(self, addr, data_hex):
+ return "OK"
+
+ def qSymbol(self, symbol_args):
+ return "OK"
+
+ def qSupported(self, client_supported):
+ return "qXfer:features:read+;PacketSize=3fff;QStartNoAckMode+"
+
+ def qfThreadInfo(self):
+ return "l"
+
+ def qsThreadInfo(self):
+ return "l"
+
+ def qC(self):
+ return "QC0"
+
+ def QEnableErrorStrings(self):
+ return "OK"
+
+ def haltReason(self):
+ # SIGINT is 2, return type is 2 digit hex string
+ return "S02"
+
+ def qXferRead(self, obj, annex, offset, length):
+ return None, False
+
+ def _qXferResponse(self, data, has_more):
+ return "%s%s" % ("m" if has_more else "l", escape_binary(data))
+
+ def vAttach(self, pid):
+ raise self.UnexpectedPacketException()
+
+ def selectThread(self, op, thread_id):
+ return "OK"
+
+ def setBreakpoint(self, packet):
+ raise self.UnexpectedPacketException()
+
+ def threadStopInfo(self, threadnum):
+ return ""
+
+ def other(self, packet):
+ # empty string means unsupported
+ return ""
+
+ def QThreadSuffixSupported(self):
+ return ""
+
+ def QListThreadsInStopReply(self):
+ return ""
+
+ def qMemoryRegionInfo(self, addr):
+ return ""
+
+ def qPathComplete(self):
+ return ""
+
+ def vFile(self, packet):
+ return ""
+
+ def vRun(self, packet):
+ return ""
+
+ def qLaunchSuccess(self):
+ return ""
+
+ def QEnvironment(self, packet):
+ return "OK"
+
+ def QEnvironmentHexEncoded(self, packet):
+ return "OK"
+
+ def qRegisterInfo(self, num):
+ return ""
+
+ def k(self):
+ return ""
+
+ """
+ Raised when we receive a packet for which there is no default action.
+ Override the responder class to implement behavior suitable for the test at
+ hand.
+ """
+ class UnexpectedPacketException(Exception):
+ pass
+
+
+class ServerSocket:
+ """
+ A wrapper class for TCP or pty-based server.
+ """
+
+ def get_connect_address(self):
+ """Get address for the client to connect to."""
+
+ def get_connect_url(self):
+ """Get URL suitable for process connect command."""
+
+ def close_server(self):
+ """Close all resources used by the server."""
+
+ def accept(self):
+ """Accept a single client connection to the server."""
+
+ def close_connection(self):
+ """Close all resources used by the accepted connection."""
+
+ def recv(self):
+ """Receive a data packet from the connected client."""
+
+ def sendall(self, data):
+ """Send the data to the connected client."""
+
+
+class TCPServerSocket(ServerSocket):
+ def __init__(self):
+ family, type, proto, _, addr = socket.getaddrinfo(
+ "localhost", 0, proto=socket.IPPROTO_TCP)[0]
+ self._server_socket = socket.socket(family, type, proto)
+ self._connection = None
+
+ self._server_socket.bind(addr)
+ self._server_socket.listen(1)
+
+ def get_connect_address(self):
+ return "[{}]:{}".format(*self._server_socket.getsockname())
+
+ def get_connect_url(self):
+ return "connect://" + self.get_connect_address()
+
+ def close_server(self):
+ self._server_socket.close()
+
+ def accept(self):
+ assert self._connection is None
+ # accept() is stubborn and won't fail even when the socket is
+ # shutdown, so we'll use a timeout
+ self._server_socket.settimeout(30.0)
+ client, client_addr = self._server_socket.accept()
+ # The connected client inherits its timeout from self._socket,
+ # but we'll use a blocking socket for the client
+ client.settimeout(None)
+ self._connection = client
+
+ def close_connection(self):
+ assert self._connection is not None
+ self._connection.close()
+ self._connection = None
+
+ def recv(self):
+ assert self._connection is not None
+ return self._connection.recv(4096)
+
+ def sendall(self, data):
+ assert self._connection is not None
+ return self._connection.sendall(data)
+
+
+class PtyServerSocket(ServerSocket):
+ def __init__(self):
+ import pty
+ import tty
+ primary, secondary = pty.openpty()
+ tty.setraw(primary)
+ self._primary = io.FileIO(primary, 'r+b')
+ self._secondary = io.FileIO(secondary, 'r+b')
+
+ def get_connect_address(self):
+ libc = ctypes.CDLL(None)
+ libc.ptsname.argtypes = (ctypes.c_int,)
+ libc.ptsname.restype = ctypes.c_char_p
+ return libc.ptsname(self._primary.fileno()).decode()
+
+ def get_connect_url(self):
+ return "serial://" + self.get_connect_address()
+
+ def close_server(self):
+ self._secondary.close()
+ self._primary.close()
+
+ def recv(self):
+ try:
+ return self._primary.read(4096)
+ except OSError as e:
+ # closing the pty results in EIO on Linux, convert it to EOF
+ if e.errno == errno.EIO:
+ return b''
+ raise
+
+ def sendall(self, data):
+ return self._primary.write(data)
+
+
+class MockGDBServer:
+ """
+ A simple TCP-based GDB server that can test client behavior by receiving
+ commands and issuing custom-tailored responses.
+
+ Responses are generated via the .responder property, which should be an
+ instance of a class based on MockGDBServerResponder.
+ """
+
+ responder = None
+ _socket = None
+ _thread = None
+ _receivedData = None
+ _receivedDataOffset = None
+ _shouldSendAck = True
+
+ def __init__(self, socket_class):
+ self._socket_class = socket_class
+ self.responder = MockGDBServerResponder()
+
+ def start(self):
+ self._socket = self._socket_class()
+ # Start a thread that waits for a client connection.
+ self._thread = threading.Thread(target=self._run)
+ self._thread.start()
+
+ def stop(self):
+ self._socket.close_server()
+ self._thread.join()
+ self._thread = None
+
+ def get_connect_address(self):
+ return self._socket.get_connect_address()
+
+ def get_connect_url(self):
+ return self._socket.get_connect_url()
+
+ def _run(self):
+ # For testing purposes, we only need to worry about one client
+ # connecting just one time.
+ try:
+ self._socket.accept()
+ except:
+ return
+ self._shouldSendAck = True
+ self._receivedData = ""
+ self._receivedDataOffset = 0
+ data = None
+ while True:
+ try:
+ data = seven.bitcast_to_string(self._socket.recv())
+ if data is None or len(data) == 0:
+ break
+ self._receive(data)
+ except Exception as e:
+ print("An exception happened when receiving the response from the gdb server. Closing the client...")
+ traceback.print_exc()
+ self._socket.close_connection()
+ break
+
+ def _receive(self, data):
+ """
+ Collects data, parses and responds to as many packets as exist.
+ Any leftover data is kept for parsing the next time around.
+ """
+ self._receivedData += data
+ try:
+ packet = self._parsePacket()
+ while packet is not None:
+ self._handlePacket(packet)
+ packet = self._parsePacket()
+ except self.InvalidPacketException:
+ self._socket.close_connection()
+
+ def _parsePacket(self):
+ """
+ Reads bytes from self._receivedData, returning:
+ - a packet's contents if a valid packet is found
+ - the PACKET_ACK unique object if we got an ack
+ - None if we only have a partial packet
+
+ Raises an InvalidPacketException if unexpected data is received
+ or if checksums fail.
+
+ Once a complete packet is found at the front of self._receivedData,
+ its data is removed form self._receivedData.
+ """
+ data = self._receivedData
+ i = self._receivedDataOffset
+ data_len = len(data)
+ if data_len == 0:
+ return None
+ if i == 0:
+ # If we're looking at the start of the received data, that means
+ # we're looking for the start of a new packet, denoted by a $.
+ # It's also possible we'll see an ACK here, denoted by a +
+ if data[0] == '+':
+ self._receivedData = data[1:]
+ return self.PACKET_ACK
+ if ord(data[0]) == 3:
+ self._receivedData = data[1:]
+ return self.PACKET_INTERRUPT
+ if data[0] == '$':
+ i += 1
+ else:
+ raise self.InvalidPacketException(
+ "Unexpected leading byte: %s" % data[0])
+
+ # If we're looking beyond the start of the received data, then we're
+ # looking for the end of the packet content, denoted by a #.
+ # Note that we pick up searching from where we left off last time
+ while i < data_len and data[i] != '#':
+ i += 1
+
+ # If there isn't enough data left for a checksum, just remember where
+ # we left off so we can pick up there the next time around
+ if i > data_len - 3:
+ self._receivedDataOffset = i
+ return None
+
+ # If we have enough data remaining for the checksum, extract it and
+ # compare to the packet contents
+ packet = data[1:i]
+ i += 1
+ try:
+ check = int(data[i:i + 2], 16)
+ except ValueError:
+ raise self.InvalidPacketException("Checksum is not valid hex")
+ i += 2
+ if check != checksum(packet):
+ raise self.InvalidPacketException(
+ "Checksum %02x does not match content %02x" %
+ (check, checksum(packet)))
+ # remove parsed bytes from _receivedData and reset offset so parsing
+ # can start on the next packet the next time around
+ self._receivedData = data[i:]
+ self._receivedDataOffset = 0
+ return packet
+
+ def _handlePacket(self, packet):
+ if packet is self.PACKET_ACK:
+ # Ignore ACKs from the client. For the future, we can consider
+ # adding validation code to make sure the client only sends ACKs
+ # when it's supposed to.
+ return
+ response = ""
+ # We'll handle the ack stuff here since it's not something any of the
+ # tests will be concerned about, and it'll get turned off quickly anyway.
+ if self._shouldSendAck:
+ self._socket.sendall(seven.bitcast_to_bytes('+'))
+ if packet == "QStartNoAckMode":
+ self._shouldSendAck = False
+ response = "OK"
+ elif self.responder is not None:
+ # Delegate everything else to our responder
+ response = self.responder.respond(packet)
+ # Handle packet framing since we don't want to bother tests with it.
+ if response is not None:
+ framed = frame_packet(response)
+ self._socket.sendall(seven.bitcast_to_bytes(framed))
+
+ PACKET_ACK = object()
+ PACKET_INTERRUPT = object()
+
+ class InvalidPacketException(Exception):
+ pass
+