diff options
Diffstat (limited to 'mesonbuild/rewriter.py')
-rw-r--r-- | mesonbuild/rewriter.py | 794 |
1 files changed, 450 insertions, 344 deletions
diff --git a/mesonbuild/rewriter.py b/mesonbuild/rewriter.py index 919bd38..4c2fb11 100644 --- a/mesonbuild/rewriter.py +++ b/mesonbuild/rewriter.py @@ -10,23 +10,29 @@ from __future__ import annotations from .ast import IntrospectionInterpreter, BUILD_TARGET_FUNCTIONS, AstConditionLevel, AstIDGenerator, AstIndentationGenerator, AstPrinter -from mesonbuild.mesonlib import MesonException, setup_vsenv +from .ast.interpreter import IntrospectionBuildTarget, IntrospectionDependency, _symbol +from .interpreterbase import UnknownValue, TV_func +from .interpreterbase.helpers import flatten +from mesonbuild.mesonlib import MesonException, setup_vsenv, relpath from . import mlog, environment from functools import wraps -from .mparser import Token, ArrayNode, ArgumentNode, AssignmentNode, StringNode, BooleanNode, ElementaryNode, IdNode, FunctionNode, SymbolNode -import json, os, re, sys +from .mparser import Token, ArrayNode, ArgumentNode, ArithmeticNode, AssignmentNode, BaseNode, StringNode, BooleanNode, ElementaryNode, IdNode, FunctionNode, PlusAssignmentNode +from .mintro import IntrospectionEncoder +import json, os, re, sys, codecs import typing as T +from pathlib import Path if T.TYPE_CHECKING: - from argparse import ArgumentParser, HelpFormatter - from .mparser import BaseNode + import argparse + from argparse import ArgumentParser, _FormatterClass + from .mlog import AnsiDecorator class RewriterException(MesonException): pass # Note: when adding arguments, please also add them to the completion # scripts in $MESONSRC/data/shell-completions/ -def add_arguments(parser: ArgumentParser, formatter: T.Callable[[str], HelpFormatter]) -> None: +def add_arguments(parser: ArgumentParser, formatter: _FormatterClass) -> None: parser.add_argument('-s', '--sourcedir', type=str, default='.', metavar='SRCDIR', help='Path to source directory.') parser.add_argument('-V', '--verbose', action='store_true', default=False, help='Enable verbose output') parser.add_argument('-S', '--skip-errors', dest='skip', action='store_true', default=False, help='Skip errors instead of aborting') @@ -62,12 +68,14 @@ def add_arguments(parser: ArgumentParser, formatter: T.Callable[[str], HelpForma cmd_parser.add_argument('json', help='JSON string or file to execute') class RequiredKeys: - def __init__(self, keys): + keys: T.Dict[str, T.Any] + + def __init__(self, keys: T.Dict[str, T.Any]): self.keys = keys - def __call__(self, f): + def __call__(self, f: TV_func) -> TV_func: @wraps(f) - def wrapped(*wrapped_args, **wrapped_kwargs): + def wrapped(*wrapped_args: T.Any, **wrapped_kwargs: T.Any) -> T.Any: assert len(wrapped_args) >= 2 cmd = wrapped_args[1] for key, val in self.keys.items(): @@ -90,12 +98,11 @@ class RequiredKeys: .format(key, choices, cmd[key])) return f(*wrapped_args, **wrapped_kwargs) - return wrapped - -def _symbol(val: str) -> SymbolNode: - return SymbolNode(Token('', '', 0, 0, 0, (0, 0), val)) + return T.cast('TV_func', wrapped) class MTypeBase: + node: BaseNode + def __init__(self, node: T.Optional[BaseNode] = None): if node is None: self.node = self.new_node() @@ -107,30 +114,30 @@ class MTypeBase: self.node_type = i @classmethod - def new_node(cls, value=None): + def new_node(cls, value: T.Any = None) -> BaseNode: # Overwrite in derived class raise RewriterException('Internal error: new_node of MTypeBase was called') @classmethod - def supported_nodes(cls): + def supported_nodes(cls) -> T.List[type]: # Overwrite in derived class return [] - def can_modify(self): + def can_modify(self) -> bool: return self.node_type is not None - def get_node(self): + def get_node(self) -> BaseNode: return self.node - def add_value(self, value): + def add_value(self, value: T.Any) -> None: # Overwrite in derived class mlog.warning('Cannot add a value of type', mlog.bold(type(self).__name__), '--> skipping') - def remove_value(self, value): + def remove_value(self, value: T.Any) -> None: # Overwrite in derived class mlog.warning('Cannot remove a value of type', mlog.bold(type(self).__name__), '--> skipping') - def remove_regex(self, value): + def remove_regex(self, value: T.Any) -> None: # Overwrite in derived class mlog.warning('Cannot remove a regex in type', mlog.bold(type(self).__name__), '--> skipping') @@ -139,13 +146,13 @@ class MTypeStr(MTypeBase): super().__init__(node) @classmethod - def new_node(cls, value=None): + def new_node(cls, value: T.Optional[str] = None) -> BaseNode: if value is None: value = '' return StringNode(Token('string', '', 0, 0, 0, None, str(value))) @classmethod - def supported_nodes(cls): + def supported_nodes(cls) -> T.List[type]: return [StringNode] class MTypeBool(MTypeBase): @@ -153,11 +160,11 @@ class MTypeBool(MTypeBase): super().__init__(node) @classmethod - def new_node(cls, value=None): + def new_node(cls, value: T.Optional[str] = None) -> BaseNode: return BooleanNode(Token('', '', 0, 0, 0, None, bool(value))) @classmethod - def supported_nodes(cls): + def supported_nodes(cls) -> T.List[type]: return [BooleanNode] class MTypeID(MTypeBase): @@ -165,21 +172,23 @@ class MTypeID(MTypeBase): super().__init__(node) @classmethod - def new_node(cls, value=None): + def new_node(cls, value: T.Optional[str] = None) -> BaseNode: if value is None: value = '' return IdNode(Token('', '', 0, 0, 0, None, str(value))) @classmethod - def supported_nodes(cls): + def supported_nodes(cls) -> T.List[type]: return [IdNode] class MTypeList(MTypeBase): + node: ArrayNode + def __init__(self, node: T.Optional[BaseNode] = None): super().__init__(node) @classmethod - def new_node(cls, value=None): + def new_node(cls, value: T.Optional[T.List[T.Any]] = None) -> ArrayNode: if value is None: value = [] elif not isinstance(value, list): @@ -189,50 +198,52 @@ class MTypeList(MTypeBase): return ArrayNode(_symbol('['), args, _symbol(']')) @classmethod - def _new_element_node(cls, value): + def _new_element_node(cls, value: T.Any) -> BaseNode: # Overwrite in derived class raise RewriterException('Internal error: _new_element_node of MTypeList was called') - def _ensure_array_node(self): + def _ensure_array_node(self) -> None: if not isinstance(self.node, ArrayNode): tmp = self.node self.node = self.new_node() self.node.args.arguments = [tmp] @staticmethod - def _check_is_equal(node, value) -> bool: + def _check_is_equal(node: BaseNode, value: str) -> bool: # Overwrite in derived class return False @staticmethod - def _check_regex_matches(node, regex: str) -> bool: + def _check_regex_matches(node: BaseNode, regex: str) -> bool: # Overwrite in derived class return False - def get_node(self): + def get_node(self) -> BaseNode: if isinstance(self.node, ArrayNode): if len(self.node.args.arguments) == 1: return self.node.args.arguments[0] return self.node @classmethod - def supported_element_nodes(cls): + def supported_element_nodes(cls) -> T.List[T.Type]: # Overwrite in derived class return [] @classmethod - def supported_nodes(cls): + def supported_nodes(cls) -> T.List[T.Type]: return [ArrayNode] + cls.supported_element_nodes() - def add_value(self, value): + def add_value(self, value: T.Any) -> None: if not isinstance(value, list): value = [value] self._ensure_array_node() for i in value: + assert hasattr(self.node, 'args') # For mypy + assert isinstance(self.node.args, ArgumentNode) # For mypy self.node.args.arguments += [self._new_element_node(i)] - def _remove_helper(self, value, equal_func): - def check_remove_node(node): + def _remove_helper(self, value: T.Any, equal_func: T.Callable[[T.Any, T.Any], bool]) -> None: + def check_remove_node(node: BaseNode) -> bool: for j in value: if equal_func(i, j): return True @@ -241,16 +252,18 @@ class MTypeList(MTypeBase): if not isinstance(value, list): value = [value] self._ensure_array_node() + assert hasattr(self.node, 'args') # For mypy + assert isinstance(self.node.args, ArgumentNode) # For mypy removed_list = [] for i in self.node.args.arguments: if not check_remove_node(i): removed_list += [i] self.node.args.arguments = removed_list - def remove_value(self, value): + def remove_value(self, value: T.Any) -> None: self._remove_helper(value, self._check_is_equal) - def remove_regex(self, regex: str): + def remove_regex(self, regex: str) -> None: self._remove_helper(regex, self._check_regex_matches) class MTypeStrList(MTypeList): @@ -258,23 +271,23 @@ class MTypeStrList(MTypeList): super().__init__(node) @classmethod - def _new_element_node(cls, value): + def _new_element_node(cls, value: str) -> StringNode: return StringNode(Token('string', '', 0, 0, 0, None, str(value))) @staticmethod - def _check_is_equal(node, value) -> bool: + def _check_is_equal(node: BaseNode, value: str) -> bool: if isinstance(node, StringNode): - return node.value == value + return bool(node.value == value) return False @staticmethod - def _check_regex_matches(node, regex: str) -> bool: + def _check_regex_matches(node: BaseNode, regex: str) -> bool: if isinstance(node, StringNode): return re.match(regex, node.value) is not None return False @classmethod - def supported_element_nodes(cls): + def supported_element_nodes(cls) -> T.List[T.Type]: return [StringNode] class MTypeIDList(MTypeList): @@ -282,26 +295,26 @@ class MTypeIDList(MTypeList): super().__init__(node) @classmethod - def _new_element_node(cls, value): + def _new_element_node(cls, value: str) -> IdNode: return IdNode(Token('', '', 0, 0, 0, None, str(value))) @staticmethod - def _check_is_equal(node, value) -> bool: + def _check_is_equal(node: BaseNode, value: str) -> bool: if isinstance(node, IdNode): - return node.value == value + return bool(node.value == value) return False @staticmethod - def _check_regex_matches(node, regex: str) -> bool: + def _check_regex_matches(node: BaseNode, regex: str) -> bool: if isinstance(node, StringNode): return re.match(regex, node.value) is not None return False @classmethod - def supported_element_nodes(cls): + def supported_element_nodes(cls) -> T.List[T.Type]: return [IdNode] -rewriter_keys = { +rewriter_keys: T.Dict[str, T.Dict[str, T.Any]] = { 'default_options': { 'operation': (str, None, ['set', 'delete']), 'options': (dict, {}, None) @@ -355,13 +368,15 @@ rewriter_func_kwargs = { } class Rewriter: + info_dump: T.Optional[T.Dict[str, T.Dict[str, T.Any]]] + def __init__(self, sourcedir: str, generator: str = 'ninja', skip_errors: bool = False): self.sourcedir = sourcedir self.interpreter = IntrospectionInterpreter(sourcedir, '', generator, visitors = [AstIDGenerator(), AstIndentationGenerator(), AstConditionLevel()]) self.skip_errors = skip_errors - self.modified_nodes = [] - self.to_remove_nodes = [] - self.to_add_nodes = [] + self.modified_nodes: T.List[BaseNode] = [] + self.to_remove_nodes: T.List[BaseNode] = [] + self.to_add_nodes: T.List[BaseNode] = [] self.functions = { 'default_options': self.process_default_options, 'kwargs': self.process_kwargs, @@ -369,89 +384,99 @@ class Rewriter: } self.info_dump = None - def analyze_meson(self): + def analyze_meson(self) -> None: mlog.log('Analyzing meson file:', mlog.bold(os.path.join(self.sourcedir, environment.build_filename))) self.interpreter.analyze() mlog.log(' -- Project:', mlog.bold(self.interpreter.project_data['descriptive_name'])) mlog.log(' -- Version:', mlog.cyan(self.interpreter.project_data['version'])) - def add_info(self, cmd_type: str, cmd_id: str, data: dict): + def add_info(self, cmd_type: str, cmd_id: str, data: dict) -> None: if self.info_dump is None: self.info_dump = {} if cmd_type not in self.info_dump: self.info_dump[cmd_type] = {} self.info_dump[cmd_type][cmd_id] = data - def print_info(self): + def print_info(self) -> None: if self.info_dump is None: return - sys.stdout.write(json.dumps(self.info_dump, indent=2)) + sys.stdout.write(json.dumps(self.info_dump, indent=2, cls=IntrospectionEncoder)) - def on_error(self): + def on_error(self) -> T.Tuple[AnsiDecorator, AnsiDecorator]: if self.skip_errors: return mlog.cyan('-->'), mlog.yellow('skipping') return mlog.cyan('-->'), mlog.red('aborting') - def handle_error(self): + def handle_error(self) -> None: if self.skip_errors: return None raise MesonException('Rewriting the meson.build failed') - def find_target(self, target: str): - def check_list(name: str) -> T.List[BaseNode]: - result = [] - for i in self.interpreter.targets: - if name in {i['name'], i['id']}: - result += [i] - return result - - targets = check_list(target) - if targets: - if len(targets) == 1: - return targets[0] - else: - mlog.error('There are multiple targets matching', mlog.bold(target)) - for i in targets: - mlog.error(' -- Target name', mlog.bold(i['name']), 'with ID', mlog.bold(i['id'])) - mlog.error('Please try again with the unique ID of the target', *self.on_error()) - self.handle_error() - return None - - # Check the assignments - tgt = None - if target in self.interpreter.assignments: - node = self.interpreter.assignments[target] - if isinstance(node, FunctionNode): - if node.func_name.value in {'executable', 'jar', 'library', 'shared_library', 'shared_module', 'static_library', 'both_libraries'}: - tgt = self.interpreter.assign_vals[target] - - return tgt - - def find_dependency(self, dependency: str): - def check_list(name: str): - for i in self.interpreter.dependencies: - if name == i['name']: - return i + def all_assignments(self, varname: str) -> T.List[BaseNode]: + assigned_values = [] + for ass in self.interpreter.all_assignment_nodes[varname]: + if isinstance(ass, PlusAssignmentNode): + continue + assert isinstance(ass, AssignmentNode) + assigned_values.append(ass.value) + return assigned_values + + def find_target(self, target: str) -> T.Optional[IntrospectionBuildTarget]: + for i in self.interpreter.targets: + if target == i.id: + return i + + potential_tgts = [] + for i in self.interpreter.targets: + if target == i.name: + potential_tgts.append(i) + + if not potential_tgts: + potenial_tgts_1 = self.all_assignments(target) + potenial_tgts_1 = [self.interpreter.node_to_runtime_value(el) for el in potenial_tgts_1] + potential_tgts = [el for el in potenial_tgts_1 if isinstance(el, IntrospectionBuildTarget)] + + if not potential_tgts: return None + elif len(potential_tgts) == 1: + return potential_tgts[0] + else: + mlog.error('There are multiple targets matching', mlog.bold(target)) + for i in potential_tgts: + mlog.error(' -- Target name', mlog.bold(i.name), 'with ID', mlog.bold(i.id)) + mlog.error('Please try again with the unique ID of the target', *self.on_error()) + self.handle_error() + return None + + def find_dependency(self, dependency: str) -> T.Optional[IntrospectionDependency]: + potential_deps = [] + for i in self.interpreter.dependencies: + if i.name == dependency: + potential_deps.append(i) - dep = check_list(dependency) - if dep is not None: - return dep + checking_varnames = len(potential_deps) == 0 - # Check the assignments - if dependency in self.interpreter.assignments: - node = self.interpreter.assignments[dependency] - if isinstance(node, FunctionNode): - if node.func_name.value == 'dependency': - name = self.interpreter.flatten_args(node.args)[0] - dep = check_list(name) + if checking_varnames: + potential_deps1 = self.all_assignments(dependency) + potential_deps = [self.interpreter.node_to_runtime_value(el) for el in potential_deps1 if isinstance(el, FunctionNode) and el.func_name.value == 'dependency'] - return dep + if not potential_deps: + return None + elif len(potential_deps) == 1: + return potential_deps[0] + else: + mlog.error('There are multiple dependencies matching', mlog.bold(dependency)) + for i in potential_deps: + mlog.error(' -- Dependency name', i) + if checking_varnames: + mlog.error('Please try again with the name of the dependency', *self.on_error()) + self.handle_error() + return None @RequiredKeys(rewriter_keys['default_options']) - def process_default_options(self, cmd): + def process_default_options(self, cmd: T.Dict[str, T.Any]) -> None: # First, remove the old values - kwargs_cmd = { + kwargs_cmd: T.Dict[str, T.Any] = { 'function': 'project', 'id': "/", 'operation': 'remove_regex', @@ -495,7 +520,7 @@ class Rewriter: self.process_kwargs(kwargs_cmd) @RequiredKeys(rewriter_keys['kwargs']) - def process_kwargs(self, cmd): + def process_kwargs(self, cmd: T.Dict[str, T.Any]) -> None: mlog.log('Processing function type', mlog.bold(cmd['function']), 'with id', mlog.cyan("'" + cmd['id'] + "'")) if cmd['function'] not in rewriter_func_kwargs: mlog.error('Unknown function type', cmd['function'], *self.on_error()) @@ -516,26 +541,26 @@ class Rewriter: node = self.interpreter.project_node arg_node = node.args elif cmd['function'] == 'target': - tmp = self.find_target(cmd['id']) - if tmp: - node = tmp['node'] + tmp_tgt = self.find_target(cmd['id']) + if tmp_tgt: + node = tmp_tgt.node arg_node = node.args elif cmd['function'] == 'dependency': - tmp = self.find_dependency(cmd['id']) - if tmp: - node = tmp['node'] + tmp_dep = self.find_dependency(cmd['id']) + if tmp_dep: + node = tmp_dep.node arg_node = node.args if not node: mlog.error('Unable to find the function node') assert isinstance(node, FunctionNode) assert isinstance(arg_node, ArgumentNode) # Transform the key nodes to plain strings - arg_node.kwargs = {k.value: v for k, v in arg_node.kwargs.items()} + kwargs = {T.cast(IdNode, k).value: v for k, v in arg_node.kwargs.items()} # Print kwargs info if cmd['operation'] == 'info': - info_data = {} - for key, val in sorted(arg_node.kwargs.items()): + info_data: T.Dict[str, T.Any] = {} + for key, val in sorted(kwargs.items()): info_data[key] = None if isinstance(val, ElementaryNode): info_data[key] = val.value @@ -561,21 +586,21 @@ class Rewriter: if cmd['operation'] == 'delete': # Remove the key from the kwargs - if key not in arg_node.kwargs: + if key not in kwargs: mlog.log(' -- Key', mlog.bold(key), 'is already deleted') continue mlog.log(' -- Deleting', mlog.bold(key), 'from the kwargs') - del arg_node.kwargs[key] + del kwargs[key] elif cmd['operation'] == 'set': # Replace the key from the kwargs mlog.log(' -- Setting', mlog.bold(key), 'to', mlog.yellow(str(val))) - arg_node.kwargs[key] = kwargs_def[key].new_node(val) + kwargs[key] = kwargs_def[key].new_node(val) else: # Modify the value from the kwargs - if key not in arg_node.kwargs: - arg_node.kwargs[key] = None - modifier = kwargs_def[key](arg_node.kwargs[key]) + if key not in kwargs: + kwargs[key] = None + modifier = kwargs_def[key](kwargs[key]) if not modifier.can_modify(): mlog.log(' -- Skipping', mlog.bold(key), 'because it is too complex to modify') continue @@ -593,24 +618,251 @@ class Rewriter: modifier.remove_regex(val) # Write back the result - arg_node.kwargs[key] = modifier.get_node() + kwargs[key] = modifier.get_node() num_changed += 1 # Convert the keys back to IdNode's - arg_node.kwargs = {IdNode(Token('', '', 0, 0, 0, None, k)): v for k, v in arg_node.kwargs.items()} + arg_node.kwargs = {IdNode(Token('', '', 0, 0, 0, None, k)): v for k, v in kwargs.items()} for k, v in arg_node.kwargs.items(): k.level = v.level if num_changed > 0 and node not in self.modified_nodes: self.modified_nodes += [node] - def find_assignment_node(self, node: BaseNode) -> AssignmentNode: - if node.ast_id and node.ast_id in self.interpreter.reverse_assignment: - return self.interpreter.reverse_assignment[node.ast_id] + def find_assignment_node(self, node: BaseNode) -> T.Optional[AssignmentNode]: + for k, v in self.interpreter.all_assignment_nodes.items(): + for ass in v: + if ass.value == node: + return ass return None + def affects_no_other_targets(self, candidate: BaseNode) -> bool: + affected = self.interpreter.dataflow_dag.reachable({candidate}, False) + affected_targets = [x for x in affected if isinstance(x, FunctionNode) and x.func_name.value in BUILD_TARGET_FUNCTIONS] + return len(affected_targets) == 1 + + def get_relto(self, target_node: BaseNode, node: BaseNode) -> Path: + cwd = Path(os.getcwd()) + all_paths = self.interpreter.dataflow_dag.find_all_paths(node, target_node) + # len(all_paths) == 0 would imply that data does not flow from node to + # target_node. This would imply that adding sources to node would not + # add the source to the target. + assert all_paths + if len(all_paths) > 1: + return None + return (cwd / next(x for x in all_paths[0] if isinstance(x, FunctionNode)).filename).parent + + def add_src_or_extra(self, op: str, target: IntrospectionBuildTarget, newfiles: T.List[str], to_sort_nodes: T.List[T.Union[FunctionNode, ArrayNode]]) -> None: + assert op in {'src_add', 'extra_files_add'} + + if op == 'src_add': + old: T.Set[T.Union[BaseNode, UnknownValue]] = set(target.source_nodes) + elif op == 'extra_files_add': + if target.extra_files is None: + old = set() + else: + old = {target.extra_files} + tgt_function: FunctionNode = target.node + + cwd = Path(os.getcwd()) + target_dir_abs = cwd / os.path.dirname(target.node.filename) + source_root_abs = cwd / self.interpreter.source_root + + candidates1 = self.interpreter.dataflow_dag.reachable(old, True) + # A node is a member of the set `candidates1` exactly if data from this node + # flow into one of the `dest` nodes. We assume that this implies that if we + # add `foo.c` to this node, then 'foo.c' will be added to one of these + # nodes. This assumption is not always true: + # ar = ['a.c', 'b.c'] + # srcs = ar[1] + # executable('name', srcs) + # Data flows from `ar` to `srcs`, but if we add 'foo.c': + # ar = ['a.c', 'b.c', 'foo.c'] + # srcs = ar[1] + # executable('name', srcs) + # this does not add 'foo.c' to `srcs`. This is a known bug/limitation of + # the meson rewriter that could be fixed by replacing `reachable` with a + # more advanced analysis. But this is a lot of work and I think e.g. + # `srcs = ar[1]` is rare in real-world projects, so I will just leave + # this for now. + + candidates2 = {x for x in candidates1 if isinstance(x, (FunctionNode, ArrayNode))} + + # If we have this meson.build file: + # shared = ['shared.c'] + # executable('foo', shared + ['foo.c']) + # executable('bar', shared + ['bar.c']) + # and we are tasked with adding 'new.c' to 'foo', we should do e.g this: + # shared = ['shared.c'] + # executable('foo', shared + ['foo.c', 'new.c']) + # executable('bar', shared + ['bar.c']) + # but never this: + # shared = ['shared.c', 'new.c'] + # executable('foo', shared + ['foo.c']) + # executable('bar', shared + ['bar.c']) + # We do this by removing the `['shared.c']`-node from `candidates2`. + candidates2 = {x for x in candidates2 if self.affects_no_other_targets(x)} + + def path_contains_unknowns(candidate: BaseNode) -> bool: + all_paths = self.interpreter.dataflow_dag.find_all_paths(candidate, target.node) + for path in all_paths: + for el in path: + if isinstance(el, UnknownValue): + return True + return False + + candidates2 = {x for x in candidates2 if not path_contains_unknowns(x)} + + candidates2 = {x for x in candidates2 if self.get_relto(target.node, x) is not None} + + chosen: T.Union[FunctionNode, ArrayNode] = None + new_kwarg_flag = False + if len(candidates2) > 0: + # So that files(['a', 'b']) gets modified to files(['a', 'b', 'c']) instead of files(['a', 'b'], 'c') + if len({x for x in candidates2 if isinstance(x, ArrayNode)}) > 0: + candidates2 = {x for x in candidates2 if isinstance(x, ArrayNode)} + + # We choose one more or less arbitrary candidate + chosen = min(candidates2, key=lambda x: (x.lineno, x.colno)) + elif op == 'src_add': + chosen = target.node + elif op == 'extra_files_add': + chosen = ArrayNode(_symbol('['), ArgumentNode(Token('', tgt_function.filename, 0, 0, 0, None, '[]')), _symbol(']')) + + # this is fundamentally error prone + self.interpreter.dataflow_dag.add_edge(chosen, target.node) + + extra_files_idnode = IdNode(Token('string', tgt_function.filename, 0, 0, 0, None, 'extra_files')) + if tgt_function not in self.modified_nodes: + self.modified_nodes += [tgt_function] + new_extra_files_node: BaseNode + if target.node.args.get_kwarg_or_default('extra_files', None) is None: + # Target has no extra_files kwarg, create one + new_kwarg_flag = True + new_extra_files_node = chosen + else: + new_kwarg_flag = True + old_extra_files = target.node.args.get_kwarg_or_default('extra_files', None) + target.node.args.kwargs = {k: v for k, v in target.node.args.kwargs.items() if not (isinstance(k, IdNode) and k.value == 'extra_files')} + new_extra_files_node = ArithmeticNode('add', old_extra_files, _symbol('+'), chosen) + + tgt_function.args.kwargs[extra_files_idnode] = new_extra_files_node + + newfiles_relto = self.get_relto(target.node, chosen) + old_src_list: T.List[T.Any] = flatten([self.interpreter.node_to_runtime_value(sn) for sn in old]) + + if op == 'src_add': + name = 'Source' + elif op == 'extra_files_add': + name = 'Extra file' + # Generate the new String nodes + to_append = [] + added = [] + + old_src_list = [(target_dir_abs / x).resolve() if isinstance(x, str) else x.to_abs_path(source_root_abs) for x in old_src_list if not isinstance(x, UnknownValue)] + for _newf in sorted(set(newfiles)): + newf = Path(_newf) + if os.path.isabs(newf): + newf = Path(newf) + else: + newf = source_root_abs / newf + if newf in old_src_list: + mlog.log(' -- ', name, mlog.green(str(newf)), 'is already defined for the target --> skipping') + continue + + mlog.log(' -- Adding ', name.lower(), mlog.green(str(newf)), 'at', + mlog.yellow(f'{chosen.filename}:{chosen.lineno}')) + added.append(newf) + mocktarget = self.interpreter.funcvals[target.node] + assert isinstance(mocktarget, IntrospectionBuildTarget) + # print("adding ", str(newf), 'to', mocktarget.name) todo: should we write something to stderr? + + path = relpath(newf, newfiles_relto) + path = codecs.encode(path, 'unicode_escape').decode() # Because the StringNode constructor does the inverse + token = Token('string', chosen.filename, 0, 0, 0, None, path) + to_append += [StringNode(token)] + + assert isinstance(chosen, (FunctionNode, ArrayNode)) + arg_node = chosen.args + # Append to the AST at the right place + arg_node.arguments += to_append + + # Mark the node as modified + if chosen not in to_sort_nodes: + to_sort_nodes += [chosen] + # If the extra_files array is newly created, i.e. if new_kwarg_flag is + # True, don't mark it as its parent function node already is, otherwise + # this would cause double modification. + if chosen not in self.modified_nodes and not new_kwarg_flag: + self.modified_nodes += [chosen] + + # Utility function to get a list of the sources from a node + def arg_list_from_node(self, n: BaseNode) -> T.List[BaseNode]: + args = [] + if isinstance(n, FunctionNode): + args = list(n.args.arguments) + if n.func_name.value in BUILD_TARGET_FUNCTIONS: + args.pop(0) + elif isinstance(n, ArrayNode): + args = n.args.arguments + elif isinstance(n, ArgumentNode): + args = n.arguments + return args + + def rm_src_or_extra(self, op: str, target: IntrospectionBuildTarget, to_be_removed: T.List[str], to_sort_nodes: T.List[T.Union[FunctionNode, ArrayNode]]) -> None: + assert op in {'src_rm', 'extra_files_rm'} + cwd = Path(os.getcwd()) + source_root_abs = cwd / self.interpreter.source_root + + # Helper to find the exact string node and its parent + def find_node(src: str) -> T.Tuple[T.Optional[BaseNode], T.Optional[StringNode]]: + if op == 'src_rm': + nodes = self.interpreter.dataflow_dag.reachable(set(target.source_nodes), True).union({target.node}) + elif op == 'extra_files_rm': + nodes = self.interpreter.dataflow_dag.reachable({target.extra_files}, True) + for i in nodes: + if isinstance(i, UnknownValue): + continue + relto = self.get_relto(target.node, i) + if relto is not None: + for j in self.arg_list_from_node(i): + if isinstance(j, StringNode): + if os.path.normpath(relto / j.value) == os.path.normpath(source_root_abs / src): + return i, j + return None, None + + if op == 'src_rm': + name = 'source' + elif op == 'extra_files_rm': + name = 'extra file' + + for i in to_be_removed: + # Try to find the node with the source string + root, string_node = find_node(i) + if root is None: + mlog.warning(' -- Unable to find', name, mlog.green(i), 'in the target') + continue + if not self.affects_no_other_targets(string_node): + mlog.warning(' -- Removing the', name, mlog.green(i), 'is too compilicated') + continue + + if not isinstance(root, (FunctionNode, ArrayNode)): + raise NotImplementedError # I'm lazy + + # Remove the found string node from the argument list + arg_node = root.args + mlog.log(' -- Removing', name, mlog.green(i), 'from', + mlog.yellow(f'{string_node.filename}:{string_node.lineno}')) + arg_node.arguments.remove(string_node) + + # Mark the node as modified + if root not in to_sort_nodes: + to_sort_nodes += [root] + if root not in self.modified_nodes: + self.modified_nodes += [root] + @RequiredKeys(rewriter_keys['target']) - def process_target(self, cmd): + def process_target(self, cmd: T.Dict[str, T.Any]) -> None: mlog.log('Processing target', mlog.bold(cmd['target']), 'operation', mlog.cyan(cmd['operation'])) target = self.find_target(cmd['target']) if target is None and cmd['operation'] != 'target_add': @@ -619,7 +871,7 @@ class Rewriter: # Make source paths relative to the current subdir def rel_source(src: str) -> str: - subdir = os.path.abspath(os.path.join(self.sourcedir, target['subdir'])) + subdir = os.path.abspath(os.path.join(self.sourcedir, target.subdir)) if os.path.isabs(src): return os.path.relpath(src, subdir) elif not os.path.exists(src): @@ -630,180 +882,13 @@ class Rewriter: if target is not None: cmd['sources'] = [rel_source(x) for x in cmd['sources']] - # Utility function to get a list of the sources from a node - def arg_list_from_node(n): - args = [] - if isinstance(n, FunctionNode): - args = list(n.args.arguments) - if n.func_name.value in BUILD_TARGET_FUNCTIONS: - args.pop(0) - elif isinstance(n, ArrayNode): - args = n.args.arguments - elif isinstance(n, ArgumentNode): - args = n.arguments - return args - - to_sort_nodes = [] - - if cmd['operation'] == 'src_add': - node = None - if target['sources']: - node = target['sources'][0] - else: - node = target['node'] - assert node is not None - - # Generate the current source list - src_list = [] - for i in target['sources']: - for j in arg_list_from_node(i): - if isinstance(j, StringNode): - src_list += [j.value] - - # Generate the new String nodes - to_append = [] - for i in sorted(set(cmd['sources'])): - if i in src_list: - mlog.log(' -- Source', mlog.green(i), 'is already defined for the target --> skipping') - continue - mlog.log(' -- Adding source', mlog.green(i), 'at', - mlog.yellow(f'{node.filename}:{node.lineno}')) - token = Token('string', node.filename, 0, 0, 0, None, i) - to_append += [StringNode(token)] - - # Append to the AST at the right place - arg_node = None - if isinstance(node, (FunctionNode, ArrayNode)): - arg_node = node.args - elif isinstance(node, ArgumentNode): - arg_node = node - assert arg_node is not None - arg_node.arguments += to_append - - # Mark the node as modified - if arg_node not in to_sort_nodes and not isinstance(node, FunctionNode): - to_sort_nodes += [arg_node] - if node not in self.modified_nodes: - self.modified_nodes += [node] - - elif cmd['operation'] == 'src_rm': - # Helper to find the exact string node and its parent - def find_node(src): - for i in target['sources']: - for j in arg_list_from_node(i): - if isinstance(j, StringNode): - if j.value == src: - return i, j - return None, None - - for i in cmd['sources']: - # Try to find the node with the source string - root, string_node = find_node(i) - if root is None: - mlog.warning(' -- Unable to find source', mlog.green(i), 'in the target') - continue - - # Remove the found string node from the argument list - arg_node = None - if isinstance(root, (FunctionNode, ArrayNode)): - arg_node = root.args - elif isinstance(root, ArgumentNode): - arg_node = root - assert arg_node is not None - mlog.log(' -- Removing source', mlog.green(i), 'from', - mlog.yellow(f'{string_node.filename}:{string_node.lineno}')) - arg_node.arguments.remove(string_node) - - # Mark the node as modified - if arg_node not in to_sort_nodes and not isinstance(root, FunctionNode): - to_sort_nodes += [arg_node] - if root not in self.modified_nodes: - self.modified_nodes += [root] - - elif cmd['operation'] == 'extra_files_add': - tgt_function: FunctionNode = target['node'] - mark_array = True - try: - node = target['extra_files'][0] - except IndexError: - # Specifying `extra_files` with a list that flattens to empty gives an empty - # target['extra_files'] list, account for that. - try: - extra_files_key = next(k for k in tgt_function.args.kwargs.keys() if isinstance(k, IdNode) and k.value == 'extra_files') - node = tgt_function.args.kwargs[extra_files_key] - except StopIteration: - # Target has no extra_files kwarg, create one - node = ArrayNode(_symbol('['), ArgumentNode(Token('', tgt_function.filename, 0, 0, 0, None, '[]')), _symbol(']')) - tgt_function.args.kwargs[IdNode(Token('string', tgt_function.filename, 0, 0, 0, None, 'extra_files'))] = node - mark_array = False - if tgt_function not in self.modified_nodes: - self.modified_nodes += [tgt_function] - target['extra_files'] = [node] - if isinstance(node, IdNode): - node = self.interpreter.assignments[node.value] - target['extra_files'] = [node] - if not isinstance(node, ArrayNode): - mlog.error('Target', mlog.bold(cmd['target']), 'extra_files argument must be a list', *self.on_error()) - return self.handle_error() - - # Generate the current extra files list - extra_files_list = [] - for i in target['extra_files']: - for j in arg_list_from_node(i): - if isinstance(j, StringNode): - extra_files_list += [j.value] - - # Generate the new String nodes - to_append = [] - for i in sorted(set(cmd['sources'])): - if i in extra_files_list: - mlog.log(' -- Extra file', mlog.green(i), 'is already defined for the target --> skipping') - continue - mlog.log(' -- Adding extra file', mlog.green(i), 'at', - mlog.yellow(f'{node.filename}:{node.lineno}')) - token = Token('string', node.filename, 0, 0, 0, None, i) - to_append += [StringNode(token)] - - # Append to the AST at the right place - arg_node = node.args - arg_node.arguments += to_append - - # Mark the node as modified - if arg_node not in to_sort_nodes: - to_sort_nodes += [arg_node] - # If the extra_files array is newly created, don't mark it as its parent function node already is, - # otherwise this would cause double modification. - if mark_array and node not in self.modified_nodes: - self.modified_nodes += [node] - - elif cmd['operation'] == 'extra_files_rm': - # Helper to find the exact string node and its parent - def find_node(src): - for i in target['extra_files']: - for j in arg_list_from_node(i): - if isinstance(j, StringNode): - if j.value == src: - return i, j - return None, None - - for i in cmd['sources']: - # Try to find the node with the source string - root, string_node = find_node(i) - if root is None: - mlog.warning(' -- Unable to find extra file', mlog.green(i), 'in the target') - continue + to_sort_nodes: T.List[T.Union[FunctionNode, ArrayNode]] = [] - # Remove the found string node from the argument list - arg_node = root.args - mlog.log(' -- Removing extra file', mlog.green(i), 'from', - mlog.yellow(f'{string_node.filename}:{string_node.lineno}')) - arg_node.arguments.remove(string_node) + if cmd['operation'] in {'src_add', 'extra_files_add'}: + self.add_src_or_extra(cmd['operation'], target, cmd['sources'], to_sort_nodes) - # Mark the node as modified - if arg_node not in to_sort_nodes and not isinstance(root, FunctionNode): - to_sort_nodes += [arg_node] - if root not in self.modified_nodes: - self.modified_nodes += [root] + elif cmd['operation'] in {'src_rm', 'extra_files_rm'}: + self.rm_src_or_extra(cmd['operation'], target, cmd['sources'], to_sort_nodes) elif cmd['operation'] == 'target_add': if target is not None: @@ -813,7 +898,7 @@ class Rewriter: id_base = re.sub(r'[- ]', '_', cmd['target']) target_id = id_base + '_exe' if cmd['target_type'] == 'executable' else '_lib' source_id = id_base + '_sources' - filename = os.path.join(cmd['subdir'], environment.build_filename) + filename = os.path.join(os.getcwd(), self.interpreter.source_root, cmd['subdir'], environment.build_filename) # Build src list src_arg_node = ArgumentNode(Token('string', filename, 0, 0, 0, None, '')) @@ -838,44 +923,55 @@ class Rewriter: self.to_add_nodes += [src_ass_node, tgt_ass_node] elif cmd['operation'] == 'target_rm': - to_remove = self.find_assignment_node(target['node']) + to_remove: BaseNode = self.find_assignment_node(target.node) if to_remove is None: - to_remove = target['node'] + to_remove = target.node self.to_remove_nodes += [to_remove] mlog.log(' -- Removing target', mlog.green(cmd['target']), 'at', mlog.yellow(f'{to_remove.filename}:{to_remove.lineno}')) elif cmd['operation'] == 'info': # T.List all sources in the target - src_list = [] - for i in target['sources']: - for j in arg_list_from_node(i): - if isinstance(j, StringNode): - src_list += [j.value] - extra_files_list = [] - for i in target['extra_files']: - for j in arg_list_from_node(i): - if isinstance(j, StringNode): - extra_files_list += [j.value] + + cwd = Path(os.getcwd()) + source_root_abs = cwd / self.interpreter.source_root + + src_list = self.interpreter.nodes_to_pretty_filelist(source_root_abs, target.subdir, target.source_nodes) + extra_files_list = self.interpreter.nodes_to_pretty_filelist(source_root_abs, target.subdir, [target.extra_files] if target.extra_files else []) + + src_list = ['unknown' if isinstance(x, UnknownValue) else relpath(x, source_root_abs) for x in src_list] + extra_files_list = ['unknown' if isinstance(x, UnknownValue) else relpath(x, source_root_abs) for x in extra_files_list] + test_data = { - 'name': target['name'], + 'name': target.name, 'sources': src_list, 'extra_files': extra_files_list } - self.add_info('target', target['id'], test_data) + self.add_info('target', target.id, test_data) # Sort files for i in to_sort_nodes: - convert = lambda text: int(text) if text.isdigit() else text.lower() - alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)] - path_sorter = lambda key: ([(key.count('/') <= idx, alphanum_key(x)) for idx, x in enumerate(key.split('/'))]) + def convert(text: str) -> T.Union[int, str]: + return int(text) if text.isdigit() else text.lower() + + def alphanum_key(key: str) -> T.List[T.Union[int, str]]: + return [convert(c) for c in re.split('([0-9]+)', key)] - unknown = [x for x in i.arguments if not isinstance(x, StringNode)] - sources = [x for x in i.arguments if isinstance(x, StringNode)] + def path_sorter(key: str) -> T.List[T.Tuple[bool, T.List[T.Union[int, str]]]]: + return [(key.count('/') <= idx, alphanum_key(x)) for idx, x in enumerate(key.split('/'))] + + if isinstance(i, FunctionNode) and i.func_name.value in BUILD_TARGET_FUNCTIONS: + src_args = i.args.arguments[1:] + target_name = [i.args.arguments[0]] + else: + src_args = i.args.arguments + target_name = [] + unknown: T.List[BaseNode] = [x for x in src_args if not isinstance(x, StringNode)] + sources: T.List[StringNode] = [x for x in src_args if isinstance(x, StringNode)] sources = sorted(sources, key=lambda x: path_sorter(x.value)) - i.arguments = unknown + sources + i.args.arguments = target_name + unknown + T.cast(T.List[BaseNode], sources) - def process(self, cmd): + def process(self, cmd: T.Dict[str, T.Any]) -> None: if 'type' not in cmd: raise RewriterException('Command has no key "type"') if cmd['type'] not in self.functions: @@ -883,7 +979,7 @@ class Rewriter: .format(cmd['type'], list(self.functions.keys()))) self.functions[cmd['type']](cmd) - def apply_changes(self): + def apply_changes(self) -> None: assert all(hasattr(x, 'lineno') and hasattr(x, 'colno') and hasattr(x, 'filename') for x in self.modified_nodes) assert all(hasattr(x, 'lineno') and hasattr(x, 'colno') and hasattr(x, 'filename') for x in self.to_remove_nodes) assert all(isinstance(x, (ArrayNode, FunctionNode)) for x in self.modified_nodes) @@ -891,7 +987,7 @@ class Rewriter: # Sort based on line and column in reversed order work_nodes = [{'node': x, 'action': 'modify'} for x in self.modified_nodes] work_nodes += [{'node': x, 'action': 'rm'} for x in self.to_remove_nodes] - work_nodes = sorted(work_nodes, key=lambda x: (x['node'].lineno, x['node'].colno), reverse=True) + work_nodes = sorted(work_nodes, key=lambda x: (T.cast(BaseNode, x['node']).lineno, T.cast(BaseNode, x['node']).colno), reverse=True) work_nodes += [{'node': x, 'action': 'add'} for x in self.to_add_nodes] # Generating the new replacement string @@ -900,11 +996,11 @@ class Rewriter: new_data = '' if i['action'] == 'modify' or i['action'] == 'add': printer = AstPrinter() - i['node'].accept(printer) + T.cast(BaseNode, i['node']).accept(printer) printer.post_process() new_data = printer.result.strip() data = { - 'file': i['node'].filename, + 'file': T.cast(BaseNode, i['node']).filename, 'str': new_data, 'node': i['node'], 'action': i['action'] @@ -912,11 +1008,11 @@ class Rewriter: str_list += [data] # Load build files - files = {} + files: T.Dict[str, T.Any] = {} for i in str_list: if i['file'] in files: continue - fpath = os.path.realpath(os.path.join(self.sourcedir, i['file'])) + fpath = os.path.realpath(T.cast(str, i['file'])) fdata = '' # Create an empty file if it does not exist if not os.path.exists(fpath): @@ -933,14 +1029,14 @@ class Rewriter: line_offsets += [offset] offset += len(j) - files[i['file']] = { + files[T.cast(str, i['file'])] = { 'path': fpath, 'raw': fdata, 'offsets': line_offsets } # Replace in source code - def remove_node(i): + def remove_node(i: T.Dict[str, T.Any]) -> None: offsets = files[i['file']]['offsets'] raw = files[i['file']]['raw'] node = i['node'] @@ -968,7 +1064,7 @@ class Rewriter: if i['action'] in {'modify', 'rm'}: remove_node(i) elif i['action'] == 'add': - files[i['file']]['raw'] += i['str'] + '\n' + files[T.cast(str, i['file'])]['raw'] += T.cast(str, i['str']) + '\n' # Write the files back for key, val in files.items(): @@ -999,7 +1095,7 @@ def list_to_dict(in_list: T.List[str]) -> T.Dict[str, str]: raise TypeError('in_list parameter of list_to_dict must have an even length.') return result -def generate_target(options) -> T.List[dict]: +def generate_target(options: argparse.Namespace) -> T.List[T.Dict[str, T.Any]]: return [{ 'type': 'target', 'target': options.target, @@ -1009,7 +1105,7 @@ def generate_target(options) -> T.List[dict]: 'target_type': options.tgt_type, }] -def generate_kwargs(options) -> T.List[dict]: +def generate_kwargs(options: argparse.Namespace) -> T.List[T.Dict[str, T.Any]]: return [{ 'type': 'kwargs', 'function': options.function, @@ -1018,19 +1114,19 @@ def generate_kwargs(options) -> T.List[dict]: 'kwargs': list_to_dict(options.kwargs), }] -def generate_def_opts(options) -> T.List[dict]: +def generate_def_opts(options: argparse.Namespace) -> T.List[T.Dict[str, T.Any]]: return [{ 'type': 'default_options', 'operation': options.operation, 'options': list_to_dict(options.options), }] -def generate_cmd(options) -> T.List[dict]: +def generate_cmd(options: argparse.Namespace) -> T.List[T.Dict[str, T.Any]]: if os.path.exists(options.json): with open(options.json, encoding='utf-8') as fp: - return json.load(fp) + return T.cast(T.List[T.Dict[str, T.Any]], json.load(fp)) else: - return json.loads(options.json) + return T.cast(T.List[T.Dict[str, T.Any]], json.loads(options.json)) # Map options.type to the actual type name cli_type_map = { @@ -1043,7 +1139,7 @@ cli_type_map = { 'cmd': generate_cmd, } -def run(options): +def run(options: argparse.Namespace) -> int: mlog.redirect(True) if not options.verbose: mlog.set_quiet() @@ -1062,12 +1158,22 @@ def run(options): if not isinstance(commands, list): raise TypeError('Command is not a list') - for i in commands: - if not isinstance(i, object): + for i, cmd in enumerate(commands): + if not isinstance(cmd, object): raise TypeError('Command is not an object') - rewriter.process(i) + rewriter.process(cmd) + rewriter.apply_changes() + + if i == len(commands) - 1: # Improves the performance, is not necessary for correctness. + break + + rewriter.modified_nodes = [] + rewriter.to_remove_nodes = [] + rewriter.to_add_nodes = [] + # The AST changed, so we need to update every information that was derived from the AST + rewriter.interpreter = IntrospectionInterpreter(rewriter.sourcedir, '', rewriter.interpreter.backend, visitors = [AstIDGenerator(), AstIndentationGenerator(), AstConditionLevel()]) + rewriter.analyze_meson() - rewriter.apply_changes() rewriter.print_info() return 0 except Exception as e: |