diff --git a/google/auth/transport/_mtls_helper.py b/google/auth/transport/_mtls_helper.py index 7b2b0407f..f5d6b6724 100644 --- a/google/auth/transport/_mtls_helper.py +++ b/google/auth/transport/_mtls_helper.py @@ -47,6 +47,20 @@ b"-----BEGIN PASSPHRASE-----(.+)-----END PASSPHRASE-----", re.DOTALL ) +# Temporary patch to accomodate incorrect cert config in Cloud Run prod environment. +_WELL_KNOWN_CLOUD_RUN_CERT_PATH = ( + "/var/run/secrets/workload-spiffe-credentials/certificates.pem" +) +_WELL_KNOWN_CLOUD_RUN_KEY_PATH = ( + "/var/run/secrets/workload-spiffe-credentials/private_key.pem" +) +_INCORRECT_CLOUD_RUN_CERT_PATH = ( + "/var/lib/volumes/certificate/workload-certificates/certificates.pem" +) +_INCORRECT_CLOUD_RUN_KEY_PATH = ( + "/var/lib/volumes/certificate/workload-certificates/private_key.pem" +) + def _check_config_path(config_path): """Checks for config file path. If it exists, returns the absolute path with user expansion; @@ -183,6 +197,25 @@ def _get_workload_cert_and_key_paths(config_path): ) key_path = workload["key_path"] + # == BEGIN Temporary Cloud Run PATCH == + # See https://github.com/googleapis/google-auth-library-python/issues/1881 + if (cert_path == _INCORRECT_CLOUD_RUN_CERT_PATH) and ( + key_path == _INCORRECT_CLOUD_RUN_KEY_PATH + ): + if not path.exists(cert_path) and not path.exists(key_path): + _LOGGER.debug( + "Applying Cloud Run certificate path patch. " + "Configured paths not found: %s, %s. " + "Using well-known paths: %s, %s", + cert_path, + key_path, + _WELL_KNOWN_CLOUD_RUN_CERT_PATH, + _WELL_KNOWN_CLOUD_RUN_KEY_PATH, + ) + cert_path = _WELL_KNOWN_CLOUD_RUN_CERT_PATH + key_path = _WELL_KNOWN_CLOUD_RUN_KEY_PATH + # == END Temporary Cloud Run PATCH == + return cert_path, key_path diff --git a/tests/transport/test__mtls_helper.py b/tests/transport/test__mtls_helper.py index 63c742c1f..2a7a524b1 100644 --- a/tests/transport/test__mtls_helper.py +++ b/tests/transport/test__mtls_helper.py @@ -334,6 +334,93 @@ def test_success_with_certificate_config( assert key == pytest.private_key_bytes assert passphrase is None + @mock.patch( + "google.auth.transport._mtls_helper._read_cert_and_key_files", autospec=True + ) + @mock.patch( + "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True + ) + @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True) + @mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True) + def test_success_with_certificate_config_cloud_run_patch( + self, + mock_check_config_path, + mock_load_json_file, + mock_get_cert_config_path, + mock_read_cert_and_key_files, + ): + cert_config_path = "/path/to/config" + mock_check_config_path.return_value = cert_config_path + mock_load_json_file.return_value = { + "cert_configs": { + "workload": { + "cert_path": _mtls_helper._INCORRECT_CLOUD_RUN_CERT_PATH, + "key_path": _mtls_helper._INCORRECT_CLOUD_RUN_KEY_PATH, + } + } + } + mock_get_cert_config_path.return_value = cert_config_path + mock_read_cert_and_key_files.return_value = ( + pytest.public_cert_bytes, + pytest.private_key_bytes, + ) + + has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials() + assert has_cert + assert cert == pytest.public_cert_bytes + assert key == pytest.private_key_bytes + assert passphrase is None + + mock_read_cert_and_key_files.assert_called_once_with( + _mtls_helper._WELL_KNOWN_CLOUD_RUN_CERT_PATH, + _mtls_helper._WELL_KNOWN_CLOUD_RUN_KEY_PATH, + ) + + @mock.patch("os.path.exists", autospec=True) + @mock.patch( + "google.auth.transport._mtls_helper._read_cert_and_key_files", autospec=True + ) + @mock.patch( + "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True + ) + @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True) + @mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True) + def test_success_with_certificate_config_cloud_run_patch_skipped_if_cert_exists( + self, + mock_check_config_path, + mock_load_json_file, + mock_get_cert_config_path, + mock_read_cert_and_key_files, + mock_os_path_exists, + ): + cert_config_path = "/path/to/config" + mock_check_config_path.return_value = cert_config_path + mock_os_path_exists.return_value = True + mock_load_json_file.return_value = { + "cert_configs": { + "workload": { + "cert_path": _mtls_helper._INCORRECT_CLOUD_RUN_CERT_PATH, + "key_path": _mtls_helper._INCORRECT_CLOUD_RUN_KEY_PATH, + } + } + } + mock_get_cert_config_path.return_value = cert_config_path + mock_read_cert_and_key_files.return_value = ( + pytest.public_cert_bytes, + pytest.private_key_bytes, + ) + + has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials() + assert has_cert + assert cert == pytest.public_cert_bytes + assert key == pytest.private_key_bytes + assert passphrase is None + + mock_read_cert_and_key_files.assert_called_once_with( + _mtls_helper._INCORRECT_CLOUD_RUN_CERT_PATH, + _mtls_helper._INCORRECT_CLOUD_RUN_KEY_PATH, + ) + @mock.patch( "google.auth.transport._mtls_helper._get_workload_cert_and_key", autospec=True )