aboutsummaryrefslogtreecommitdiff
path: root/lldb/scripts/reproducer-replay.py
blob: f44e3cf4935385da6603a55a6b0c08c749c5be43 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python3

from multiprocessing import Pool
import multiprocessing
import argparse
import tempfile
import logging
import os
import subprocess


def run_reproducer(path):
    proc = subprocess.Popen(
        [LLDB, "--replay", path], stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    reason = None
    try:
        outs, errs = proc.communicate(timeout=TIMEOUT)
        success = proc.returncode == 0
        result = "PASSED" if success else "FAILED"
        if not success:
            outs = outs.decode()
            errs = errs.decode()
            # Do some pattern matching to find out the cause of the failure.
            if "Encountered unexpected packet during replay" in errs:
                reason = "Unexpected packet"
            elif "Assertion failed" in errs:
                reason = "Assertion failed"
            elif "UNREACHABLE" in errs:
                reason = "Unreachable executed"
            elif "Segmentation fault" in errs:
                reason = "Segmentation fault"
            elif "Illegal instruction" in errs:
                reason = "Illegal instruction"
            else:
                reason = f"Exit code {proc.returncode}"
    except subprocess.TimeoutExpired:
        proc.kill()
        success = False
        outs, errs = proc.communicate()
        result = "TIMEOUT"

    if not FAILURE_ONLY or not success:
        reason_str = f" ({reason})" if reason else ""
        print(f"{result}: {path}{reason_str}")
        if VERBOSE:
            if outs:
                print(outs)
            if errs:
                print(errs)


def find_reproducers(path):
    for root, dirs, files in os.walk(path):
        for dir in dirs:
            _, extension = os.path.splitext(dir)
            if dir.startswith("Test") and extension == ".py":
                yield os.path.join(root, dir)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="LLDB API Test Replay Driver. "
        "Replay one or more reproducers in parallel using the specified LLDB driver. "
        "The script will look for reproducers generated by the API lit test suite. "
        "To generate the reproducers, pass --param 'lldb-run-with-repro=capture' to lit."
    )
    parser.add_argument(
        "-j",
        "--threads",
        type=int,
        default=multiprocessing.cpu_count(),
        help="Number of threads. The number of CPU threads if not specified.",
    )
    parser.add_argument(
        "-t",
        "--timeout",
        type=int,
        default=60,
        help="Replay timeout in seconds. 60 seconds if not specified.",
    )
    parser.add_argument(
        "-p",
        "--path",
        type=str,
        default=os.getcwd(),
        help="Path to the directory containing the reproducers. The current working directory if not specified.",
    )
    parser.add_argument(
        "-l",
        "--lldb",
        type=str,
        required=True,
        help="Path to the LLDB command line driver",
    )
    parser.add_argument(
        "-v", "--verbose", help="Print replay output.", action="store_true"
    )
    parser.add_argument(
        "--failure-only", help="Only log failures.", action="store_true"
    )
    args = parser.parse_args()

    global LLDB
    global TIMEOUT
    global VERBOSE
    global FAILURE_ONLY
    LLDB = args.lldb
    TIMEOUT = args.timeout
    VERBOSE = args.verbose
    FAILURE_ONLY = args.failure_only

    print(
        f"Replaying reproducers in {args.path} with {args.threads} threads and a {args.timeout} seconds timeout"
    )

    try:
        pool = Pool(args.threads)
        pool.map(run_reproducer, find_reproducers(args.path))
    except KeyboardInterrupt:
        print("Interrupted")