import sys
import argparse
import subprocess
import json
import os
from datetime import datetime
import numpy as np
from scipy.optimize import curve_fit
def generate_cpp_cycle_test(n: int) -> str:
"""
Generates a C++ code snippet with a specified number of pointers in a cycle.
Creates a while loop that rotates N pointers.
This pattern tests the convergence speed of the dataflow analysis when
reaching its fixed point.
Example:
struct MyObj { int id; ~MyObj() {} };
void long_cycle_4(bool condition) {
MyObj v1{1};
MyObj v2{1};
MyObj v3{1};
MyObj v4{1};
MyObj* p1 = &v1;
MyObj* p2 = &v2;
MyObj* p3 = &v3;
MyObj* p4 = &v4;
while (condition) {
MyObj* temp = p1;
p1 = p2;
p2 = p3;
p3 = p4;
p4 = temp;
}
}
"""
if n <= 0:
return "// Number of variables must be positive."
cpp_code = "struct MyObj { int id; ~MyObj() {} };\n\n"
cpp_code += f"void long_cycle_{n}(bool condition) {{\n"
for i in range(1, n + 1):
cpp_code += f" MyObj v{i}{{1}};\n"
cpp_code += "\n"
for i in range(1, n + 1):
cpp_code += f" MyObj* p{i} = &v{i};\n"
cpp_code += "\n while (condition) {\n"
if n > 0:
cpp_code += f" MyObj* temp = p1;\n"
for i in range(1, n):
cpp_code += f" p{i} = p{i+1};\n"
cpp_code += f" p{n} = temp;\n"
cpp_code += " }\n}\n"
cpp_code += f"\nint main() {{ long_cycle_{n}(false); return 0; }}\n"
return cpp_code
def generate_cpp_merge_test(n: int) -> str:
"""
    Generates a C++ code snippet with N independent if statements that merge
    at a single point.
This pattern specifically stresses the performance of the
'LifetimeLattice::join' operation.
Example:
struct MyObj { int id; ~MyObj() {} };
void conditional_merges_4(bool condition) {
MyObj v1, v2, v3, v4;
MyObj *p1 = nullptr, *p2 = nullptr, *p3 = nullptr, *p4 = nullptr;
if(condition) { p1 = &v1; }
if(condition) { p2 = &v2; }
if(condition) { p3 = &v3; }
if(condition) { p4 = &v4; }
}
"""
if n <= 0:
return "// Number of variables must be positive."
cpp_code = "struct MyObj { int id; ~MyObj() {} };\n\n"
cpp_code += f"void conditional_merges_{n}(bool condition) {{\n"
decls = [f"v{i}" for i in range(1, n + 1)]
cpp_code += f" MyObj {', '.join(decls)};\n"
ptr_decls = [f"*p{i} = nullptr" for i in range(1, n + 1)]
cpp_code += f" MyObj {', '.join(ptr_decls)};\n\n"
for i in range(1, n + 1):
cpp_code += f" if(condition) {{ p{i} = &v{i}; }}\n"
cpp_code += "}\n"
cpp_code += f"\nint main() {{ conditional_merges_{n}(false); return 0; }}\n"
return cpp_code
def generate_cpp_nested_loop_test(n: int) -> str:
"""
Generates C++ code with N levels of nested loops.
This pattern tests how analysis performance scales with loop nesting depth,
which is a key factor in the complexity of dataflow analyses on structured
control flow.
Example (n=3):
struct MyObj { int id; ~MyObj() {} };
void nested_loops_3() {
MyObj* p = nullptr;
for(int i0=0; i0<2; ++i0) {
MyObj s0;
p = &s0;
for(int i1=0; i1<2; ++i1) {
MyObj s1;
p = &s1;
for(int i2=0; i2<2; ++i2) {
MyObj s2;
p = &s2;
}
}
}
}
"""
if n <= 0:
return "// Nesting depth must be positive."
cpp_code = "struct MyObj { int id; ~MyObj() {} };\n\n"
cpp_code += f"void nested_loops_{n}() {{\n"
cpp_code += " MyObj* p = nullptr;\n"
for i in range(n):
indent = " " * (i + 1)
cpp_code += f"{indent}for(int i{i}=0; i{i}<2; ++i{i}) {{\n"
cpp_code += f"{indent} MyObj s{i}; p = &s{i};\n"
for i in range(n - 1, -1, -1):
indent = " " * (i + 1)
cpp_code += f"{indent}}}\n"
cpp_code += "}\n"
cpp_code += f"\nint main() {{ nested_loops_{n}(); return 0; }}\n"
return cpp_code
def generate_cpp_switch_fan_out_test(n: int) -> str:
"""
Generates C++ code with a switch statement with N branches.
Each branch 'i' 'uses' (reads) a single, unique pointer 'pi'.
This pattern creates a "fan-in" join point for the backward
liveness analysis, stressing the LivenessMap::join operation
by forcing it to merge N disjoint, single-element sets of live origins.
The resulting complexity for LiveOrigins should be O(n log n) or higher.
Example (n=3):
struct MyObj { int id; ~MyObj() {} };
void switch_fan_out_3(int condition) {
MyObj v1{1}; MyObj v2{1}; MyObj v3{1};
MyObj* p1 = &v1; MyObj* p2 = &v2; MyObj* p3 = &v3;
switch (condition % 3) {
case 0:
p1->id = 1;
break;
case 1:
p2->id = 1;
break;
case 2:
p3->id = 1;
break;
}
}
"""
if n <= 0:
return "// Number of variables must be positive."
cpp_code = "struct MyObj { int id; ~MyObj() {} };\n\n"
cpp_code += f"void switch_fan_out{n}(int condition) {{\n"
# Generate N distinct objects
for i in range(1, n + 1):
cpp_code += f" MyObj v{i}{{1}};\n"
cpp_code += "\n"
# Generate N distinct pointers, each as a separate variable
for i in range(1, n + 1):
cpp_code += f" MyObj* p{i} = &v{i};\n"
cpp_code += "\n"
cpp_code += f" switch (condition % {n}) {{\n"
for case_num in range(n):
cpp_code += f" case {case_num}:\n"
cpp_code += f" p{case_num + 1}->id = 1;\n"
cpp_code += " break;\n"
cpp_code += " }\n}\n"
cpp_code += f"\nint main() {{ switch_fan_out{n}(0); return 0; }}\n"
return cpp_code
def analyze_trace_file(trace_path: str) -> dict:
"""
Parses the -ftime-trace JSON output to find durations for the lifetime
analysis and its sub-phases.
Returns a dictionary of durations in microseconds.
"""
durations = {
"lifetime_us": 0.0,
"total_us": 0.0,
"fact_gen_us": 0.0,
"loan_prop_us": 0.0,
"live_origins_us": 0.0,
}
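    # Maps -ftime-trace event names to the duration keys above; durations are
    # accumulated in case an event appears more than once in the trace.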
event_name_map = {
"LifetimeSafetyAnalysis": "lifetime_us",
"ExecuteCompiler": "total_us",
"FactGenerator": "fact_gen_us",
"LoanPropagation": "loan_prop_us",
"LiveOrigins": "live_origins_us",
}
try:
with open(trace_path, "r") as f:
trace_data = json.load(f)
for event in trace_data.get("traceEvents", []):
event_name = event.get("name")
if event_name in event_name_map:
key = event_name_map[event_name]
durations[key] += float(event.get("dur", 0))
except (IOError, json.JSONDecodeError) as e:
print(f"Error reading or parsing trace file {trace_path}: {e}", file=sys.stderr)
return {key: 0.0 for key in durations}
return durations
def power_law(n, c, k):
"""Represents the power law function: y = c * n^k"""
return c * np.power(n, k)
def human_readable_time(ms: float) -> str:
"""Converts milliseconds to a human-readable string (ms or s)."""
if ms >= 1000:
return f"{ms / 1000:.2f} s"
return f"{ms:.2f} ms"
def calculate_complexity(n_data, y_data) -> tuple[float | None, float | None]:
"""
Calculates the exponent 'k' for the power law fit y = c * n^k.
Returns a tuple of (k, k_standard_error).
"""
try:
if len(n_data) < 3 or np.all(y_data < 1e-6) or np.var(y_data) < 1e-6:
return None, None
non_zero_indices = y_data > 0
if np.sum(non_zero_indices) < 3:
return None, None
n_fit, y_fit = n_data[non_zero_indices], y_data[non_zero_indices]
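        # Fit y = c * n^k to the non-zero samples; popt[1] is the exponent k and
        # its standard error comes from the diagonal of the covariance matrix.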
popt, pcov = curve_fit(power_law, n_fit, y_fit, p0=[0, 1], maxfev=5000)
k_stderr = np.sqrt(np.diag(pcov))[1]
return popt[1], k_stderr
except (RuntimeError, ValueError):
return None, None
def generate_markdown_report(results: dict) -> str:
"""Generates a concise, Markdown-formatted report from the benchmark results."""
report = []
    timestamp = datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")
    report.append("# Lifetime Analysis Performance Report")
report.append(f"> Generated on: {timestamp}")
report.append("\n---\n")
for test_name, data in results.items():
title = data["title"]
report.append(f"## Test Case: {title}")
report.append("\n**Timing Results:**\n")
# Table header
report.append(
"| N (Input Size) | Total Time | Analysis Time (%) | Fact Generator (%) | Loan Propagation (%) | Live Origins (%) |"
)
report.append(
"|:---------------|-----------:|------------------:|-------------------:|---------------------:|------------------:|"
)
# Table rows
n_data = np.array(data["n"])
total_ms_data = np.array(data["total_ms"])
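        # Sub-phase columns are reported as a percentage of the total compile time.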
for i in range(len(n_data)):
total_t = total_ms_data[i]
if total_t < 1e-6:
total_t = 1.0 # Avoid division by zero
row = [
f"| {n_data[i]:<14} |",
f"{human_readable_time(total_t):>10} |",
f"{data['lifetime_ms'][i] / total_t * 100:>17.2f}% |",
f"{data['fact_gen_ms'][i] / total_t * 100:>18.2f}% |",
f"{data['loan_prop_ms'][i] / total_t * 100:>20.2f}% |",
f"{data['live_origins_ms'][i] / total_t * 100:>17.2f}% |",
]
report.append(" ".join(row))
report.append("\n**Complexity Analysis:**\n")
report.append("| Analysis Phase | Complexity O(nk) | ")
report.append("|:------------------|:--------------------------|")
analysis_phases = {
"Total Analysis": data["lifetime_ms"],
"FactGenerator": data["fact_gen_ms"],
"LoanPropagation": data["loan_prop_ms"],
"LiveOrigins": data["live_origins_ms"],
}
for phase_name, y_data in analysis_phases.items():
k, delta = calculate_complexity(n_data, np.array(y_data))
if k is not None and delta is not None:
complexity_str = f"O(n{k:.2f} ± {delta:.2f})"
else:
complexity_str = "(Negligible)"
report.append(f"| {phase_name:<17} | {complexity_str:<25} |")
report.append("\n---\n")
return "\n".join(report)
def run_single_test(
clang_binary: str, output_dir: str, test_name: str, generator_func, n: int
) -> dict:
"""Generates, compiles, and benchmarks a single test case."""
print(f"--- Running Test: {test_name.capitalize()} with N={n} ---")
generated_code = generator_func(n)
base_name = f"test_{test_name}_{n}"
source_file = os.path.join(output_dir, f"{base_name}.cpp")
trace_file = os.path.join(output_dir, f"{base_name}.json")
with open(source_file, "w") as f:
f.write(generated_code)
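    # Compile only, discarding the object file; -ftime-trace emits a Chrome-trace
    # JSON with per-phase timings, and -Xclang -fexperimental-lifetime-safety
    # enables the analysis being measured.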
clang_command = [
clang_binary,
"-c",
"-o",
"/dev/null",
"-ftime-trace=" + trace_file,
"-Xclang",
"-fexperimental-lifetime-safety",
"-std=c++17",
source_file,
]
try:
result = subprocess.run(
clang_command, capture_output=True, text=True, timeout=60
)
except subprocess.TimeoutExpired:
print(f"Compilation timed out for N={n}!", file=sys.stderr)
return {}
if result.returncode != 0:
print(f"Compilation failed for N={n}!", file=sys.stderr)
print(result.stderr, file=sys.stderr)
return {}
durations_us = analyze_trace_file(trace_file)
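    # -ftime-trace reports durations in microseconds; convert to milliseconds.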
return {
key.replace("_us", "_ms"): value / 1000.0 for key, value in durations_us.items()
}
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Generate, compile, and benchmark C++ test cases for Clang's lifetime analysis."
)
parser.add_argument(
"--clang-binary", type=str, required=True, help="Path to the Clang executable."
)
parser.add_argument(
"--output-dir",
type=str,
default="benchmark_results",
help="Directory to save persistent benchmark files. (Default: ./benchmark_results)",
)
args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
print(f"Benchmark files will be saved in: {os.path.abspath(args.output_dir)}\n")
# Maximize 'n' values while keeping execution time under 10s.
test_configurations = [
{
"name": "cycle",
"title": "Pointer Cycle in Loop",
"generator_func": generate_cpp_cycle_test,
"n_values": [50, 75, 100, 200, 300],
},
{
"name": "merge",
"title": "CFG Merges",
"generator_func": generate_cpp_merge_test,
"n_values": [400, 1000, 2000, 5000],
},
{
"name": "nested_loops",
"title": "Deeply Nested Loops",
"generator_func": generate_cpp_nested_loop_test,
"n_values": [50, 100, 150, 200],
},
{
"name": "switch_fan_out",
"title": "Switch Fan-out",
"generator_func": generate_cpp_switch_fan_out_test,
"n_values": [500, 1000, 2000, 4000],
},
]
results = {}
print("Running performance benchmarks...")
for config in test_configurations:
test_name = config["name"]
results[test_name] = {
"title": config["title"],
"n": [],
"lifetime_ms": [],
"total_ms": [],
"fact_gen_ms": [],
"loan_prop_ms": [],
"live_origins_ms": [],
}
for n in config["n_values"]:
durations_ms = run_single_test(
args.clang_binary,
args.output_dir,
test_name,
config["generator_func"],
n,
)
if durations_ms:
results[test_name]["n"].append(n)
for key, value in durations_ms.items():
results[test_name][key].append(value)
print(
f" Total Analysis: {human_readable_time(durations_ms['lifetime_ms'])} | "
f"FactGen: {human_readable_time(durations_ms['fact_gen_ms'])} | "
f"LoanProp: {human_readable_time(durations_ms['loan_prop_ms'])} | "
f"LiveOrigins: {human_readable_time(durations_ms['live_origins_ms'])}"
)
print("\n\n" + "=" * 80)
print("Generating Markdown Report...")
print("=" * 80 + "\n")
markdown_report = generate_markdown_report(results)
print(markdown_report)
report_filename = os.path.join(args.output_dir, "performance_report.md")
with open(report_filename, "w") as f:
f.write(markdown_report)
print(f"Report saved to: {report_filename}")