# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. # See https://llvm.org/LICENSE.txt for license information. # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception """IR2Vec Triplet Generator Generates IR2Vec triplets by applying random optimization levels to LLVM IR files and extracting triplets using llvm-ir2vec. Automatically generates preprocessed files: entity2id.txt, relation2id.txt, and train2id.txt. Usage: python generateTriplets.py """ import argparse import logging import os import random import subprocess from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from typing import List, Set, Tuple # Configuration OPT_LEVELS = ["O0", "O1", "O2", "O3", "Os", "Oz"] DEFAULT_MAX_WORKERS = 100 logger = logging.getLogger(__name__) # TODO: Change this to a dataclass with slots # when Python 3.10+ is the minimum version # https://docs.python.org/3/library/dataclasses.html#dataclasses.dataclass class TripletResult: """Result from processing a single LLVM IR file""" __slots__ = ["triplets", "max_relation"] def __init__(self, triplets: Set[str], max_relation: int): self.triplets = triplets self.max_relation = max_relation class IR2VecTripletGenerator: """Main class for generating IR2Vec triplets""" def __init__( self, llvm_build_dir: Path, num_optimizations: int, output_dir: Path, max_workers: int = DEFAULT_MAX_WORKERS, ): self.llvm_build_dir = llvm_build_dir self.num_optimizations = num_optimizations self.output_dir = output_dir self.max_workers = max_workers # Tool paths self.opt_binary = os.path.join(llvm_build_dir, "bin", "opt") self.ir2vec_binary = os.path.join(llvm_build_dir, "bin", "llvm-ir2vec") self._validate_setup() # Create output directory if it doesn't exist self.output_dir.mkdir(parents=True, exist_ok=True) def _validate_setup(self): """Validate that all required tools and paths exist""" if not self.llvm_build_dir.exists(): raise FileNotFoundError( f"LLVM build directory not found: {self.llvm_build_dir}" ) if not os.path.isfile(self.opt_binary) or not os.access( self.opt_binary, os.X_OK ): raise FileNotFoundError( f"opt binary not found or not executable: {self.opt_binary}" ) if not os.path.isfile(self.ir2vec_binary) or not os.access( self.ir2vec_binary, os.X_OK ): raise FileNotFoundError( f"llvm-ir2vec binary not found or not executable: {self.ir2vec_binary}" ) if not (1 <= self.num_optimizations <= len(OPT_LEVELS)): raise ValueError( f"Number of optimizations must be between 1-{len(OPT_LEVELS)}" ) def _select_optimization_levels(self) -> List[str]: """Select unique random optimization levels""" return random.sample(OPT_LEVELS, self.num_optimizations) def _process_single_file(self, input_file: Path) -> TripletResult: """Process a single LLVM IR file with multiple optimization levels""" all_triplets = set() max_relation = 1 opt_levels = self._select_optimization_levels() for opt_level in opt_levels: triplets, file_max_relation = self._run_pipeline(input_file, opt_level) if triplets: all_triplets.update(triplets) max_relation = max(max_relation, file_max_relation) logger.debug( f"Generated {len(triplets)} triplets for {input_file} with {opt_level}" ) return TripletResult(all_triplets, max_relation) def _run_pipeline(self, input_file: Path, opt_level: str) -> Tuple[Set[str], int]: """Run opt | llvm-ir2vec pipeline using subprocess pipes.""" try: # Run opt first opt_proc = subprocess.Popen( [self.opt_binary, f"-{opt_level}", str(input_file), "-o", "-"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) # Run llvm-ir2vec with opt's output as input ir2vec_proc = subprocess.Popen( [self.ir2vec_binary, "triplets", "-", "-o", "-"], stdin=opt_proc.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) opt_proc.stdout.close() stdout, _ = ir2vec_proc.communicate() opt_proc.wait() # Check if either process failed if opt_proc.returncode != 0 or ir2vec_proc.returncode != 0: return set(), 1 return self._parse_triplet_output(stdout) except (subprocess.SubprocessError, OSError): return set(), 1 def _parse_triplet_output(self, output: str) -> Tuple[Set[str], int]: """Parse triplet output and extract max relation""" if not output.strip(): return set(), 1 lines = output.strip().split("\n") max_relation = 1 # Extract max relation from metadata line if lines and lines[0].startswith("MAX_RELATION="): max_relation = int(lines[0].split("=")[1]) lines = lines[1:] # Remove duplicate triplets by converting to a set return set(lines), max_relation def generate_triplets(self, file_list: Path) -> None: """Main method to generate triplets from a list of LLVM IR files""" input_files = self._read_file_list(file_list) logger.info( f"Processing {len(input_files)} files with {self.num_optimizations} " f"optimization levels using {self.max_workers} workers" ) all_triplets = set() global_max_relation = 1 with ThreadPoolExecutor(max_workers=self.max_workers) as executor: future_to_file = { executor.submit(self._process_single_file, file): file for file in input_files } for future in as_completed(future_to_file): try: result = future.result() all_triplets.update(result.triplets) global_max_relation = max(global_max_relation, result.max_relation) except (subprocess.SubprocessError, OSError, ValueError) as e: file_path = future_to_file[future] logger.error(f"Error processing {file_path}: {e}") self._generate_output_files(all_triplets, global_max_relation) logger.info("Processing completed successfully") def _read_file_list(self, file_list: Path) -> List[Path]: """Read and validate the list of input files""" input_files = [] with open(file_list, "r") as f: for line_num, line in enumerate(f, 1): if line := line.strip(): file_path = Path(line) if file_path.exists(): input_files.append(file_path) else: logger.warning(f"File not found (line {line_num}): {file_path}") if not input_files: raise ValueError("No valid input files found") return input_files def _generate_output_files(self, all_triplets: Set[str], max_relation: int) -> None: """Generate the final output files""" logger.info(f"Generating output files with {len(all_triplets)} unique triplets") # Write all output files -- train2id.txt, entity2id.txt, relation2id.txt train2id_file = os.path.join(self.output_dir, "train2id.txt") entity2id_file = os.path.join(self.output_dir, "entity2id.txt") relation2id_file = os.path.join(self.output_dir, "relation2id.txt") with open(train2id_file, "w") as f: f.write(f"{len(all_triplets)}\n") f.writelines(f"{triplet}\n" for triplet in all_triplets) self._generate_entity2id(entity2id_file) self._generate_relation2id(relation2id_file, max_relation) def _generate_entity2id(self, output_file: Path) -> None: """Generate entity2id.txt using llvm-ir2vec""" subprocess.run( [str(self.ir2vec_binary), "entities", "-o", str(output_file)], check=True, capture_output=True, ) def _generate_relation2id(self, output_file: Path, max_relation: int) -> None: """Generate relation2id.txt from max relation""" max_relation = max(max_relation, 1) # At least Type and Next relations num_relations = max_relation + 1 with open(output_file, "w") as f: f.write(f"{num_relations}\n") f.write("Type\t0\n") f.write("Next\t1\n") f.writelines(f"Arg{i-2}\t{i}\n" for i in range(2, num_relations)) def main(): """Main entry point""" parser = argparse.ArgumentParser( description="Generate IR2Vec triplets from LLVM IR files", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "llvm_build_dir", type=Path, help="Path to LLVM build directory" ) parser.add_argument( "num_optimizations", type=int, help="Number of optimization levels to apply (1-6)", ) parser.add_argument( "ll_file_list", type=Path, help="File containing list of LLVM IR files to process", ) parser.add_argument( "output_dir", type=Path, help="Output directory for generated files" ) parser.add_argument( "-j", "--max-workers", type=int, default=DEFAULT_MAX_WORKERS, help=f"Maximum number of parallel workers (default: {DEFAULT_MAX_WORKERS})", ) parser.add_argument( "-v", "--verbose", action="store_true", help="Enable debug logging" ) parser.add_argument( "-q", "--quiet", action="store_true", help="Suppress all output except errors" ) args = parser.parse_args() # Configure logging level = ( logging.ERROR if args.quiet else (logging.DEBUG if args.verbose else logging.INFO) ) logging.basicConfig( level=level, format="[%(asctime)s] %(levelname)s: %(message)s", datefmt="%H:%M:%S", ) generator = IR2VecTripletGenerator( args.llvm_build_dir, args.num_optimizations, args.output_dir, args.max_workers, ) generator.generate_triplets(args.ll_file_list) if __name__ == "__main__": main()