|
2 | 2 | import json |
3 | 3 | import logging |
4 | 4 | from collections.abc import Mapping, Sequence |
5 | | -from typing import Any |
| 5 | +from typing import Any, List, Tuple |
6 | 6 |
|
7 | 7 | from nested_lookup import nested_lookup |
8 | 8 | from ordered_set import OrderedSet |
|
14 | 14 | NON_MERGABLE_KEYS = ("uniqueItems", "insertionOrder") |
15 | 15 | TYPE = "type" |
16 | 16 | REF = "$ref" |
| 17 | +UNPACK_SEQUENCE_IDENTIFIER = "*" |
17 | 18 |
|
18 | 19 |
|
19 | 20 | class FlatteningError(Exception): |
@@ -185,6 +186,133 @@ def traverse_raw_schema(schema: dict, path: tuple): |
185 | 186 | return {} |
186 | 187 |
|
187 | 188 |
|
def traverse_path_for_sequence_members(
    document: dict, path_parts: Sequence, path: list = None
) -> Tuple[List[object], List[tuple]]:
    """Traverse the paths for all sequence members in the document according to the reference.

    Since the document is presumed to be the reference's base, the base is
    discarded. There is no validation that the reference is valid.

    Differing from traverse, this returns a list of documents and a list of resolved paths.

    :parameter document: document to traverse (dict or list)
    :parameter path_parts: document paths to traverse
    :parameter path: traversed path so far (copied; the caller's list is never modified)

    :returns: a ``(documents, resolved_paths)`` pair of lists; each resolved
        path is a tuple of the keys/indices that were followed

    :raises ValueError, LookupError: the reference is invalid for this document

    >>> traverse_path_for_sequence_members({"foo": {"bar": [42, 43, 44]}}, tuple())
    ([{'foo': {'bar': [42, 43, 44]}}], [()])
    >>> traverse_path_for_sequence_members({"foo": {"bar": [42, 43, 44]}}, ["foo"])
    ([{'bar': [42, 43, 44]}], [('foo',)])
    >>> traverse_path_for_sequence_members({"foo": {"bar": [42, 43, 44]}}, ("foo", "bar"))
    ([[42, 43, 44]], [('foo', 'bar')])
    >>> traverse_path_for_sequence_members({"foo": {"bar": [42, 43, 44]}}, ("foo", "bar", "*"))
    ([42, 43, 44], [('foo', 'bar', 0), ('foo', 'bar', 1), ('foo', 'bar', 2)])
    >>> traverse_path_for_sequence_members({"foo": {"bar": [{"baz": 1, "bin": 1}, {"baz": 2, "bin": 2}]}}, ("foo", "bar", "*"))
    ([{'baz': 1, 'bin': 1}, {'baz': 2, 'bin': 2}], [('foo', 'bar', 0), ('foo', 'bar', 1)])
    >>> traverse_path_for_sequence_members({"foo": {"bar": [{"baz": 1, "bin": 1}, {"baz": 2, "bin": 2}]}}, ("foo", "bar", "*", "baz"))
    ([1, 2], [('foo', 'bar', 0, 'baz'), ('foo', 'bar', 1, 'baz')])
    >>> seen = ["root"]
    >>> traverse_path_for_sequence_members({"foo": 1}, ["foo"], seen)
    ([1], [('root', 'foo')])
    >>> seen
    ['root']
    >>> traverse_path_for_sequence_members({}, ["foo"])
    Traceback (most recent call last):
    ...
    KeyError: 'foo'
    >>> traverse_path_for_sequence_members([], ["foo"])
    Traceback (most recent call last):
    ...
    ValueError: invalid literal for int() with base 10: 'foo'
    >>> traverse_path_for_sequence_members([], [0])
    Traceback (most recent call last):
    ...
    IndexError: list index out of range
    """
    # The helpers append to the path in place, so always work on a private
    # copy; otherwise a caller-supplied list would be modified behind its back.
    path = [] if path is None else list(path)
    if not path_parts:
        # Nothing left to resolve: the current document is the result.
        return [document], [tuple(path)]
    # The helpers consume the parts with pop(0); copy so the caller's sequence
    # (which may be an immutable tuple) is left alone.
    path_parts = list(path_parts)
    if not isinstance(document, Sequence):
        return _handle_non_sequence_for_traverse(document, path_parts, path)
    return _handle_sequence_for_traverse(document, path_parts, path)
