|
| 1 | +""" |
| 2 | +Logprep supports the custom YAML tags :code:`!include`, :code:`!set_anchor` and |
| 3 | +:code:`!load_anchor`. |
| 4 | +Those can be used inside any YAML file that is loaded by Logprep. |
| 5 | +
|
| 6 | +Include tags |
| 7 | +^^^^^^^^^^^^ |
| 8 | +
|
| 9 | +The tag :code:`!include PATH_TO_YAML_FILE` loads a single YAML document from a local file path and |
| 10 | +inserts it in its place. |
| 11 | +
|
| 12 | +Included files can't contain an :code:`!include` tag. |
| 13 | +
|
| 14 | +Example: |
| 15 | +
|
| 16 | +.. code-block:: yaml |
| 17 | + :linenos: |
| 18 | + :caption: Include tag |
| 19 | +
|
| 20 | + filter: to_resolve |
| 21 | + generic_resolver: |
| 22 | + field_mapping: |
| 23 | + to_resolve: resolved |
| 24 | + resolve_list: !include /path/to/resolve_list.yml |
| 25 | +
|
| 26 | +Anchor tags |
| 27 | +^^^^^^^^^^^ |
| 28 | +
|
| 29 | +Anchor tags work similar to regular YAML anchors, but are valid for all documents inside a file or |
| 30 | +stream. |
| 31 | +Tags are set with :code:`!set_anchor(:[0-9])?` and loaded with :code:`!load_anchor(:[0-9])?`. |
| 32 | +Ten anchors can be active inside a single file or stream. |
| 33 | +:code:`!set_anchor` and :code:`!load_anchor` are shorthands for :code:`!set_anchor:0` and |
| 34 | +:code:`!load_anchor:0`. |
| 35 | +
|
| 36 | +:code:`!include` and :code:`!set_anchor` can't be nested inside :code:`!set_anchor`. |
| 37 | +
|
| 38 | +Examples: |
| 39 | +
|
| 40 | +.. code-block:: yaml |
| 41 | + :linenos: |
| 42 | + :caption: Anchor tag without shorthand |
| 43 | +
|
| 44 | + filter: to_resolve |
| 45 | + generic_resolver: |
| 46 | + field_mapping: |
| 47 | + to_resolve: resolved |
| 48 | + resolve_list: !set_anchor:1 |
| 49 | + one: foo |
| 50 | + two: bar |
| 51 | + --- |
| 52 | + filter: to_resolve_2 |
| 53 | + generic_resolver: |
| 54 | + field_mapping: |
| 55 | + to_resolve_2: resolved |
| 56 | + resolve_list: !load_anchor:1 |
| 57 | +
|
| 58 | +.. code-block:: yaml |
| 59 | + :linenos: |
| 60 | + :caption: Anchor tag with shorthand |
| 61 | +
|
| 62 | + filter: to_resolve |
| 63 | + generic_resolver: |
| 64 | + field_mapping: |
| 65 | + to_resolve: resolved |
| 66 | + resolve_list: !set_anchor |
| 67 | + one: foo |
| 68 | + two: bar |
| 69 | + --- |
| 70 | + filter: to_resolve_2 |
| 71 | + generic_resolver: |
| 72 | + field_mapping: |
| 73 | + to_resolve_2: resolved |
| 74 | + resolve_list: !load_anchor |
| 75 | +""" |
| 76 | + |
| 77 | +import os.path |
| 78 | +from typing import Set, Callable, Any |
| 79 | + |
| 80 | +from ruamel.yaml import YAML, Node, BaseConstructor |
| 81 | + |
| 82 | + |
def init_yaml_loader_tags(*loader_types: str) -> None:
    """Add custom tags !include, !set_anchor and !load_anchor to the specified loader types.

    Must be initialized before yaml files have been loaded to take effect.

    For every loader type a ``YAML`` instance is created and constructors for ``!include``,
    ``!set_anchor``, ``!load_anchor`` and the numbered variants ``!set_anchor:0`` ...
    ``!set_anchor:9`` / ``!load_anchor:0`` ... ``!load_anchor:9`` are added to its constructor.

    Parameters
    ----------
    *loader_types : str
        Types of loaders for which tags will be initialized (i.e. "safe" or "rt").
    """

    def include(_yaml: YAML) -> Callable[[BaseConstructor, Node], Any]:
        """Create the constructor that handles the !include tag.

        Parameters
        ----------
        _yaml : YAML
            Used to load the yaml file that will be included.

        Returns
        -------
        Callable[[BaseConstructor, Node], Any]
            Constructor that returns the content of the file referenced by the tagged node.
        """

        def _include(_: BaseConstructor, node: Node) -> Any:
            """Load the file named by the node value and return its data.

            Raises
            ------
            ValueError
                If the node value is not path-like, the file could not be loaded or it is empty.
            FileNotFoundError
                If the node value does not point to an existing file.
            """
            if not isinstance(node.value, (str, os.PathLike)):
                raise ValueError(f"'{node.value}' is not a file path")
            if not os.path.isfile(node.value):
                raise FileNotFoundError(node.value)
            with open(node.value, "r", encoding="utf-8") as yaml_file:
                try:
                    data = _yaml.load(yaml_file)
                except AttributeError as error:
                    # NOTE(review): presumably triggered by custom tags inside the included
                    # file (nested !include is documented as unsupported) - confirm.
                    raise ValueError(f"'{node.tag} {node.value}' could not be loaded") from error
            if data is None:
                raise ValueError(f"'{node.value}' is empty")
            return data

        return _include

    def set_anchor(
        _yaml: YAML, _anchors: dict[str, Any], _last_buffer: Set[str]
    ) -> Callable[[BaseConstructor, Node], Any]:
        """Create the constructor that handles the '!set_anchor' tags.

        Setting an anchor for a node with children stores the children inside the anchor.
        Setting it for a scalar node stores the value of that node inside the anchor.

        Parameters
        ----------
        _yaml : YAML
            Used to load the yaml text that belongs to the anchored node.
        _anchors : dict[str, Any]
            The dict where all anchors are stored.
        _last_buffer : Set[str]
            Used to check if a different file/stream has been loaded.

        Returns
        -------
        Callable[[BaseConstructor, Node], Any]
            Constructor that stores the data of the tagged node under its anchor name and
            returns that data.
        """

        def _set_anchor(constructor: BaseConstructor, node: Node) -> Any:
            """Store the value of the tagged node as anchor and return the stored value."""
            clear_anchors_if_buffer_changed(constructor, _anchors, _last_buffer)

            anchor_name = get_anchor_name(node)
            _anchors[anchor_name] = _extract_anchor_value(constructor, node)
            return _anchors[anchor_name]

        def _extract_anchor_value(constructor: BaseConstructor, node: Node) -> Any:
            """Cut the text of the tagged node out of the reader buffer and load it as yaml.

            Raises
            ------
            ValueError
                If the cut out text could not be loaded or results in no data.
            """
            lines = constructor.loader.reader.buffer.splitlines()
            start, end = node.start_mark, node.end_mark
            # The node text begins directly behind the tag itself.
            value_start = start.column + len(node.tag)
            anchor_value_lines = lines[start.line : end.line + 1]
            if start.line == end.line:
                # Both columns refer to the same, untrimmed line. Slicing in two steps would
                # apply the end column to an already shortened string and keep text that
                # follows the node (e.g. the remainder of a flow collection).
                anchor_value_lines[0] = anchor_value_lines[0][value_start : end.column]
            else:
                anchor_value_lines[0] = anchor_value_lines[0][value_start:]
                anchor_value_lines[-1] = anchor_value_lines[-1][: end.column]
            anchor_value = "\n".join(anchor_value_lines)
            try:
                data = _yaml.load(anchor_value)
            except AttributeError as error:
                # Join with a literal backslash-n so the message stays on a single line.
                _, _, value = "\\n".join(lines).partition(node.tag)
                raise ValueError(f"'{node.tag}{value}' could not be loaded") from error
            if data is None:
                raise ValueError(f"'{lines[node.start_mark.line]}' is en empty anchor")
            return data

        return _set_anchor

    def load_anchor(
        _anchors: dict[str, Any], _last_buffer: Set[str]
    ) -> Callable[[BaseConstructor, Node], Any]:
        """Create the constructor that handles the '!load_anchor' tags.

        Parameters
        ----------
        _anchors : dict[str, Any]
            The dict where all anchors are stored.
        _last_buffer : Set[str]
            Used to check if a different file/stream has been loaded.

        Returns
        -------
        Callable[[BaseConstructor, Node], Any]
            Constructor that returns the data stored under the anchor name of the tagged node.
        """

        def _load_anchor(constructor: BaseConstructor, node: Node) -> Any:
            """Return the stored anchor value or raise ValueError if it was never set."""
            clear_anchors_if_buffer_changed(constructor, _anchors, _last_buffer)

            anchor_name = get_anchor_name(node)
            try:
                return _anchors[anchor_name]
            except KeyError as error:
                raise ValueError(
                    f"'{node.value}' is not a defined anchor within this yaml stream"
                ) from error

        return _load_anchor

    def clear_anchors_if_buffer_changed(
        constructor: BaseConstructor, _anchors: dict[str, Any], _last_buffer: Set[str]
    ) -> None:
        """Drop all stored anchors if the reader buffer differs from the last seen buffer.

        The current buffer is remembered afterwards, so anchors only survive while the same
        buffer content is being constructed.
        """
        current_buffer = constructor.loader.reader.buffer
        if current_buffer not in _last_buffer:
            _last_buffer.clear()
            _anchors.clear()
            _last_buffer.add(current_buffer)

    def get_anchor_name(node: Node) -> str:
        """Return the part of the node tag behind the first ':' or "0" if there is none."""
        anchor_name: str
        _, _, anchor_name = node.tag.partition(":")
        if anchor_name == "":
            anchor_name = "0"
        anchor_name = anchor_name.strip()
        return anchor_name

    for loader_type in loader_types:
        yaml = YAML(pure=True, typ=loader_type)

        yaml.constructor.add_constructor("!include", include(yaml))

        # Anchors and the buffer marker are shared by all anchor tags of one loader type.
        last_buffer: Set[str] = set()
        anchors: dict[str, Any] = {}
        set_anchor_constructor = set_anchor(yaml, anchors, last_buffer)
        load_anchor_constructor = load_anchor(anchors, last_buffer)

        # The empty suffix registers the shorthand tags, followed by ':0' to ':9'.
        for suffix in ("", *(f":{num}" for num in range(10))):
            yaml.constructor.add_constructor(f"!set_anchor{suffix}", set_anchor_constructor)
            yaml.constructor.add_constructor(f"!load_anchor{suffix}", load_anchor_constructor)
0 commit comments