|
1 | 1 | import json |
2 | | -from typing import Any, Callable, Dict, Optional, Union |
| 2 | +from typing import Any, Callable, Dict, Optional, Union, List |
3 | 3 | from opentelemetry.baggage import get_baggage |
4 | 4 | from opentelemetry import context |
5 | 5 | from opentelemetry.context import Context |
@@ -124,3 +124,105 @@ def add_user_attributes(span: Span): |
124 | 124 | except Exception as e: |
125 | 125 | logger.warning("Error loading baggage user information", e) |
126 | 126 | pass |
| 127 | + |
| 128 | + |
| 129 | +def redact(value: str) -> str: |
| 130 | + """Redacts all but the last four characters of the given string. |
| 131 | +
|
| 132 | + Args: |
| 133 | + value (str): The string to be redacted. |
| 134 | +
|
| 135 | + Returns: |
| 136 | + str: The redacted string with all but the last four characters |
| 137 | + replaced by asterisks. |
| 138 | + """ |
| 139 | + redaction_length = len(value) - 4 |
| 140 | + stars = "*" * redaction_length |
| 141 | + return f"{stars}{value[-4:]}" |
| 142 | + |
| 143 | + |
| 144 | +def ismatchingkey( |
| 145 | + target_key: str, |
| 146 | + keys_to_match: tuple[str, ...] = ("key", "token", "password"), |
| 147 | +) -> bool: |
| 148 | + """Check if the target key contains any of the specified keys to match. |
| 149 | +
|
| 150 | + Args: |
| 151 | + target_key (str): The key to be checked. |
| 152 | + keys_to_match (tuple[str, ...], optional): A tuple of keys to match |
| 153 | + against the target key. Defaults to ("key", "token"). |
| 154 | +
|
| 155 | + Returns: |
| 156 | + bool: True if any of the keys to match are found in the target key, |
| 157 | + False otherwise. |
| 158 | + """ |
| 159 | + for k in keys_to_match: |
| 160 | + if k in target_key: |
| 161 | + return True |
| 162 | + return False |
| 163 | + |
| 164 | + |
| 165 | +def can_convert_to_dict(s: str) -> bool: |
| 166 | + """Check if a string can be converted to a dictionary. |
| 167 | +
|
| 168 | + This function attempts to load the input string as JSON. If successful, |
| 169 | + it returns True, indicating that the string can be converted to a dictionary. |
| 170 | + Otherwise, it catches ValueError and TypeError exceptions and returns False. |
| 171 | +
|
| 172 | + Args: |
| 173 | + s (str): The input string to be checked. |
| 174 | +
|
| 175 | + Returns: |
| 176 | + bool: True if the string can be converted to a dictionary, False otherwise. |
| 177 | + """ |
| 178 | + try: |
| 179 | + json.loads(s) |
| 180 | + return True |
| 181 | + except (ValueError, TypeError): |
| 182 | + return False |
| 183 | + |
| 184 | + |
| 185 | +def recursive_key_operation( |
| 186 | + data: Optional[Union[Dict[str, Any], List[Any], str]], |
| 187 | + operation: Callable[[str], str], |
| 188 | + keys_to_match: List[str] = ["key", "token", "password"], |
| 189 | +) -> Optional[Union[Dict[str, Any], List[Any], str]]: |
| 190 | + """Recursively traverses a dictionary, list, or JSON string and applies a |
| 191 | + specified operation to the values of keys that match any in the |
| 192 | + `keys_to_match` list. This function is useful for masking sensitive data |
| 193 | + (e.g., keys, tokens, passwords) in nested structures. |
| 194 | +
|
| 195 | + Args: |
| 196 | + data (Optional[Union[Dict[str, Any], List[Any], str]]): The input data |
| 197 | + to traverse. This can bea dictionary, list, or JSON string. If a |
| 198 | + JSON string is provided, it will be parsed into a dictionary before |
| 199 | + processing. |
| 200 | +
|
| 201 | + operation (Callable[[str], str]): A function that takes a string value |
| 202 | + and returns a modified string. This operation is applied to the values |
| 203 | + of keys that match any in `keys_to_match`. |
| 204 | + keys_to_match (List[str]): A list of keys to search for in the data. If |
| 205 | + a key matche any in this list, the corresponding value will be processed |
| 206 | + by the `operation`. Defaults to ["key", "token", "password"]. |
| 207 | +
|
| 208 | + Returns: |
| 209 | + Optional[Union[Dict[str, Any], List[Any], str]]: The modified data structure |
| 210 | + with the operation applied to the values of matched keys. The return type |
| 211 | + matches the input type (dict, list, or str). |
| 212 | + """ |
| 213 | + if isinstance(data, str) and can_convert_to_dict(data): |
| 214 | + data_dict = json.loads(data) |
| 215 | + data = str(recursive_key_operation(data_dict, operation, keys_to_match)) |
| 216 | + elif isinstance(data, dict): |
| 217 | + for key, value in data.items(): |
| 218 | + if ismatchingkey(key, tuple(keys_to_match)) and isinstance(value, str): |
| 219 | + # Apply the operation to the value of the matched key |
| 220 | + data[key] = operation(value) |
| 221 | + else: |
| 222 | + # Recursively process nested dictionaries or lists |
| 223 | + data[key] = recursive_key_operation(value, operation, keys_to_match) |
| 224 | + elif isinstance(data, list): |
| 225 | + for i in range(len(data)): |
| 226 | + data[i] = recursive_key_operation(data[i], operation, keys_to_match) |
| 227 | + |
| 228 | + return data |
0 commit comments