diff --git a/plugins/modules/crawl_n_mask.py b/plugins/modules/crawl_n_mask.py index 8470b9e5d8..4d9cd66b47 100755 --- a/plugins/modules/crawl_n_mask.py +++ b/plugins/modules/crawl_n_mask.py @@ -13,13 +13,12 @@ --- module: crawl_n_mask -short_description: This module mask secrets in yaml files/dirs +short_description: This module mask secrets in yaml/json/log files/dirs version_added: "1.0.0" description: - - This module crawls over a directory (default) and find yaml files which may have secrets in it, and proceeds with masking it. - - If you pass a yaml file, it will directly check and mask secret in it. + - This module crawls over a directory (default) and find yaml/json/log files which may have secrets in it, and proceeds with masking it. - If you pass a directory, it will crawl the directory and find eligible files to mask. options: @@ -43,7 +42,7 @@ """ EXAMPLES = r""" -- name: Mask secrets in all yaml files within /home/zuul/logs +- name: Mask secrets in all yaml/json/log files within /home/zuul/logs crawl_n_mask: path: /home/zuul/logs isdir: True @@ -51,6 +50,10 @@ - name: Mask my_secrets.yaml crawl_n_mask: path: /home/zuul/logs/my_secrets.yaml + +- name: Mask application.log + crawl_n_mask: + path: /var/log/application.log """ RETURN = r""" @@ -62,8 +65,9 @@ """ import os -import re import pathlib +import re +from multiprocessing import Pool, cpu_count from ansible.module_utils.basic import AnsibleModule @@ -86,85 +90,211 @@ # python3 plugins/modules/crawl_n_mask.py ./args.json ################ -# files which are yaml but do not end with .yaml or .yml -ALLOWED_YAML_FILES = [ - "Standalone", -] # dirs which we do not want to scan EXCLUDED_DIRS = [ "openstack-k8s-operators-openstack-must-gather", "tmp", "venv", + ".git", ".github", ] -# file extensions which we do not want to process +# Used to skip Ansible task headers from txt/log masked files +ANSIBLE_SKIP_PATTERNS = [ + "TASK [", + "TASK: ", + "PLAY [", +] +# File extensions which we do not want to process EXCLUDED_FILE_EXT = [ ".py", ".html", ".DS_Store", - ".tar.gz", + ".tar.*", + ".rpm", ".zip", ".j2", + ".subunit", + ".tmp", ] # keys in files whose values need to be masked PROTECT_KEYS = [ - "literals", - "PASSWORD", - "Password", - "password", + "_client_cert_passphrase", + "_client_key_passphrase", + "_local_rsync_password", "_pwd", "_PWD", - "Token", - "Secret", - "secret", - "SECRET", + "_secret_content", + "abotrabbitmq", + "accessSecret", + "adcCredentialSecret", + "admin_password", + "adminPassword", + "AdminPassword", + "ADMIN_PASSWORD", + "adminPasswordSecretKeyRef", + "alt_password", + "AodhDatabasePassword", + "AodhPassword", + "api_secret", + "auth_encryption_key", + "authCertSecret", "Authkey", "authkey", - "private_key", - "privatekey", - "Passphrase", - "passphrase", - "PASSPHRASE", - "encryption_key", - "ENCRYPTION_KEY", - "HeatAuthEncryptionKey", - "oc_login_command", - "METADATA_SHARED_SECRET", - "KEYSTONE_FEDERATION_CLIENT_SECRET", - "rabbit", - "database_connection", - "slave_connection", - "sql_connection", + "aws_secret_access_key", + "BARBICAN_SIMPLE_CRYPTO_ENCRYPTION_KEY", + "BarbicanDatabasePassword", + "BarbicanPassword", + "BarbicanSimpleCryptoKEK", + "BarbicanSimpleCryptoKek", + "bearerToken", + "bind_password", + "bindPassword", + "bootstrapPassword", + "bootstrapToken", + "ca_secret", + "caSecret", + "CeilometerPassword", + "CephClientKey", + "CephClusterFSID", + "CephRgwKey", + "chap_password", "cifmw_openshift_login_password", "cifmw_openshift_login_token", - "BarbicanSimpleCryptoKEK", - "OctaviaHeartbeatKey", - "server-ca-passphrase", - "KeystoneFernetKeys", - "KeystoneFernetKey", - "KeystoneCredential", + "cifmw_openshift_password", + "CinderDatabasePassword", + "CinderPassword", + "client_secret", + "clientSecret", + "clientsecret", + "ClientKey", + "cloud_admin_user_password", + "database_connection", + "databasePassword", + "DatabasePassword", + "db-password", + "DB_ROOT_PASSWORD", + "DbRootPassword", + "defaultAdminPassword", + "DesignateDatabasePassword", + "DesignatePassword", "DesignateRndcKey", - "CephRgwKey", - "CephClusterFSID", - "CephClientKey", - "BarbicanSimpleCryptoKek", - "HashSuffix", - "RabbitCookie", + "docker-password", + "EMAIL_HOST_PASSWORD", + "ENCRYPTION_KEY", + "encryption_key", "erlang_cookie", - "ClientKey", - "swift_store_key", - "secret_key", - "heartbeat_key", "fernet_keys", - "sshkey", + "fromConnectionSecretKey", + "git-password", + "GlanceDatabasePassword", + "GlancePassword", + "HashSuffix", + "HEAT_AUTH_ENCRYPTION_KEY", + "HeatAuthEncryptionKey", + "HeatDatabasePassword", + "HeatPassword", + "heartbeat_key", + "http_basic_password", + "idp_password", + "idp_test_user_password", + "iibpassword", + "ilo_password", + "image_alt_ssh_password", + "image_password", + "image_server_password", + "image_ssh_password", + "infoblox_password", + "ipmi_password", + "IronicDatabasePassword", + "IronicInspectorDatabasePassword", + "IronicInspectorPassword", + "IronicPassword", + "key-password", + "key-store-password", + "key_password", + "keyPassword", + "keystoreKeyPassword", + "keystoreKeypassword", + "keystorePassword", + "keyStorePassword", + "KeystoneCredential", + "KeystoneDatabasePassword", + "KEYSTONE_FEDERATION_CLIENT_SECRET", + "KeystoneFernetKey", + "KeystoneFernetKeys", "keytab_base64", -] -# connection keys which may be part of the value itself -CONNECTION_KEYS = [ + "LibvirtPassword", + "licenseSecret", + "literals", + "managementPassword", + "ManilaDatabasePassword", + "ManilaPassword", + "MARIADB_PASSWORD", + "master-password", + "masterPassword", + "MASTER_PASSWORD", + "metadata_proxy_shared_secret", + "MetadataSecret", + "METADATA_SHARED_SECRET", + "MONGODB_BACKUP_PASSWORD", + "MONGODB_CLUSTER_ADMIN_PASSWORD", + "MONGODB_CLUSTER_MONITOR_PASSWORD", + "MONGODB_DATABASE_ADMIN_PASSWORD", + "MONGODB_USER_ADMIN_PASSWORD", + "mqpassword", + "mysql_root_password", + "mysql_zabbix_password", + "netapp_password", + "NeutronDatabasePassword", + "NeutronPassword", + "nexusInitialPassword", + "NodeRootPassword", + "NovaAPIDatabasePassword", + "NovaCell0DatabasePassword", + "NovaCell1DatabasePassword", + "NovaPassword", + "oc_login_command", + "OctaviaDatabasePassword", + "OctaviaHeartbeatKey", + "OctaviaPassword", + "Passphrase", + "passphrase", + "PASSPHRASE", + "Password", + "password", + "PASSWORD", + "pgpassword", + "pgreplpassword", + "pg_restic_password", + "pgRewindPassword", + "PlacementDatabasePassword", + "PlacementPassword", + "postgresPassword", + "postgresqlPassword", + "private_key", + "privatekey", + "proxy_password", "rabbit", - "database_connection", + "rabbitmqPassword", + "RabbitCookie", + "redfish_password", + "redis_password", + "remote_image_user_password", + "scimAdminPassword", + "Secret", + "secret", + "SECRET", + "secret_key", + "server-ca-passphrase", + "ServicePassword", "slave_connection", + "SPRING_DATASOURCE_PASSWORD", "sql_connection", + "ssh-privatekey", + "sshkey", + "stack_domain_admin_password", + "staticPasswords", + "X-Auth-Token", ] # Masking string MASK_STR = "**********" @@ -172,6 +302,48 @@ # regex of excluded file extensions excluded_file_ext_regex = r"(^.*(%s).*)" % "|".join(EXCLUDED_FILE_EXT) +QUICK_KEYWORDS = frozenset([key.lower() for key in PROTECT_KEYS]) + +# Pre-compiled regex patterns for log file masking. +LOG_PATTERNS = { + # Matches: 'password': 'value' OR \n'password': 'value' + # Groups: (1=prefix, 2=", 3=key, 4=", 5=space-before, 6=sep, 7=space-after, 8=", 9=value, 10=") + "python_dict_quoted": re.compile( + r"((?:\s|\\n)*)(['\"])(" + + "|".join(PROTECT_KEYS) + + r")(['\"])(\s*)([:=])(\s*)(['\"])([^'\"]+)(['\"])", + re.IGNORECASE, + ), + # Matches: 'password': 123456789 OR \n'password': 123456789 + # Groups: (1=prefix, 2=", 3=key, 4=", 5=space-before, 6=sep, 7=space-after, 8=value) + "python_dict_numeric": re.compile( + r"((?:\s|\\n)*)(['\"])(" + + "|".join(PROTECT_KEYS) + + r")(['\"])(\s*)(:)(\s*)(\d{6,})", + re.IGNORECASE, + ), + # Matches: password: value OR netapp_password=secret OR \npassword: value + # Groups: (1=prefix, 2=key, 3=space-before, 4=sep, 5=space-after, + # 6=open-quote(optional), 7=value, 8=close-quote(optional)) + "plain_key_value": re.compile( + r"((?:\s|\\n)*)\b(" + + "|".join(PROTECT_KEYS) + + r')\b(\s*)([:=])(\s*)(["\']?)([^\s\\"\']+)(["\']?)', + re.IGNORECASE, + ), + # Matches: SHA256 tokens (OpenShift style) + "sha256_token": re.compile(r"sha256~[A-Za-z0-9_-]+"), + # Matches: Bearer + "bearer": re.compile(r"Bearer\s+[a-zA-Z0-9_-]{20,}", re.IGNORECASE), + # Matches: ://user:pass@host + "connection_string": re.compile( + r"://([a-zA-Z0-9_-]+):([a-zA-Z0-9_@!#$%^&*]+)@([a-zA-Z0-9.-]+)" + ), +} + +# Available CPU-1, Max 8 +NUM_WORKERS = min(cpu_count() - 1, 8) + def handle_walk_errors(e): raise e @@ -182,8 +354,9 @@ def crawl(module, path) -> bool: Crawler function which will crawl through the log directory and find eligible files for masking. """ - changed = False + files_to_process = [] base_path = os.path.normpath(path) + results = [] for root, _, files in os.walk(base_path, onerror=handle_walk_errors): # Get relative path from our base path rel_path = os.path.relpath(root, base_path) @@ -191,100 +364,111 @@ def crawl(module, path) -> bool: # Check if any parent directory (not the root) is excluded if any(part in EXCLUDED_DIRS for part in rel_path.split(os.sep)): continue - for f in files: if not re.search(excluded_file_ext_regex, f): - if mask(module, os.path.join(root, f)): - # even if one file is masked, the final result will be True - changed = True - return changed + files_to_process.append(os.path.join(root, f)) + try: + with Pool(processes=NUM_WORKERS) as pool: + results = pool.map(mask_file, files_to_process) + except Exception as e: + module.fail_json(msg=f"Failed to mask files: {str(e)}") + + return any(results) def _get_masked_string(value): + # Not process empty strings + if len(value.strip("'\"")) == 0: + return value if len(value) <= 4: return value[:2] + MASK_STR return value[:2] + MASK_STR + value[-2:] -def partial_mask(value): +def mask_log_line(line: str) -> str: """ - Check length of the string. If it is too long, take 2 chars - from beginning, then add mask string and add 2 chars from the - end. - If value is short, take just 2 chars and add mask string + Masks several secrets occurrence in a single line. + Works good with big file with long lines and sparse secrets. + + Returns masked line with secrets replaced by MASK_STR """ - if not value.strip(): - return - - if "'" in value: - parsed_value = value.split("'") - if len(parsed_value) > 2 and parsed_value[1] != "": - prefix = parsed_value[0] - value = _get_masked_string(parsed_value[1]) - suffix = parsed_value[2] - return f"{prefix}'{value}'{suffix}" - else: - match = re.match(r"^(\s*)(.*?)(\n?)$", value) - if match: - parts = list(match.groups()) - prefix = parts[0] - value = _get_masked_string(parts[1]) - suffix = parts[2] - return f"{prefix}'{value}'{suffix}" + line_lower = line.lower() + has_keyword = any(kw in line_lower for kw in QUICK_KEYWORDS) + + if not has_keyword: + return line -def mask(module, path: str) -> bool: + # Pattern 1: 'password': 'value' + # Groups: (1=prefix, 2=", 3=key, 4=", 5=space-before, 6=sep, 7=space-after, 8=", 9=value, 10=") + line = LOG_PATTERNS["python_dict_quoted"].sub( + lambda m: f"{m.group(1)}{m.group(2)}{m.group(3)}{m.group(4)}{m.group(5)}{m.group(6)}{m.group(7)}{m.group(8)}{_get_masked_string(m.group(9))}{m.group(10)}", + line, + ) + + # Pattern 2: 'password': 123456789 + # Groups: (1=prefix, 2=", 3=key, 4=", 5=space-before, 6=sep, 7=space-after, 8=value) + line = LOG_PATTERNS["python_dict_numeric"].sub( + lambda m: f"{m.group(1)}{m.group(2)}{m.group(3)}{m.group(4)}{m.group(5)}{m.group(6)}{m.group(7)}{_get_masked_string(m.group(8))}", + line, + ) + + # Pattern 3: password: value OR password = value + # Groups: (1=prefix, 2=key, 3=space-before, 4=sep, 5=space-after, + # 6=open-quote(optional), 7=value, 8=close-quote(optional)) + line = LOG_PATTERNS["plain_key_value"].sub( + lambda m: f"{m.group(1)}{m.group(2)}{m.group(3)}{m.group(4)}{m.group(5)}{m.group(6)}{_get_masked_string(m.group(7))}{m.group(8)}", + line, + ) + # SHA256 tokens + # sha256~abc123... -> sha256~********** + line = LOG_PATTERNS["sha256_token"].sub(f"sha256~{MASK_STR}", line) + + # Bearer tokens + # Bearer abc123... -> Bearer ********** + line = LOG_PATTERNS["bearer"].sub(f"Bearer {MASK_STR}", line) + + # Connection_string tokens + # mysql://user:pas123@localhost:3306/db -> mysql://*****:*******@:3306/db" + line = LOG_PATTERNS["connection_string"].sub(f"://{MASK_STR}:{MASK_STR}@", line) + + return line + + +def should_skip_ansible_line(line: str) -> bool: """ - Function responsible to begin masking on a provided - log file. It checks for file type, and calls - respective masking methods for that file. + Identifies if the line is in an Ansible header for Tasks or Plays. + + Returns True for lines that should not be masked. """ - changed = False - if ( - path.endswith((tuple(["yaml", "yml"]))) - or os.path.basename(path).split(".")[0] in ALLOWED_YAML_FILES - ): - extension = "yaml" - changed = mask_file(module, path, extension) - return changed + line_upper = line.upper() + return any(pattern.upper() in line_upper for pattern in ANSIBLE_SKIP_PATTERNS) -def mask_yaml(infile, outfile, changed) -> bool: +def mask_log_file_lines(infile, outfile, changed) -> bool: """ - Read the file, search for colon (':'), take value and - mask sensitive data + Mask log file lines with skip logic. + """ for line in infile: - # Skip lines without colon - if ":" not in line: + # Skip Ansible task headers + if should_skip_ansible_line(line): outfile.write(line) continue - key, sep, value = line.partition(":") - masked_value = value - for word in PROTECT_KEYS: - if key.strip() == word: - masked = partial_mask(value) - if not masked: - continue - masked_value = masked_value.replace(value, masked) - changed = True - - outfile.write(f"{key}{sep}{masked_value}") - return changed - + masked_line = mask_log_line(line) + if masked_line != line: + changed = True + outfile.write(masked_line) -def replace_file(temp_path, file_path, changed): - if changed: - temp_path.replace(file_path) - else: - temp_path.unlink(missing_ok=True) + return changed -def mask_file(module, path, extension) -> bool: +def mask_file(path) -> bool: """ Create temporary file, replace sensitive string with masked, then replace the tmp file with original. + Unlink temp file when failure. """ changed = False @@ -293,12 +477,23 @@ def mask_file(module, path, extension) -> bool: try: with file_path.open("r", encoding="utf-8") as infile: with temp_path.open("w", encoding="utf-8") as outfile: - if extension == "yaml": - changed = mask_yaml(infile, outfile, changed) - replace_file(temp_path, file_path, changed) - return changed + changed = mask_log_file_lines(infile, outfile, changed) + replace_file(temp_path, file_path, changed) + return changed + except FileNotFoundError: + print(f"Warning: File not found (possibly broken symlink): {file_path}") + return False except Exception as e: print(f"An unexpected error occurred on masking file {file_path}: {e}") + temp_path.unlink(missing_ok=True) + return False + + +def replace_file(temp_path, file_path, changed): + if changed: + temp_path.replace(file_path) + else: + temp_path.unlink(missing_ok=True) def run_module(): @@ -344,7 +539,10 @@ def run_module(): changed = crawl(module, path) if not isdir and not re.search(excluded_file_ext_regex, path): - changed = mask(module, path) + try: + changed = mask_file(path) + except Exception as e: + module.fail_json(e) result.update(changed=changed) # in the event of a successful module execution, you will want to diff --git a/roles/artifacts/tasks/main.yml b/roles/artifacts/tasks/main.yml index 36e10f79a6..682058fcf4 100644 --- a/roles/artifacts/tasks/main.yml +++ b/roles/artifacts/tasks/main.yml @@ -92,6 +92,7 @@ - name: Mask secrets in yaml log files when: cifmw_artifacts_mask_logs |bool + become: true ignore_errors: true # noqa: ignore-errors timeout: 3600 cifmw.general.crawl_n_mask: diff --git a/tests/unit/modules/test_crawl_n_mask.py b/tests/unit/modules/test_crawl_n_mask.py index 519bdd2792..8330ae4d27 100644 --- a/tests/unit/modules/test_crawl_n_mask.py +++ b/tests/unit/modules/test_crawl_n_mask.py @@ -1,5 +1,9 @@ +import os +import tempfile +from unittest.mock import MagicMock, patch + import pytest -from unittest.mock import patch, MagicMock + from plugins.modules import crawl_n_mask as cnm @@ -14,10 +18,13 @@ class TestCrawlNMask: ) def test_crawl_true(self, test_dir, expected_files): with patch("os.walk") as mock_walk, patch( - "plugins.modules.crawl_n_mask.mask" - ) as mock_mask: + "plugins.modules.crawl_n_mask.Pool" + ) as mock_pool: mock_walk.return_value = expected_files - mock_mask.return_value = True + # Mock the Pool context manager and map method + mock_pool_instance = MagicMock() + mock_pool.return_value.__enter__.return_value = mock_pool_instance + mock_pool_instance.map.return_value = [True] # At least one file changed module = MagicMock() changed = cnm.crawl(module, test_dir) assert changed @@ -33,33 +40,220 @@ def test_crawl_true(self, test_dir, expected_files): ) def test_crawl_false(self, test_dir, expected_files): with patch("os.walk") as mock_walk, patch( - "plugins.modules.crawl_n_mask.mask" - ) as mock_mask: + "plugins.modules.crawl_n_mask.Pool" + ) as mock_pool: mock_walk.return_value = expected_files - mock_mask.return_value = False + # Mock the Pool context manager and map method + mock_pool_instance = MagicMock() + mock_pool.return_value.__enter__.return_value = mock_pool_instance + mock_pool_instance.map.return_value = [False] # No files changed module = MagicMock() changed = cnm.crawl(module, test_dir) assert not changed - def test_partial_mask_scenario_1(self): - example_value = " 'test1234'\n" - expected_value = " 'te**********34'\n" - test_value = cnm.partial_mask(example_value) + def test_get_masked_string_scenario_1(self): + example_value = "test1234" + expected_value = "te**********34" + test_value = cnm._get_masked_string(example_value) assert expected_value == test_value - def test_partial_mask_scenario_2(self): - example_value = " osp_ci_framework_keytab\n" - expected_value = " 'os**********ab'\n" - test_value = cnm.partial_mask(example_value) + def test_get_masked_string_scenario_2(self): + example_value = "examplea_keytab" + expected_value = "ex**********ab" + test_value = cnm._get_masked_string(example_value) assert expected_value == test_value - def test_partial_mask_scenario_3(self): - example_value = " ''\n" - test_value = cnm.partial_mask(example_value) - assert test_value is None + def test_get_masked_string_scenario_3(self): + example_value = "" + expected_value = "" + test_value = cnm._get_masked_string(example_value) + assert expected_value == test_value - def test_partial_mask_scenario_4(self): - example_value = "tet" - expected_value = "'te**********'" - test_value = cnm.partial_mask(example_value) + def test_get_masked_string_scenario_4(self): + example_value = "test" + expected_value = "te**********" + test_value = cnm._get_masked_string(example_value) assert expected_value == test_value + + @pytest.mark.parametrize( + "input_line, expected_output", + [ + # Test python dict quoted pattern + ( + "'admin_password': 'SuperSecret123'", + "'admin_password': 'Su**********23'", + ), + ( + '"AdminPassword": "MyP@ssw0rd"', + '"AdminPassword": "My**********rd"', + ), + # Test numeric passwords + ( + "'db-password': 123456789", + "'db-password': 12**********89", + ), + # Test empty value + ( + "password: ''", + "password: ''", + ), + # Test plain key-value pattern + ( + "admin_password: secret123", + "admin_password: se**********23", + ), + ( + 'password: "abc123"', + 'password: "ab**********23"', + ), + ( + "password: 'abc123'", + "password: 'ab**********23'", + ), + ( + "mysql_root_password=MyPassword", + "mysql_root_password=My**********rd", + ), + # Test SHA256 tokens - treated as plain key:value + ( + "X-Auth-Token sha256~abc123def456ghi789", + "X-Auth-Token sha256~**********", + ), + # Test Bearer tokens - value gets masked + ( + "bearerToken: eyJhbGciOiJIU2d12ansnR5cCI6IkpXVCJ9", + "bearerToken: ey**********J9", + ), + # Test connection strings - credentials in URL get masked + ( + "admin_password in mysql://user:password123@localhost:3306/db", + "admin_password in mysql://**********:**********@:3306/db", + ), + # Test line without secrets - should remain unchanged + ( + "This is a normal log line without secrets", + "This is a normal log line without secrets", + ), + # Test multiple secrets in one line + ( + "'admin_password': 'secret1' and 'db-password'= 'secret2'", + "'admin_password': 'se**********t1' and 'db-password'= 'se**********t2'", + ), + # Test additional keys from PROTECT_KEYS + ( + "redis_password: myRedisSecret", + "redis_password: my**********et", + ), + ( + "clientSecret:oauth2secret", + "clientSecret:oa**********et", + ), + ( + "postgresPassword :dbP@ssw0rd!", + "postgresPassword :db**********d!", + ), + ( + "'BARBICAN_SIMPLE_CRYPTO_ENCRYPTION_KEY' : 'sE12312341==48943y21'", + "'BARBICAN_SIMPLE_CRYPTO_ENCRYPTION_KEY' : 'sE**********21'", + ), + ], + ) + def test_mask_log_line(self, input_line, expected_output): + result = cnm.mask_log_line(input_line) + assert result == expected_output + + def test_should_skip_ansible_line(self): + assert cnm.should_skip_ansible_line("TASK [Install packages]") == True + assert cnm.should_skip_ansible_line("TASK: Configure database") == True + assert cnm.should_skip_ansible_line("PLAY [Deploy application]") == True + assert cnm.should_skip_ansible_line("Some regular log line") == False + + def test_mask_file_with_real_file(self): + # Create a temporary file with secrets + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".yaml") as f: + f.write("admin_password: SuperSecret123\n") + f.write( + "db_connection: admin_password in mysql://user:pass123@localhost/db\n" + ) + f.write("normal_config: some_value\n") + temp_path = f.name + + try: + # Mask the file + changed = cnm.mask_file(temp_path) + + # Verify it was changed + assert changed == True + + # Read the masked file + with open(temp_path, "r") as f: + content = f.read() + + # Verify secrets are masked + assert "SuperSecret123" not in content + assert "pass123" not in content + assert "**********" in content + + # Verify normal content is preserved + assert "normal_config: some_value" in content + + finally: + # Clean up + if os.path.exists(temp_path): + os.unlink(temp_path) + + def test_mask_file_no_changes(self): + # Create a temporary file without secrets + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".yaml") as f: + f.write("normal_config: some_value\n") + f.write("another_config: another_value\n") + temp_path = f.name + + try: + # Mask the file + changed = cnm.mask_file(temp_path) + + # Verify it was not changed + assert changed == False + + # Read the file + with open(temp_path, "r") as f: + content = f.read() + + # Verify content is unchanged + assert "normal_config: some_value" in content + assert "another_config: another_value" in content + + finally: + # Clean up + if os.path.exists(temp_path): + os.unlink(temp_path) + + def test_mask_file_preserves_ansible_task_headers(self): + # Create a temporary file with Ansible task headers + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".log") as f: + f.write("TASK [Setup admin_password variable]\n") + f.write("admin_password: SuperSecret123\n") + f.write("PLAY [Configure mysql_root_password]\n") + temp_path = f.name + + try: + # Mask the file + changed = cnm.mask_file(temp_path) + + # Read the masked file + with open(temp_path, "r") as f: + content = f.read() + + # Verify task headers are preserved exactly + assert "TASK [Setup admin_password variable]" in content + assert "PLAY [Configure mysql_root_password]" in content + + # Verify the actual secret is masked + assert "SuperSecret123" not in content + assert "**********" in content + + finally: + # Clean up + if os.path.exists(temp_path): + os.unlink(temp_path)