Skip to content

Commit 3e40977

Browse files
NRL-1215 Fix implementation, refine type hints
1 parent dfdabd7 commit 3e40977

File tree

1 file changed

+41 additions, -40 deletions (lines changed)

1 file changed

+41 additions, -40 deletions (lines changed)

layer/nrlf/core/json_duplicate_checker.py

Lines changed: 41 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,11 @@
11
import json
2-
from typing import List, Tuple, Set, Any, Dict, Union
3-
from nrlf.core.errors import OperationOutcomeError
4-
from nrlf.core.response import SpineErrorConcept
2+
from typing import List, Tuple, Set, Dict
53

6-
JsonValue = List[Any] | Tuple[Any, ...] | Any
7-
JsonPair = Tuple[str, JsonValue]
4+
JsonPrimitive = str | int | float | bool | None
5+
type JsonValue = JsonPrimitive | JsonObject | JsonArray
6+
JsonPair = tuple[str, JsonValue]
7+
JsonObject = list[JsonPair]
8+
JsonArray = list[JsonValue]
89

910
class DuplicateKeyChecker:
1011
"""JSON structure duplicate key detector.
@@ -21,6 +22,20 @@ def __init__(self):
2122
self.duplicate_paths: Set[str] = set()
2223
# Track keys at each path level to detect duplicates
2324
self.key_registry: Dict[str, Dict[str, bool]] = {}
25+
self.current_duplicate_index: Dict[str, int] = {}
26+
27+
def get_path_with_index(self, path: List[str], key: str) -> List[str]:
    """Return ``path`` extended with ``key``, disambiguating repeated keys.

    Every call records one more occurrence of ``key`` at the level named by
    the dot-joined ``path``. The first occurrence is appended unchanged;
    each later occurrence is appended with a zero-based bracket suffix, so
    a key entered three times at the same level yields ``key``, ``key[0]``
    and ``key[1]`` in that order.

    Args:
        path: Path segments leading to the current nesting level.
        key: The segment being entered at that level.

    Returns:
        A new list of segments; ``path`` itself is not modified.
    """
    current_level = '.'.join(path)
    # Per-level occurrence counters, created the first time a level is seen.
    seen_counts = self.current_duplicate_index.setdefault(current_level, {})
    occurrences = seen_counts.get(key, 0)
    seen_counts[key] = occurrences + 1

    if occurrences == 0:
        return path + [key]
    # Repeats are numbered from zero, starting with the second occurrence.
    return path + [f"{key}[{occurrences - 1}]"]
2439

2540
def check_key(self, key: str, path: List[str]) -> None:
2641
"""Check if a key at the current path is a duplicate.
@@ -29,53 +44,39 @@ def check_key(self, key: str, path: List[str]) -> None:
2944
nesting level, even if the values differ.
3045
"""
3146
current_level = '.'.join(path)
32-
33-
if current_level not in self.key_registry:
34-
self.key_registry[current_level] = {}
35-
36-
if key in self.key_registry[current_level]:
47+
current_keys = self.key_registry.setdefault(current_level, {})
48+
if key in current_keys:
3749
self.duplicate_keys.add(key)
38-
full_path = '.'.join(path + [key])
39-
self.duplicate_paths.add(full_path)
40-
print(f"Found duplicate key: {key} at path: {full_path}")
50+
self.duplicate_paths.add('.'.join(path + [key]))
51+
print(f"Found duplicate key: {key} at path: {'.'.join(path + [key])}")
4152
else:
42-
self.key_registry[current_level][key] = True
53+
current_keys[key] = True
4354

44-
def traverse_json(self, data: List[JsonPair], path: List[str]) -> None:
45-
"""Traverse JSON structure and check for duplicate keys.
46-
47-
Handles both objects and arrays, maintaining proper path context
48-
during traversal.
49-
"""
55+
def process_collection(self, value: JsonObject | JsonArray, path: list[str], key: str) -> None:
    """Route a nested collection to the object or the array traversal.

    The child path is obtained from ``get_path_with_index(path, key)``.
    A non-empty ``value`` whose first element is a tuple is treated as an
    object (a sequence of key/value pairs) and passed to ``traverse_json``;
    anything else, including an empty collection, goes to
    ``traverse_array``.
    """
    child_path = self.get_path_with_index(path, key)
    # Only the first element is inspected to tell pairs from plain items.
    looks_like_object = bool(value) and isinstance(value[0], tuple)
    handler = self.traverse_json if looks_like_object else self.traverse_array
    handler(value, child_path)
62+
63+
def traverse_json(self, data: JsonObject, path: list[str]) -> None:
    """Walk the key/value pairs of one JSON object and check each key.

    Each key is passed to ``check_key`` together with ``path``. Values that
    are lists or tuples are then handed to ``process_collection`` for
    further traversal; all other values are left alone.
    """
    for pair in data:
        key, value = pair
        print(f"Processing key: {key}, value: {value}")
        self.check_key(key, path)
        if not isinstance(value, (list, tuple)):
            # Scalar value: nothing nested to descend into.
            continue
        self.process_collection(value, path, key)
6170

62-
def traverse_array(self, items: List[Any], path: List[str]) -> None:
63-
"""Process array items while tracking their indices in the path."""
71+
def traverse_array(self, items: JsonArray, path: list[str]) -> None:
    """Descend into the list/tuple elements of a JSON array.

    The last segment of ``path`` is taken as the array's own name and the
    preceding segments as its parent path. Every element that is a list or
    tuple is passed to ``process_collection`` with that parent path and a
    segment of the form ``name[index]``; other elements are skipped.

    Raises:
        IndexError: If ``path`` is empty.
    """
    array_name, parent_path = path[-1], path[:-1]

    for position, element in enumerate(items):
        if isinstance(element, (list, tuple)):
            self.process_collection(element, parent_path, f"{array_name}[{position}]")
7980

8081
def check_duplicate_keys(json_content: str) -> Tuple[List[str], List[str]]:
8182
"""Find all duplicate keys in a JSON string.

0 commit comments

Comments (0)