|
1 | | -from typing import Dict |
| 1 | +import pydash |
| 2 | + |
| 3 | +from typing import Dict, Any |
2 | 4 |
|
3 | 5 |
|
4 | 6 | def create_result_dict(value=None, meta=None, err=None) -> Dict: |
5 | 7 | return dict(value=value, meta=meta, err=err) |
6 | 8 |
|
7 | 9 |
|
8 | | -def get_path_value_from_dict(key_path: str, input_dict: dict, get_path_value_from_dict_func): |
9 | | - splitted_attribute = key_path.split(".*.") |
10 | | - return get_path_value_from_dict_func(splitted_attribute, input_dict) |
| 10 | +class PydashPathNotFound: |
| 11 | + pass |
| 12 | + |
| 13 | + |
| 14 | +def _get_path_value_from_input_internal(splitted_paths, input_data, place_none_if_not_found=False): |
| 15 | + |
| 16 | + if not splitted_paths: |
| 17 | + return [input_data] if input_data is not PydashPathNotFound else ([None] if place_none_if_not_found else []) |
| 18 | + |
| 19 | + final_data = [] |
| 20 | + expression = splitted_paths[0] |
| 21 | + remaining_paths = splitted_paths[1:] |
| 22 | + |
| 23 | + # Handle wildcard at the beginning (e.g., "*.something") |
| 24 | + if expression == "": |
| 25 | + if isinstance(input_data, list): |
| 26 | + for item in input_data: |
| 27 | + if remaining_paths: |
| 28 | + results = _get_path_value_from_input_internal(remaining_paths, item, place_none_if_not_found) |
| 29 | + final_data.extend(results) |
| 30 | + else: |
| 31 | + final_data.append(item) |
| 32 | + elif isinstance(input_data, dict): |
| 33 | + for value in input_data.values(): |
| 34 | + if remaining_paths: |
| 35 | + results = _get_path_value_from_input_internal(remaining_paths, value, place_none_if_not_found) |
| 36 | + final_data.extend(results) |
| 37 | + else: |
| 38 | + final_data.append(value) |
| 39 | + else: |
| 40 | + # For primitive values with empty expression (wildcard match) |
| 41 | + # Just return the value if no more paths to traverse |
| 42 | + if not remaining_paths: |
| 43 | + final_data.append(input_data) |
| 44 | + return final_data |
| 45 | + |
| 46 | + # Get the value at the current path |
| 47 | + intermediate_val = pydash.get(input_data, expression, default=PydashPathNotFound) |
| 48 | + |
| 49 | + if intermediate_val is PydashPathNotFound: |
| 50 | + return [None] if place_none_if_not_found else [] |
| 51 | + |
| 52 | + # If there are more paths to traverse |
| 53 | + if remaining_paths: |
| 54 | + if isinstance(intermediate_val, list) and remaining_paths[0] == "": |
| 55 | + # For lists with a wildcard marker, iterate over list items |
| 56 | + # Skip the wildcard marker since iteration is implicit for lists |
| 57 | + paths_to_apply = remaining_paths[1:] |
| 58 | + for val in intermediate_val: |
| 59 | + results = _get_path_value_from_input_internal(paths_to_apply, val, place_none_if_not_found) |
| 60 | + final_data.extend(results) |
| 61 | + elif isinstance(intermediate_val, dict) and remaining_paths[0] == "": |
| 62 | + # If it's a dict and next path is a wildcard, iterate over dict values |
| 63 | + # Skip the wildcard marker and apply remaining paths to each value |
| 64 | + for value in intermediate_val.values(): |
| 65 | + results = _get_path_value_from_input_internal(remaining_paths[1:], value, place_none_if_not_found) |
| 66 | + final_data.extend(results) |
| 67 | + else: |
| 68 | + # For non-wildcard paths, continue traversal without iteration |
| 69 | + results = _get_path_value_from_input_internal(remaining_paths, intermediate_val, place_none_if_not_found) |
| 70 | + final_data.extend(results) |
| 71 | + else: |
| 72 | + # This is the final path segment |
| 73 | + final_data.append(intermediate_val) |
| 74 | + |
| 75 | + return final_data |
| 76 | + |
| 77 | + |
| 78 | +def get_path_value_from_input(key_path: str, input: Any, place_none_if_not_found: bool = False): |
| 79 | + """ |
| 80 | + Retrieve values from a nested data structure using a path expression with wildcard support. |
| 81 | +
|
| 82 | + :param key_path: A dot-separated path to traverse the data structure. |
| 83 | + Use ``*`` for wildcard to match all items at that level. |
| 84 | + Supports nested structures including dictionaries, lists, and primitives. |
| 85 | + :type key_path: str |
| 86 | + :param input: The input data structure to search through (dict, list, or primitive). |
| 87 | + :type input: Any |
| 88 | + :param place_none_if_not_found: If True, returns [None] when a path is not found. |
| 89 | + If False, returns an empty list []. Defaults to False. |
| 90 | + :type place_none_if_not_found: bool |
| 91 | + :return: A list of values found at the specified path. Returns empty list or [None] if path not found, |
| 92 | + depending on place_none_if_not_found parameter. |
| 93 | + :rtype: list |
| 94 | +
|
| 95 | + **Examples:** |
| 96 | +
|
| 97 | + Basic path traversal:: |
| 98 | +
|
| 99 | + >>> data = {"user": {"name": "Alice", "age": 30}} |
| 100 | + >>> get_path_value_from_input("user.name", data) |
| 101 | + ["Alice"] |
| 102 | +
|
| 103 | + Wildcard with list items:: |
| 104 | +
|
| 105 | + >>> data = {"users": [{"name": "Alice"}, {"name": "Bob"}]} |
| 106 | + >>> get_path_value_from_input("users.*.name", data) |
| 107 | + ["Alice", "Bob"] |
| 108 | +
|
| 109 | + Wildcard with dictionary values:: |
| 110 | +
|
| 111 | + >>> data = {"countries": {"US": {"capital": "Washington"}, "UK": {"capital": "London"}}} |
| 112 | + >>> get_path_value_from_input("countries.*.capital", data) |
| 113 | + ["Washington", "London"] |
| 114 | +
|
| 115 | + Leading wildcard on lists:: |
| 116 | +
|
| 117 | + >>> data = [{"name": "Alice"}, {"name": "Bob"}] |
| 118 | + >>> get_path_value_from_input("*.name", data) |
| 119 | + ["Alice", "Bob"] |
| 120 | +
|
| 121 | + Wildcard on primitives:: |
| 122 | +
|
| 123 | + >>> get_path_value_from_input("*", 42) |
| 124 | + [42] |
| 125 | + >>> get_path_value_from_input("*", "hello") |
| 126 | + ["hello"] |
| 127 | +
|
| 128 | + Multiple wildcards:: |
| 129 | +
|
| 130 | + >>> data = {"groups": [[{"id": 1}, {"id": 2}], [{"id": 3}]]} |
| 131 | + >>> get_path_value_from_input("groups.*.*.id", data) |
| 132 | + [1, 2, 3] |
| 133 | +
|
| 134 | + Empty path returns input as-is:: |
| 135 | +
|
| 136 | + >>> data = {"key": "value"} |
| 137 | + >>> get_path_value_from_input("", data) |
| 138 | + [{"key": "value"}] |
| 139 | +
|
| 140 | + Path not found behavior:: |
| 141 | +
|
| 142 | + >>> data = {"user": {"name": "Alice"}} |
| 143 | + >>> get_path_value_from_input("missing.path", data) |
| 144 | + [] |
| 145 | + >>> get_path_value_from_input("missing.path", data, place_none_if_not_found=True) |
| 146 | + [None] |
| 147 | + """ |
| 148 | + # Handle empty path - return the input data as is |
| 149 | + if not key_path: |
| 150 | + return [input] |
| 151 | + |
| 152 | + # Split the path by dots and replace '*' with empty string to mark wildcards |
| 153 | + # Empty strings act as markers to iterate over collections (lists or dict values) |
| 154 | + # Example: "users.*.name" -> ["users", "", "name"] |
| 155 | + # "*.name" -> ["", "name"] |
| 156 | + # "numbers.*" -> ["numbers", ""] |
| 157 | + splitted_attribute = key_path.split(".") |
| 158 | + splitted_attribute = ["" if part == "*" else part for part in splitted_attribute] |
| 159 | + |
| 160 | + return _get_path_value_from_input_internal(splitted_attribute, input, place_none_if_not_found) |
11 | 161 |
|
12 | 162 |
|
13 | 163 | class ProviderError: |
|
0 commit comments