|
44 | 44 |
|
45 | 45 | CONNECTION_KEYS = ["rabbit", "database_connection", |
46 | 46 | "slave_connection", "sql_connection"] |
| 47 | + |
| 48 | +# Error messages |
47 | 49 | ERR_STR = "Not a valid string for masking" |
48 | 50 | ERR_FORMAT = "Required a dict for masking" |
| 51 | + |
49 | 52 | # Masking string |
50 | 53 | MASK_STR = "**********" |
51 | 54 |
|
|
64 | 67 | regexes = [gen_regex, con_regex] |
65 | 68 |
|
66 | 69 |
|
| 70 | +# Custom YAML representer to preserve multi-line strings as block scalars |
def str_representer(dumper: Any, data: str) -> Any:
    """
    YAML representer for ``str`` values.

    Strings containing a newline are emitted with the literal block
    style (``|``); every other string is emitted with the dumper's
    default scalar style.
    """
    str_tag = 'tag:yaml.org,2002:str'
    if '\n' not in data:
        return dumper.represent_scalar(str_tag, data)
    return dumper.represent_scalar(str_tag, data, style='|')
| 75 | + |
| 76 | + |
| 77 | +yaml.add_representer(str, str_representer) |
| 78 | + |
| 79 | + |
67 | 80 | class SecretMask(): |
68 | 81 | """ |
69 | 82 | Given a path to k8s secret containing sensitive base64 encoded data, |
@@ -219,6 +232,127 @@ def _process_data(self, data_map: Any) -> Any: |
219 | 232 | return d |
220 | 233 |
|
221 | 234 |
|
class PlaintextMask():
    """
    Mask sensitive data in plain text YAML files (ConfigMaps and CRs).
    Unlike SecretMask, this works on non-base64 encoded data.

    The file at ``path`` is parsed, masked in memory and rewritten in
    place. Read and write failures are reported on stdout instead of
    being raised, so one bad file does not abort a directory walk.
    """

    def __init__(self, path: Optional[str] = None) -> None:
        # Path of the YAML file to mask in place; None means "no file".
        self.path: Union[str, None] = path

    def mask(self) -> bool:
        """
        Read a k8s resource (ConfigMap or CR) dumped as yaml and process
        recursively to mask any sensitive information.

        Always returns True. When the file cannot be read, is empty, or
        holds a bare scalar document, it is left untouched.
        """
        resource = self._readYaml()
        # A scalar document (number, bool, plain string) has no keys or
        # items to inspect; calling len() on it used to raise TypeError.
        if not resource or not isinstance(resource, (dict, list)):
            return True

        # Recursively mask the entire resource
        self._applyMaskRecursive(resource)

        # Write the masked file
        self._writeYaml(resource)
        return True

    def _readYaml(self) -> Any:
        """
        Read and Load the k8s resource dumped as yaml file.

        Returns the parsed document, or an empty dict when the path is
        unset, the file cannot be read or decoded, the YAML is invalid,
        or the document is empty.
        """
        if self.path is None:
            # Explicit check instead of `assert`, which is stripped by -O.
            print("Error while reading YAML: no path provided")
            return {}
        try:
            # k8s dumps are UTF-8; do not depend on the process locale.
            with open(self.path, 'r', encoding='utf-8') as f:
                resource = yaml.safe_load(f)
        except OSError as e:
            # Covers missing files as well as directories, permission
            # errors and other I/O failures.
            print(f"Error while reading YAML {self.path}: {e}")
            return {}
        except (UnicodeError, yaml.YAMLError) as e:
            print(f"Error while reading YAML {self.path}: {e}")
            return {}
        return resource if resource else {}

    def _writeYaml(self, resource: Any) -> None:
        """
        Re-write the masked resource to the same path.

        The document is serialized before the file is opened, so a
        serialization error cannot leave a truncated file behind.
        """
        if self.path is None:
            print("Error while writing the masked file: no path provided")
            return
        try:
            # Write with settings to preserve readability
            text = yaml.dump(resource, default_flow_style=False,
                             allow_unicode=True, sort_keys=False)
            # allow_unicode=True emits raw non-ASCII characters, so the
            # output encoding must be able to represent them.
            with open(self.path, 'w', encoding='utf-8') as f:
                f.write(text)
        except (OSError, yaml.YAMLError) as e:
            print(f"Error while writing the masked file {self.path}: {e}")

    def _applyMaskRecursive(self, obj: Any) -> Any:
        """
        Recursively traverse the object and mask sensitive data in place.
        Two-step strategy:
        1. If key name is sensitive AND value is single-line -> fully mask
        2. Otherwise (multi-line value, or any other key) -> mask only
           the parts of the string matched by the regexes

        Strings inside lists always follow step 2. Non-string leaves are
        left untouched. Returns the same object that was passed in.
        """
        if isinstance(obj, dict):
            # Only values of existing keys are reassigned, so the dict
            # size does not change while it is being iterated.
            for key, value in obj.items():
                if isinstance(value, str):
                    # YAML keys may be ints, bools, etc.; only string
                    # keys can be matched against the key pattern.
                    sensitive_key = (isinstance(key, str)
                                     and re.search(key_regex, key))
                    if sensitive_key and '\n' not in value:
                        # e.g. password: secret123, transport_url: ...
                        obj[key] = MASK_STR
                    else:
                        # e.g. customServiceConfig blocks (multi-line)
                        obj[key] = self._applyRegex(value)
                elif isinstance(value, (dict, list)):
                    # Recursively process nested structures
                    self._applyMaskRecursive(value)
        elif isinstance(obj, list):
            for i, item in enumerate(obj):
                if isinstance(item, str):
                    obj[i] = self._applyRegex(item)
                elif isinstance(item, (dict, list)):
                    self._applyMaskRecursive(item)
        return obj

    def _applyRegex(self, text: str) -> str:
        """
        Apply regex patterns to mask sensitive information in text.
        Handles both single-line and multi-line strings.

        Each match is replaced by its first capture group followed by
        the mask string.
        """
        # \g<1> keeps the group reference unambiguous whatever the mask
        # string starts with; built once instead of once per pattern.
        replacement = r"\g<1>" + MASK_STR
        for pattern in regexes:
            text = re.sub(pattern, replacement, text, flags=re.MULTILINE)
        return text
| 323 | + |
| 324 | + |
def get_resource_kind(path: str) -> Optional[str]:
    """
    Read a YAML file and return its 'kind' field to determine resource type.

    Returns None if the file cannot be read or decoded, is not valid
    YAML, is not a mapping at the top level, or has no string 'kind'
    field. Failures are reported on stdout instead of being raised.
    """
    try:
        # k8s dumps are UTF-8; do not depend on the process locale.
        with open(path, 'r', encoding='utf-8') as f:
            resource = yaml.safe_load(f)
    except OSError as e:
        # Covers missing files as well as directories, permission
        # errors and other I/O failures.
        print(f"Error while reading YAML to determine kind: {e}")
        return None
    except (UnicodeError, yaml.YAMLError) as e:
        print(f"Error while reading YAML to determine kind: {e}")
        return None

    if isinstance(resource, dict):
        kind = resource.get('kind')
        # Honour the Optional[str] contract even for odd documents
        # such as `kind: 42`.
        if isinstance(kind, str):
            return kind
    return None
| 338 | + |
| 339 | + |
def mask_resource(path: str, dump_conf: bool = False) -> bool:
    """
    Pick the masking strategy for the file at ``path`` from its 'kind'
    field and run it:
    - kind "Secret": SecretMask (base64 decode/encode)
    - anything else, including an unknown kind: PlaintextMask
      (direct text masking)

    Returns whatever the selected masker's mask() returns.
    """
    if get_resource_kind(path) != "Secret":
        # ConfigMaps, CRs, and any other resource type
        return PlaintextMask(path).mask()
    return SecretMask(path, dump_conf).mask()
| 354 | + |
| 355 | + |
222 | 356 | def parse_opts(argv: Any) -> Any: |
223 | 357 | """ |
224 | 358 | Utility for the main function: it provides a way to parse |
@@ -247,7 +381,12 @@ def parse_opts(argv: Any) -> Any: |
247 | 381 | # argument and process all the files found in |
248 | 382 | # that directory |
249 | 383 | for root, subdirs, files in os.walk(OPTS.dir): |
250 | | - [SecretMask(os.path.join(root, f), OPTS.dump_conf).mask() for f in files] |
| 384 | + for f in files: |
| 385 | + # Skip non-YAML files |
| 386 | + if not f.endswith('.yaml') and not f.endswith('.yml'): |
| 387 | + continue |
| 388 | + file_path = os.path.join(root, f) |
| 389 | + mask_resource(file_path, OPTS.dump_conf) |
251 | 390 |
|
252 | 391 | if OPTS.path is not None and os.path.exists(OPTS.path): |
253 | | - SecretMask(OPTS.path, OPTS.dump_conf).mask() |
| 392 | + mask_resource(OPTS.path, OPTS.dump_conf) |
0 commit comments