# SPDX-License-Identifier: Apache-2.0 # Copyright 2016 The Meson development team # This class contains the basic functionality needed to run any interpreter # or an interpreter-based tool. from __future__ import annotations import os import sys import typing as T from collections import defaultdict from dataclasses import dataclass import itertools from pathlib import Path from .. import mparser, mesonlib, mlog from .. import environment from ..interpreterbase import ( MesonInterpreterObject, InterpreterBase, InvalidArguments, BreakRequest, ContinueRequest, Disabler, default_resolve_key, is_disabled, UnknownValue, UndefinedVariable, InterpreterObject, ) from ..interpreterbase.helpers import flatten from ..interpreter import ( StringHolder, BooleanHolder, IntegerHolder, ArrayHolder, DictHolder, ) from ..mparser import ( ArgumentNode, ArithmeticNode, ArrayNode, AssignmentNode, BaseNode, EmptyNode, IdNode, MethodNode, NotNode, PlusAssignmentNode, TernaryNode, SymbolNode, Token, FunctionNode, ) if T.TYPE_CHECKING: from .visitor import AstVisitor from ..interpreter import Interpreter from ..interpreterbase import SubProject, TYPE_var, TYPE_nvar from ..mparser import ( AndNode, ComparisonNode, ForeachClauseNode, IfClauseNode, IndexNode, OrNode, TestCaseClauseNode, UMinusNode, ) _T = T.TypeVar('_T') _V = T.TypeVar('_V') def _symbol(val: str) -> SymbolNode: return SymbolNode(Token('', '', 0, 0, 0, (0, 0), val)) # `IntrospectionFile` is to the `IntrospectionInterpreter` what `File` is to the normal `Interpreter`. @dataclass class IntrospectionFile: subdir: str rel: str def to_abs_path(self, root_dir: Path) -> Path: return (root_dir / self.subdir / self.rel).resolve() def __hash__(self) -> int: return hash((self.__class__.__name__, self.subdir, self.rel)) # `IntrospectionDependency` is to the `IntrospectionInterpreter` what `Dependency` is to the normal `Interpreter`. @dataclass class IntrospectionDependency(MesonInterpreterObject): name: T.Union[str, UnknownValue] required: T.Union[bool, UnknownValue] version: T.Union[T.List[str], UnknownValue] has_fallback: bool conditional: bool node: FunctionNode # `IntrospectionBuildTarget` is to the `IntrospectionInterpreter` what `BuildTarget` is to the normal `Interpreter`. @dataclass class IntrospectionBuildTarget(MesonInterpreterObject): name: str machine: str id: str typename: str defined_in: str subdir: str build_by_default: bool installed: bool outputs: T.List[str] source_nodes: T.List[BaseNode] extra_files: BaseNode kwargs: T.Dict[str, TYPE_var] node: FunctionNode def is_ignored_edge(src: T.Union[BaseNode, UnknownValue]) -> bool: return (isinstance(src, FunctionNode) and src.func_name.value not in {'files', 'get_variable'}) or isinstance(src, MethodNode) class DataflowDAG: src_to_tgts: T.DefaultDict[T.Union[BaseNode, UnknownValue], T.Set[T.Union[BaseNode, UnknownValue]]] tgt_to_srcs: T.DefaultDict[T.Union[BaseNode, UnknownValue], T.Set[T.Union[BaseNode, UnknownValue]]] def __init__(self) -> None: self.src_to_tgts = defaultdict(set) self.tgt_to_srcs = defaultdict(set) def add_edge(self, source: T.Union[BaseNode, UnknownValue], target: T.Union[BaseNode, UnknownValue]) -> None: self.src_to_tgts[source].add(target) self.tgt_to_srcs[target].add(source) # Returns all nodes in the DAG that are reachable from a node in `srcs`. # In other words, A node `a` is part of the returned set exactly if data # from `srcs` flows into `a`, directly or indirectly. # Certain edges are ignored. def reachable(self, srcs: T.Set[T.Union[BaseNode, UnknownValue]], reverse: bool) -> T.Set[T.Union[BaseNode, UnknownValue]]: reachable = srcs.copy() active = srcs.copy() while active: new: T.Set[T.Union[BaseNode, UnknownValue]] = set() if reverse: for tgt in active: new.update(src for src in self.tgt_to_srcs[tgt] if not is_ignored_edge(src)) else: for src in active: if is_ignored_edge(src): continue new.update(tgt for tgt in self.src_to_tgts[src]) reachable.update(new) active = new return reachable # Returns all paths from src to target. # Certain edges are ignored. def find_all_paths(self, src: T.Union[BaseNode, UnknownValue], target: T.Union[BaseNode, UnknownValue]) -> T.List[T.List[T.Union[BaseNode, UnknownValue]]]: queue = [(src, [src])] paths = [] while queue: cur, path = queue.pop() if cur == target: paths.append(path) if is_ignored_edge(cur): continue queue.extend((tgt, path + [tgt]) for tgt in self.src_to_tgts[cur]) return paths class AstInterpreter(InterpreterBase): def __init__(self, source_root: str, subdir: str, subproject: SubProject, subproject_dir: str, env: environment.Environment, visitors: T.Optional[T.List[AstVisitor]] = None): super().__init__(source_root, subdir, subproject, subproject_dir, env) self.visitors = visitors if visitors is not None else [] self.nesting: T.List[int] = [] self.cur_assignments: T.DefaultDict[str, T.List[T.Tuple[T.List[int], T.Union[BaseNode, UnknownValue]]]] = defaultdict(list) self.all_assignment_nodes: T.DefaultDict[str, T.List[AssignmentNode]] = defaultdict(list) # dataflow_dag is an acyclic directed graph that contains an edge # from one instance of `BaseNode` to another instance of `BaseNode` if # data flows directly from one to the other. Example: If meson.build # contains this: # var = 'foo' + '123' # executable(var, 'src.c') # var = 'bar' # dataflow_dag will contain an edge from the IdNode corresponding to # 'var' in line 2 to the ArithmeticNode corresponding to 'foo' + '123'. # This graph is crucial for e.g. node_to_runtime_value because we have # to know that 'var' in line2 is 'foo123' and not 'bar'. self.dataflow_dag = DataflowDAG() self.funcvals: T.Dict[BaseNode, T.Any] = {} self.tainted = False self.funcs.update({'project': self.func_do_nothing, 'test': self.func_do_nothing, 'benchmark': self.func_do_nothing, 'install_headers': self.func_do_nothing, 'install_man': self.func_do_nothing, 'install_data': self.func_do_nothing, 'install_subdir': self.func_do_nothing, 'install_symlink': self.func_do_nothing, 'install_emptydir': self.func_do_nothing, 'configuration_data': self.func_do_nothing, 'configure_file': self.func_do_nothing, 'find_program': self.func_do_nothing, 'include_directories': self.func_do_nothing, 'add_global_arguments': self.func_do_nothing, 'add_global_link_arguments': self.func_do_nothing, 'add_project_arguments': self.func_do_nothing, 'add_project_dependencies': self.func_do_nothing, 'add_project_link_arguments': self.func_do_nothing, 'message': self.func_do_nothing, 'generator': self.func_do_nothing, 'error': self.func_do_nothing, 'run_command': self.func_do_nothing, 'assert': self.func_do_nothing, 'subproject': self.func_do_nothing, 'dependency': self.func_do_nothing, 'get_option': self.func_do_nothing, 'join_paths': self.func_do_nothing, 'environment': self.func_do_nothing, 'import': self.func_do_nothing, 'vcs_tag': self.func_do_nothing, 'add_languages': self.func_do_nothing, 'declare_dependency': self.func_do_nothing, 'files': self.func_files, 'executable': self.func_do_nothing, 'static_library': self.func_do_nothing, 'shared_library': self.func_do_nothing, 'library': self.func_do_nothing, 'build_target': self.func_do_nothing, 'custom_target': self.func_do_nothing, 'run_target': self.func_do_nothing, 'subdir': self.func_subdir, 'set_variable': self.func_set_variable, 'get_variable': self.func_get_variable, 'unset_variable': self.func_unset_variable, 'is_disabler': self.func_do_nothing, 'is_variable': self.func_do_nothing, 'disabler': self.func_do_nothing, 'jar': self.func_do_nothing, 'warning': self.func_do_nothing, 'shared_module': self.func_do_nothing, 'option': self.func_do_nothing, 'both_libraries': self.func_do_nothing, 'add_test_setup': self.func_do_nothing, 'subdir_done': self.func_do_nothing, 'alias_target': self.func_do_nothing, 'summary': self.func_do_nothing, 'range': self.func_do_nothing, 'structured_sources': self.func_do_nothing, 'debug': self.func_do_nothing, }) def _unholder_args(self, args: T.Any, kwargs: T.Any) -> T.Tuple[T.Any, T.Any]: return args, kwargs def _holderify(self, res: T.Any) -> T.Any: return res def func_do_nothing(self, node: BaseNode, args: T.List[TYPE_var], kwargs: T.Dict[str, TYPE_var]) -> UnknownValue: return UnknownValue() def load_root_meson_file(self) -> None: super().load_root_meson_file() for i in self.visitors: self.ast.accept(i) def func_subdir(self, node: BaseNode, args: T.List[TYPE_var], kwargs: T.Dict[str, TYPE_var]) -> None: args = self.flatten_args(args) if len(args) != 1 or not isinstance(args[0], str): sys.stderr.write(f'Unable to evaluate subdir({args}) in AstInterpreter --> Skipping\n') return subdir, is_new = self._resolve_subdir(self.source_root, args[0]) if not is_new: sys.stderr.write('Trying to enter {} which has already been visited --> Skipping\n'.format(args[0])) return if not self._evaluate_subdir(self.source_root, subdir, self.visitors): buildfilename = os.path.join(subdir, environment.build_filename) sys.stderr.write(f'Unable to find build file {buildfilename} --> Skipping\n') def inner_method_call(self, obj: BaseNode, method_name: str, args: T.List[TYPE_var], kwargs: T.Dict[str, TYPE_var]) -> T.Any: for arg in itertools.chain(args, kwargs.values()): if isinstance(arg, UnknownValue): return UnknownValue() if isinstance(obj, str): result = StringHolder(obj, T.cast('Interpreter', self)).method_call(method_name, args, kwargs) elif isinstance(obj, bool): result = BooleanHolder(obj, T.cast('Interpreter', self)).method_call(method_name, args, kwargs) elif isinstance(obj, int): result = IntegerHolder(obj, T.cast('Interpreter', self)).method_call(method_name, args, kwargs) elif isinstance(obj, list): result = ArrayHolder(obj, T.cast('Interpreter', self)).method_call(method_name, args, kwargs) elif isinstance(obj, dict): result = DictHolder(obj, T.cast('Interpreter', self)).method_call(method_name, args, kwargs) else: return UnknownValue() return result def method_call(self, node: mparser.MethodNode) -> None: invocable = node.source_object self.evaluate_statement(invocable) obj = self.node_to_runtime_value(invocable) method_name = node.name.value (args, kwargs) = self.reduce_arguments(node.args) if is_disabled(args, kwargs): res = Disabler() else: res = self.inner_method_call(obj, method_name, args, kwargs) self.funcvals[node] = res def evaluate_fstring(self, node: mparser.StringNode) -> None: pass def evaluate_arraystatement(self, cur: mparser.ArrayNode) -> None: for arg in cur.args.arguments: self.evaluate_statement(arg) def evaluate_arithmeticstatement(self, cur: ArithmeticNode) -> None: self.evaluate_statement(cur.left) self.evaluate_statement(cur.right) def evaluate_uminusstatement(self, cur: UMinusNode) -> None: self.evaluate_statement(cur.value) def evaluate_ternary(self, node: TernaryNode) -> None: assert isinstance(node, TernaryNode) self.evaluate_statement(node.condition) self.evaluate_statement(node.trueblock) self.evaluate_statement(node.falseblock) def evaluate_dictstatement(self, node: mparser.DictNode) -> None: for k, v in node.args.kwargs.items(): self.evaluate_statement(k) self.evaluate_statement(v) def evaluate_indexing(self, node: IndexNode) -> None: self.evaluate_statement(node.iobject) self.evaluate_statement(node.index) def reduce_arguments( self, args: mparser.ArgumentNode, key_resolver: T.Callable[[mparser.BaseNode], str] = default_resolve_key, duplicate_key_error: T.Optional[str] = None, ) -> T.Tuple[T.List[T.Any], T.Any]: for arg in args.arguments: self.evaluate_statement(arg) for value in args.kwargs.values(): self.evaluate_statement(value) if isinstance(args, ArgumentNode): kwargs = {} for key, val in args.kwargs.items(): kwargs[key_resolver(key)] = val if args.incorrect_order(): raise InvalidArguments('All keyword arguments must be after positional arguments.') return self.flatten_args(args.arguments), kwargs else: return self.flatten_args(args), {} def evaluate_comparison(self, node: ComparisonNode) -> None: self.evaluate_statement(node.left) self.evaluate_statement(node.right) def evaluate_andstatement(self, cur: AndNode) -> None: self.evaluate_statement(cur.left) self.evaluate_statement(cur.right) def evaluate_orstatement(self, cur: OrNode) -> None: self.evaluate_statement(cur.left) self.evaluate_statement(cur.right) def evaluate_notstatement(self, cur: NotNode) -> None: self.evaluate_statement(cur.value) def find_potential_writes(self, node: BaseNode) -> T.Set[str]: if isinstance(node, mparser.ForeachClauseNode): return {el.value for el in node.varnames} | self.find_potential_writes(node.block) elif isinstance(node, mparser.CodeBlockNode): ret = set() for line in node.lines: ret.update(self.find_potential_writes(line)) return ret elif isinstance(node, (AssignmentNode, PlusAssignmentNode)): return set([node.var_name.value]) | self.find_potential_writes(node.value) elif isinstance(node, IdNode): return set() elif isinstance(node, ArrayNode): ret = set() for arg in node.args.arguments: ret.update(self.find_potential_writes(arg)) return ret elif isinstance(node, mparser.DictNode): ret = set() for k, v in node.args.kwargs.items(): ret.update(self.find_potential_writes(k)) ret.update(self.find_potential_writes(v)) return ret elif isinstance(node, FunctionNode): ret = set() for arg in node.args.arguments: ret.update(self.find_potential_writes(arg)) for arg in node.args.kwargs.values(): ret.update(self.find_potential_writes(arg)) return ret elif isinstance(node, MethodNode): ret = self.find_potential_writes(node.source_object) for arg in node.args.arguments: ret.update(self.find_potential_writes(arg)) for arg in node.args.kwargs.values(): ret.update(self.find_potential_writes(arg)) return ret elif isinstance(node, ArithmeticNode): return self.find_potential_writes(node.left) | self.find_potential_writes(node.right) elif isinstance(node, (mparser.NumberNode, mparser.StringNode, mparser.BreakNode, mparser.BooleanNode, mparser.ContinueNode)): return set() elif isinstance(node, mparser.IfClauseNode): if isinstance(node.elseblock, EmptyNode): ret = set() else: ret = self.find_potential_writes(node.elseblock.block) for i in node.ifs: ret.update(self.find_potential_writes(i)) return ret elif isinstance(node, mparser.IndexNode): return self.find_potential_writes(node.iobject) | self.find_potential_writes(node.index) elif isinstance(node, mparser.IfNode): return self.find_potential_writes(node.condition) | self.find_potential_writes(node.block) elif isinstance(node, (mparser.ComparisonNode, mparser.OrNode, mparser.AndNode)): return self.find_potential_writes(node.left) | self.find_potential_writes(node.right) elif isinstance(node, mparser.NotNode): return self.find_potential_writes(node.value) elif isinstance(node, mparser.TernaryNode): return self.find_potential_writes(node.condition) | self.find_potential_writes(node.trueblock) | self.find_potential_writes(node.falseblock) elif isinstance(node, mparser.UMinusNode): return self.find_potential_writes(node.value) elif isinstance(node, mparser.ParenthesizedNode): return self.find_potential_writes(node.inner) raise mesonlib.MesonBugException('Unhandled node type') def evaluate_foreach(self, node: ForeachClauseNode) -> None: asses = self.find_potential_writes(node) for ass in asses: self.cur_assignments[ass].append((self.nesting.copy(), UnknownValue())) try: self.evaluate_codeblock(node.block) except ContinueRequest: pass except BreakRequest: pass for ass in asses: self.cur_assignments[ass].append((self.nesting.copy(), UnknownValue())) # In case the foreach loops 0 times. def evaluate_if(self, node: IfClauseNode) -> None: self.nesting.append(0) for i in node.ifs: self.evaluate_codeblock(i.block) self.nesting[-1] += 1 if not isinstance(node.elseblock, EmptyNode): self.evaluate_codeblock(node.elseblock.block) self.nesting.pop() for var_name in self.cur_assignments: potential_values = [] oldval = self.get_cur_value_if_defined(var_name) if not isinstance(oldval, UndefinedVariable): potential_values.append(oldval) for nesting, value in self.cur_assignments[var_name]: if len(nesting) > len(self.nesting): potential_values.append(value) self.cur_assignments[var_name] = [(nesting, v) for (nesting, v) in self.cur_assignments[var_name] if len(nesting) <= len(self.nesting)] if len(potential_values) > 1 or (len(potential_values) > 0 and isinstance(oldval, UndefinedVariable)): uv = UnknownValue() for pv in potential_values: self.dataflow_dag.add_edge(pv, uv) self.cur_assignments[var_name].append((self.nesting.copy(), uv)) def func_files(self, node: BaseNode, args: T.List[TYPE_var], kwargs: T.Dict[str, TYPE_var]) -> T.Any: ret: T.List[T.Union[IntrospectionFile, UnknownValue]] = [] for arg in args: if isinstance(arg, str): ret.append(IntrospectionFile(self.subdir, arg)) elif isinstance(arg, UnknownValue): ret.append(UnknownValue()) else: raise TypeError return ret def get_cur_value_if_defined(self, var_name: str) -> T.Union[BaseNode, UnknownValue, UndefinedVariable]: if var_name in {'meson', 'host_machine', 'build_machine', 'target_machine'}: return UnknownValue() ret: T.Union[BaseNode, UnknownValue, UndefinedVariable] = UndefinedVariable() for nesting, value in reversed(self.cur_assignments[var_name]): if len(self.nesting) >= len(nesting) and self.nesting[:len(nesting)] == nesting: ret = value break if isinstance(ret, UndefinedVariable) and self.tainted: return UnknownValue() return ret def get_cur_value(self, var_name: str) -> T.Union[BaseNode, UnknownValue]: ret = self.get_cur_value_if_defined(var_name) if isinstance(ret, UndefinedVariable): path = mlog.get_relative_path(Path(self.current_node.filename), Path(os.getcwd())) mlog.warning(f"{path}:{self.current_node.lineno}:{self.current_node.colno} will always crash if executed, since a variable named `{var_name}` is not defined") # We could add more advanced analysis of code referencing undefined # variables, but it is probably not worth the effort and the # complexity. So we do the simplest thing, returning an # UnknownValue. return UnknownValue() return ret # The function `node_to_runtime_value` takes a node of the ast as an # argument and tries to return the same thing that would be passed to e.g. # `func_message` if you put `message(node)` in your `meson.build` file and # run `meson setup`. If this is not possible, `UnknownValue()` is returned. # There are 3 Reasons why this is sometimes impossible: # 1. Because the meson rewriter is imperfect and has not implemented everything yet # 2. Because the value is different on different machines, example: # ```meson # node = somedep.found() # message(node) # ``` # will print `true` on some machines and `false` on others, so # `node_to_runtime_value` does not know whether to return `true` or # `false` and will return `UnknownValue()`. # 3. Here: # ```meson # foreach x : [1, 2] # node = x # message(node) # endforeach # ``` # `node_to_runtime_value` does not know whether to return `1` or `2` and # will return `UnknownValue()`. # # If you have something like # ``` # node = [123, somedep.found()] # ``` # `node_to_runtime_value` will return `[123, UnknownValue()]`. def node_to_runtime_value(self, node: T.Union[UnknownValue, BaseNode, TYPE_var]) -> T.Any: if isinstance(node, (mparser.StringNode, mparser.BooleanNode, mparser.NumberNode)): return node.value elif isinstance(node, mparser.StringNode): if node.is_fstring: return UnknownValue() else: return node.value elif isinstance(node, list): return [self.node_to_runtime_value(x) for x in node] elif isinstance(node, ArrayNode): return [self.node_to_runtime_value(x) for x in node.args.arguments] elif isinstance(node, mparser.DictNode): return {self.node_to_runtime_value(k): self.node_to_runtime_value(v) for k, v in node.args.kwargs.items()} elif isinstance(node, IdNode): assert len(self.dataflow_dag.tgt_to_srcs[node]) == 1 val = next(iter(self.dataflow_dag.tgt_to_srcs[node])) return self.node_to_runtime_value(val) elif isinstance(node, (MethodNode, FunctionNode)): funcval = self.funcvals[node] if isinstance(funcval, (dict, str)): return funcval else: return self.node_to_runtime_value(funcval) elif isinstance(node, ArithmeticNode): left = self.node_to_runtime_value(node.left) right = self.node_to_runtime_value(node.right) if isinstance(left, list) and isinstance(right, UnknownValue): return left + [right] if isinstance(right, list) and isinstance(left, UnknownValue): return [left] + right if isinstance(left, UnknownValue) or isinstance(right, UnknownValue): return UnknownValue() if node.operation == 'add': if isinstance(left, dict) and isinstance(right, dict): ret = left.copy() for k, v in right.items(): ret[k] = v return ret if isinstance(left, list): if not isinstance(right, list): right = [right] return left + right return left + right elif node.operation == 'sub': return left - right elif node.operation == 'mul': return left * right elif node.operation == 'div': if isinstance(left, int) and isinstance(right, int): return left // right elif isinstance(left, str) and isinstance(right, str): return os.path.join(left, right).replace('\\', '/') elif node.operation == 'mod': if isinstance(left, int) and isinstance(right, int): return left % right elif isinstance(node, (UnknownValue, IntrospectionBuildTarget, IntrospectionFile, IntrospectionDependency, str, bool, int)): return node elif isinstance(node, mparser.IndexNode): iobject = self.node_to_runtime_value(node.iobject) index = self.node_to_runtime_value(node.index) if isinstance(iobject, UnknownValue) or isinstance(index, UnknownValue): return UnknownValue() return iobject[index] elif isinstance(node, mparser.ComparisonNode): left = self.node_to_runtime_value(node.left) right = self.node_to_runtime_value(node.right) if isinstance(left, UnknownValue) or isinstance(right, UnknownValue): return UnknownValue() if node.ctype == '==': return left == right elif node.ctype == '!=': return left != right elif node.ctype == 'in': return left in right elif node.ctype == 'notin': return left not in right elif isinstance(node, mparser.TernaryNode): cond = self.node_to_runtime_value(node.condition) if isinstance(cond, UnknownValue): return UnknownValue() if cond is True: return self.node_to_runtime_value(node.trueblock) if cond is False: return self.node_to_runtime_value(node.falseblock) elif isinstance(node, mparser.OrNode): left = self.node_to_runtime_value(node.left) right = self.node_to_runtime_value(node.right) if isinstance(left, UnknownValue) or isinstance(right, UnknownValue): return UnknownValue() return left or right elif isinstance(node, mparser.AndNode): left = self.node_to_runtime_value(node.left) right = self.node_to_runtime_value(node.right) if isinstance(left, UnknownValue) or isinstance(right, UnknownValue): return UnknownValue() return left and right elif isinstance(node, mparser.UMinusNode): val = self.node_to_runtime_value(node.value) if isinstance(val, UnknownValue): return val if isinstance(val, (int, float)): return -val elif isinstance(node, mparser.NotNode): val = self.node_to_runtime_value(node.value) if isinstance(val, UnknownValue): return val if isinstance(val, bool): return not val elif isinstance(node, mparser.ParenthesizedNode): return self.node_to_runtime_value(node.inner) raise mesonlib.MesonBugException('Unhandled node type') def assignment(self, node: AssignmentNode) -> None: assert isinstance(node, AssignmentNode) self.evaluate_statement(node.value) self.cur_assignments[node.var_name.value].append((self.nesting.copy(), node.value)) self.all_assignment_nodes[node.var_name.value].append(node) def evaluate_plusassign(self, node: PlusAssignmentNode) -> None: assert isinstance(node, PlusAssignmentNode) self.evaluate_statement(node.value) lhs = self.get_cur_value(node.var_name.value) newval: T.Union[UnknownValue, ArithmeticNode] if isinstance(lhs, UnknownValue): newval = UnknownValue() else: newval = mparser.ArithmeticNode(operation='add', left=lhs, operator=_symbol('+'), right=node.value) self.cur_assignments[node.var_name.value].append((self.nesting.copy(), newval)) self.all_assignment_nodes[node.var_name.value].append(node) self.dataflow_dag.add_edge(lhs, newval) self.dataflow_dag.add_edge(node.value, newval) def func_set_variable(self, node: BaseNode, args: T.List[TYPE_var], kwargs: T.Dict[str, TYPE_var]) -> None: assert isinstance(node, FunctionNode) if bool(node.args.kwargs): raise InvalidArguments('set_variable accepts no keyword arguments') if len(node.args.arguments) != 2: raise InvalidArguments('set_variable requires exactly two positional arguments') var_name = args[0] value = node.args.arguments[1] if isinstance(var_name, UnknownValue): self.evaluate_statement(value) self.tainted = True return assert isinstance(var_name, str) equiv = AssignmentNode(var_name=IdNode(Token('', '', 0, 0, 0, (0, 0), var_name)), value=value, operator=_symbol('=')) equiv.ast_id = str(id(equiv)) self.evaluate_statement(equiv) def func_get_variable(self, node: BaseNode, args: T.List[TYPE_var], kwargs: T.Dict[str, TYPE_var]) -> T.Any: assert isinstance(node, FunctionNode) var_name = args[0] assert isinstance(var_name, str) val = self.get_cur_value(var_name) self.dataflow_dag.add_edge(val, node) return val def func_unset_variable(self, node: BaseNode, args: T.List[TYPE_var], kwargs: T.Dict[str, TYPE_var]) -> None: assert isinstance(node, FunctionNode) if bool(node.args.kwargs): raise InvalidArguments('unset_variable accepts no keyword arguments') if len(node.args.arguments) != 1: raise InvalidArguments('unset_variable requires exactly one positional arguments') var_name = args[0] assert isinstance(var_name, str) self.cur_assignments[var_name].append((self.nesting.copy(), node)) def nodes_to_pretty_filelist(self, root_path: Path, subdir: str, nodes: T.List[BaseNode]) -> T.List[T.Union[str, UnknownValue]]: def src_to_abs(src: T.Union[str, IntrospectionFile, UnknownValue]) -> T.Union[str, UnknownValue]: if isinstance(src, str): return os.path.normpath(os.path.join(root_path, subdir, src)) elif isinstance(src, IntrospectionFile): return str(src.to_abs_path(root_path)) elif isinstance(src, UnknownValue): return src else: raise TypeError rtvals: T.List[T.Any] = flatten([self.node_to_runtime_value(sn) for sn in nodes]) return [src_to_abs(x) for x in rtvals] def flatten_args(self, args_raw: T.Union[TYPE_nvar, T.Sequence[TYPE_nvar]], include_unknown_args: bool = False) -> T.List[TYPE_var]: # Make sure we are always dealing with lists if isinstance(args_raw, list): args = args_raw else: args = [args_raw] flattened_args: T.List[TYPE_var] = [] # Resolve the contents of args for i in args: if isinstance(i, BaseNode): resolved = self.node_to_runtime_value(i) if resolved is not None: if not isinstance(resolved, list): resolved = [resolved] flattened_args += resolved elif isinstance(i, (str, bool, int, float, UnknownValue, IntrospectionFile)) or include_unknown_args: flattened_args += [i] else: raise NotImplementedError return flattened_args def evaluate_testcase(self, node: TestCaseClauseNode) -> Disabler | None: return Disabler(subproject=self.subproject) def evaluate_statement(self, cur: mparser.BaseNode) -> T.Optional[InterpreterObject]: if hasattr(cur, 'args'): for arg in cur.args.arguments: self.dataflow_dag.add_edge(arg, cur) for k, v in cur.args.kwargs.items(): self.dataflow_dag.add_edge(v, cur) for attr in ['source_object', 'left', 'right', 'items', 'iobject', 'index', 'condition']: if hasattr(cur, attr): assert isinstance(getattr(cur, attr), mparser.BaseNode) self.dataflow_dag.add_edge(getattr(cur, attr), cur) if isinstance(cur, mparser.IdNode): self.dataflow_dag.add_edge(self.get_cur_value(cur.value), cur) return None else: return super().evaluate_statement(cur) def function_call(self, node: mparser.FunctionNode) -> T.Any: ret = super().function_call(node) if ret is not None: self.funcvals[node] = ret return ret