| 238 | + |
| 239 | + |
def _handle_non_sequence_for_traverse(
    current_document: dict, current_path_parts: list, current_path: list
) -> Tuple[List[object], List[tuple]]:
    """
    Take one step into a non-sequence document while traversing.

    The first remaining path part is removed from `current_path_parts` and used
    as the key into `current_document`; a failed lookup propagates to the
    caller. The key is then appended to `current_path`, and the traversal
    carries on through `traverse_path_for_sequence_members` with the child
    document and whatever path parts are left.

    Both list arguments are modified in place.
    """
    key = current_path_parts.pop(0)
    # Look the key up before recording it, so a failed lookup leaves the
    # traversed path untouched.
    child_document = current_document[key]
    current_path.append(key)
    return traverse_path_for_sequence_members(
        child_document, current_path_parts, current_path
    )
| 258 | + |
| 259 | + |
def _handle_sequence_for_traverse(
    current_document: Sequence, current_path_parts: list, current_path: list
) -> Tuple[List[object], List[tuple]]:
    """
    Take one step into a sequence document while traversing.

    The first remaining path part is removed from `current_path_parts`. If it
    is the unpack sequence identifier (e.g. '*'), every member of the sequence
    is traversed via `_handle_unpack_sequence_for_traverse`. Otherwise the part
    must be convertible to an index: the member at that index becomes the new
    document, the index is appended to `current_path`, and the traversal
    continues with the remaining path parts.

    :raises ValueError: the path part is neither '*' nor a valid integer
    :raises IndexError: the index is out of range for the sequence

    >>> _handle_sequence_for_traverse([{"a": 1}], [0, "a"], [])
    ([1], [(0, 'a')])
    """
    sequence_part = current_path_parts.pop(0)
    if sequence_part == UNPACK_SEQUENCE_IDENTIFIER:
        return _handle_unpack_sequence_for_traverse(
            current_document, current_path_parts, current_path
        )
    # otherwise, sequence part should be a valid index
    sequence_index = int(sequence_part)
    member = current_document[sequence_index]
    current_path.append(sequence_index)
    # Keep resolving: returning here would silently drop any path parts that
    # follow an explicit index (e.g. ("foo", 0, "bar") stopping at ("foo", 0)).
    return traverse_path_for_sequence_members(
        member, current_path_parts, current_path
    )


def _handle_unpack_sequence_for_traverse(
    current_document: Sequence, current_path_parts: list, current_path: list
) -> Tuple[List[object], List[tuple]]:
    """
    When unpacking a sequence, we need to include multiple paths and multiple documents, one for each sequence member.

    For each sequence member:
        The traversed path is extended with the member's index.
        The member is traversed using the remaining path parts.
        The resulting documents and resolved paths are collected in member order.

    An empty sequence yields two empty lists.
    """
    documents = []
    resolved_paths = []
    for sequence_index, member in enumerate(current_document):
        # `current_path + [...]` builds a fresh list per member, so the
        # branches never share (or clobber) each other's traversed path.
        # `current_path_parts` can be shared because the traversal entry point
        # copies the parts before consuming them.
        member_documents, member_paths = traverse_path_for_sequence_members(
            member, current_path_parts, current_path + [sequence_index]
        )
        documents.extend(member_documents)
        resolved_paths.extend(member_paths)
    return documents, resolved_paths
| 314 | + |
| 315 | + |
188 | 316 | def schema_merge(target, src, path): # noqa: C901 # pylint: disable=R0912 |
189 | 317 | """Merges the src schema into the target schema in place. |
190 | 318 |
|
|
0 commit comments