aboutsummaryrefslogtreecommitdiff
path: root/lldb/test/API/python_api
diff options
context:
space:
mode:
Diffstat (limited to 'lldb/test/API/python_api')
-rw-r--r--lldb/test/API/python_api/event/TestEvents.py118
1 files changed, 118 insertions, 0 deletions
diff --git a/lldb/test/API/python_api/event/TestEvents.py b/lldb/test/API/python_api/event/TestEvents.py
index e009fb3..a8a67a9 100644
--- a/lldb/test/API/python_api/event/TestEvents.py
+++ b/lldb/test/API/python_api/event/TestEvents.py
@@ -313,3 +313,121 @@ class EventAPITestCase(TestBase):
self.assertEqual(
self.state, "stopped", "Both expected state changed events received"
)
+
+ def wait_for_next_event(self, expected_state, test_shadow = False):
+ """Wait for an event from self.primary & self.shadow listener.
+ If test_shadow is true, we also check that the shadow listener only
+ receives events AFTER the primary listener does."""
+ # Waiting on the shadow listener shouldn't have events yet because
+ # we haven't fetched them for the primary listener yet:
+ event = lldb.SBEvent()
+
+ if test_shadow:
+ success = self.shadow_listener.WaitForEvent(1, event)
+ self.assertFalse(success, "Shadow listener doesn't pull events")
+
+ # But there should be an event for the primary listener:
+ success = self.primary_listener.WaitForEvent(5, event)
+ self.assertTrue(success, "Primary listener got the event")
+
+ state = lldb.SBProcess.GetStateFromEvent(event)
+ restart = False
+ if state == lldb.eStateStopped:
+ restart = lldb.SBProcess.GetRestartedFromEvent(event)
+
+ if expected_state != None:
+ self.assertEqual(state, expected_state, "Primary thread got the correct event")
+
+ # And after pulling that one there should be an equivalent event for the shadow
+ # listener:
+ success = self.shadow_listener.WaitForEvent(5, event)
+ self.assertTrue(success, "Shadow listener got event too")
+ self.assertEqual(state, lldb.SBProcess.GetStateFromEvent(event), "It was the same event")
+ self.assertEqual(restart, lldb.SBProcess.GetRestartedFromEvent(event), "It was the same restarted")
+
+ return state, restart
+
+ def test_shadow_listener(self):
+ self.build()
+ exe = self.getBuildArtifact("a.out")
+
+ # Create a target by the debugger.
+ target = self.dbg.CreateTarget(exe)
+ self.assertTrue(target, VALID_TARGET)
+
+ # Now create a breakpoint on main.c by name 'c'.
+ bkpt1 = target.BreakpointCreateByName("c", "a.out")
+ self.trace("breakpoint:", bkpt1)
+ self.assertTrue(bkpt1.GetNumLocations() == 1, VALID_BREAKPOINT)
+
+ self.primary_listener = lldb.SBListener("my listener")
+ self.shadow_listener = lldb.SBListener("shadow listener")
+
+ self.cur_thread = None
+
+ error = lldb.SBError()
+ launch_info = target.GetLaunchInfo()
+ launch_info.SetListener(self.primary_listener)
+ launch_info.SetShadowListener(self.shadow_listener)
+
+ self.runCmd("settings set target.process.extra-startup-command QSetLogging:bitmask=LOG_PROCESS|LOG_EXCEPTIONS|LOG_RNB_PACKETS|LOG_STEP;")
+ self.dbg.SetAsync(True)
+
+ self.process = target.Launch(launch_info, error)
+ self.assertSuccess(error, "Process launched successfully")
+
+ # Keep fetching events from the primary to trigger the do on removal and
+ # then from the shadow listener, and make sure they match:
+
+ # Events in the launch sequence might be platform dependent, so don't
+ # expect any particular event till we get the stopped:
+ state = lldb.eStateInvalid
+ while state != lldb.eStateStopped:
+ state, restart = self.wait_for_next_event(None, False)
+
+ # Okay, we're now at a good stop, so try a next:
+ self.cur_thread = self.process.threads[0]
+
+ # Make sure we're at our expected breakpoint:
+ self.assertTrue(self.cur_thread.IsValid(), "Got a zeroth thread")
+ self.assertEqual(self.cur_thread.stop_reason, lldb.eStopReasonBreakpoint)
+ self.assertEqual(self.cur_thread.GetStopReasonDataCount(), 2, "Only one breakpoint/loc here")
+ self.assertEqual(bkpt1.GetID(), self.cur_thread.GetStopReasonDataAtIndex(0), "Hit the right breakpoint")
+ # Disable the first breakpoint so it doesn't get in the way...
+ bkpt1.SetEnabled(False)
+
+ self.cur_thread.StepOver()
+ # We'll run the test for "shadow listener blocked by primary listener
+ # for the first couple rounds, then we'll skip the 1 second pause...
+ self.wait_for_next_event(lldb.eStateRunning, True)
+ self.wait_for_next_event(lldb.eStateStopped, True)
+
+ # Next try an auto-continue breakpoint and make sure the shadow listener got
+ # the resumed info as well. Note that I'm not explicitly counting
+ # running events here. At the point when I wrote this lldb sometimes
+ # emits two running events in a row. Apparently the code to coalesce running
+ # events isn't working. But that's not what this test is testing, we're really
+ # testing that the primary & shadow listeners hear the same thing and in the
+ # right order.
+
+ main_spec = lldb.SBFileSpec("main.c")
+ bkpt2 = target.BreakpointCreateBySourceRegex("b.2. returns %d", main_spec)
+ self.assertTrue(bkpt2.GetNumLocations() > 0, "BP2 worked")
+ bkpt2.SetAutoContinue(True)
+
+ bkpt3 = target.BreakpointCreateBySourceRegex("a.3. returns %d", main_spec)
+ self.assertTrue(bkpt3.GetNumLocations() > 0, "BP3 worked")
+
+ state = lldb.eStateStopped
+ restarted = False
+
+ # Put in a counter to make sure we don't spin forever if there is some
+ # error in the logic.
+ counter = 0
+ while state != lldb.eStateExited:
+ counter += 1
+ self.assertLess(counter, 50, "Took more than 50 events to hit two breakpoints.")
+ if state == lldb.eStateStopped and not restarted:
+ self.process.Continue()
+ state, restarted = self.wait_for_next_event(None, False)
+