#!/usr/bin/env python from __future__ import absolute_import, division, print_function import os import re import subprocess import sys import tempfile ### class DeltaAlgorithm(object): def __init__(self): self.cache = set() def test(self, changes): abstract ### def getTestResult(self, changes): # There is no reason to cache successful tests because we will # always reduce the changeset when we see one. changeset = frozenset(changes) if changeset in self.cache: return False elif not self.test(changes): self.cache.add(changeset) return False else: return True def run(self, changes, force=False): # Make sure the initial test passes, if not then (a) either # the user doesn't expect monotonicity, and we may end up # doing O(N^2) tests, or (b) the test is wrong. Avoid the # O(N^2) case unless user requests it. if not force: if not self.getTestResult(changes): raise ValueError("Initial test passed to delta fails.") # Check empty set first to quickly find poor test functions. if self.getTestResult(set()): return set() else: return self.delta(changes, self.split(changes)) def split(self, S): """split(set) -> [sets] Partition a set into one or two pieces. """ # There are many ways to split, we could do a better job with more # context information (but then the API becomes grosser). L = list(S) mid = len(L) // 2 if mid == 0: return (L,) else: return L[:mid], L[mid:] def delta(self, c, sets): # assert(reduce(set.union, sets, set()) == c) # If there is nothing left we can remove, we are done. if len(sets) <= 1: return c # Look for a passing subset. res = self.search(c, sets) if res is not None: return res # Otherwise, partition sets if possible; if not we are done. refined = sum(map(list, map(self.split, sets)), []) if len(refined) == len(sets): return c return self.delta(c, refined) def search(self, c, sets): for i, S in enumerate(sets): # If test passes on this subset alone, recurse. if self.getTestResult(S): return self.delta(S, self.split(S)) # Otherwise if we have more than two sets, see if test # pases without this subset. if len(sets) > 2: complement = sum(sets[:i] + sets[i + 1 :], []) if self.getTestResult(complement): return self.delta(complement, sets[:i] + sets[i + 1 :]) ### class Token(object): def __init__(self, type, data, flags, file, line, column): self.type = type self.data = data self.flags = flags self.file = file self.line = line self.column = column kTokenRE = re.compile( r"""([a-z_]+) '(.*)'\t(.*)\tLoc=<(.*):(.*):(.*)>""", re.DOTALL | re.MULTILINE ) def getTokens(path): p = subprocess.Popen( ["clang", "-dump-raw-tokens", path], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) out, err = p.communicate() tokens = [] collect = None for ln in err.split("\n"): # Silly programmers refuse to print in simple machine readable # formats. Whatever. if collect is None: collect = ln else: collect = collect + "\n" + ln if "Loc=<" in ln and ln.endswith(">"): ln, collect = collect, None tokens.append(Token(*kTokenRE.match(ln).groups())) return tokens ### class TMBDDelta(DeltaAlgorithm): def __init__(self, testProgram, tokenLists, log): def patchName(name, suffix): base, ext = os.path.splitext(name) return base + "." + suffix + ext super(TMBDDelta, self).__init__() self.testProgram = testProgram self.tokenLists = tokenLists self.tempFiles = [patchName(f, "tmp") for f, _ in self.tokenLists] self.targetFiles = [patchName(f, "ok") for f, _ in self.tokenLists] self.log = log self.numTests = 0 def writeFiles(self, changes, fileNames): assert len(fileNames) == len(self.tokenLists) byFile = [[] for i in self.tokenLists] for i, j in changes: byFile[i].append(j) for i, (file, tokens) in enumerate(self.tokenLists): f = open(fileNames[i], "w") for j in byFile[i]: f.write(tokens[j]) f.close() return byFile def test(self, changes): self.numTests += 1 byFile = self.writeFiles(changes, self.tempFiles) if self.log: print("TEST - ", end=" ", file=sys.stderr) if self.log > 1: for i, (file, _) in enumerate(self.tokenLists): indices = byFile[i] if i: sys.stderr.write("\n ") sys.stderr.write("%s:%d tokens: [" % (file, len(byFile[i]))) prev = None for j in byFile[i]: if prev is None or j != prev + 1: if prev: sys.stderr.write("%d][" % prev) sys.stderr.write(str(j)) sys.stderr.write(":") prev = j if byFile[i]: sys.stderr.write(str(byFile[i][-1])) sys.stderr.write("] ") else: print( ", ".join( [ "%s:%d tokens" % (file, len(byFile[i])) for i, (file, _) in enumerate(self.tokenLists) ] ), end=" ", file=sys.stderr, ) p = subprocess.Popen([self.testProgram] + self.tempFiles) res = p.wait() == 0 if res: self.writeFiles(changes, self.targetFiles) if self.log: print("=> %s" % res, file=sys.stderr) else: if res: print("\nSUCCESS (%d tokens)" % len(changes)) else: sys.stderr.write(".") return res def run(self): res = super(TMBDDelta, self).run( [ (i, j) for i, (file, tokens) in enumerate(self.tokenLists) for j in range(len(tokens)) ] ) self.writeFiles(res, self.targetFiles) if not self.log: print(file=sys.stderr) return res def tokenBasedMultiDelta(program, files, log): # Read in the lists of tokens. tokenLists = [(file, [t.data for t in getTokens(file)]) for file in files] numTokens = sum([len(tokens) for _, tokens in tokenLists]) print("Delta on %s with %d tokens." % (", ".join(files), numTokens)) tbmd = TMBDDelta(program, tokenLists, log) res = tbmd.run() print( "Finished %s with %d tokens (in %d tests)." % (", ".join(tbmd.targetFiles), len(res), tbmd.numTests) ) def main(): from optparse import OptionParser, OptionGroup parser = OptionParser("%prog {files+}") parser.add_option( "", "--debug", dest="debugLevel", help="set debug level [default %default]", action="store", type=int, default=0, ) (opts, args) = parser.parse_args() if len(args) <= 1: parser.error("Invalid number of arguments.") program, files = args[0], args[1:] md = tokenBasedMultiDelta(program, files, log=opts.debugLevel) if __name__ == "__main__": try: main() except KeyboardInterrupt: print("Interrupted.", file=sys.stderr) os._exit(1) # Avoid freeing our giant cache.