diff --git a/.github/workflows/parameters/private/parameters_aws_auth_tests.json.gpg b/.github/workflows/parameters/private/parameters_aws_auth_tests.json.gpg index 66efaba70..a94264cb8 100644 Binary files a/.github/workflows/parameters/private/parameters_aws_auth_tests.json.gpg and b/.github/workflows/parameters/private/parameters_aws_auth_tests.json.gpg differ diff --git a/ci/test_authentication.sh b/ci/test_authentication.sh index af1e77e24..7e238f79e 100755 --- a/ci/test_authentication.sh +++ b/ci/test_authentication.sh @@ -23,5 +23,5 @@ docker run \ -v $(cd $THIS_DIR/.. && pwd):/mnt/host \ -v $WORKSPACE:/mnt/workspace \ --rm \ - nexus.int.snowflakecomputing.com:8086/docker/snowdrivers-test-external-browser-python:2 \ + nexus.int.snowflakecomputing.com:8086/docker/snowdrivers-test-external-browser-python:3 \ "/mnt/host/ci/container/test_authentication.sh" diff --git a/test/auth/authorization_parameters.py b/test/auth/authorization_parameters.py index be56d43c9..332b9bd09 100644 --- a/test/auth/authorization_parameters.py +++ b/test/auth/authorization_parameters.py @@ -77,6 +77,13 @@ def __init__(self): def get_base_connection_parameters(self) -> dict[str, Union[str, bool, int]]: return self.basic_config + def get_mfa_connection_parameters(self) -> dict[str, Union[str, bool, int]]: + config = self.basic_config.copy() + config["user"] = _get_env_variable("SNOWFLAKE_AUTH_TEST_MFA_USER") + config["password"] = _get_env_variable("SNOWFLAKE_AUTH_TEST_MFA_PASSWORD") + config["authenticator"] = "USERNAME_PASSWORD_MFA" + return config + def get_key_pair_connection_parameters(self): config = self.basic_config.copy() config["authenticator"] = "SNOWFLAKE_JWT" diff --git a/test/auth/authorization_test_helper.py b/test/auth/authorization_test_helper.py index d35fd1c33..84598a935 100644 --- a/test/auth/authorization_test_helper.py +++ b/test/auth/authorization_test_helper.py @@ -154,6 +154,25 @@ def _provide_credentials(self, scenario: Scenario, login: str, password: str): self.error_msg = e raise RuntimeError(e) + def get_totp(self, seed: str = "") -> []: + if self.auth_test_env == "docker": + try: + provide_totp_generator_path = "/externalbrowser/totpGenerator.js" + process = subprocess.run( + ["node", provide_totp_generator_path, seed], + timeout=40, + capture_output=True, + text=True, + ) + logger.debug(f"OUTPUT: {process.stdout}, ERRORS: {process.stderr}") + return process.stdout.strip().split() + except Exception as e: + self.error_msg = e + raise RuntimeError(e) + else: + logger.info("TOTP generation is not supported in this environment") + return "" + def connect_using_okta_connection_and_execute_custom_command( self, command: str, return_token: bool = False ) -> Union[bool, str]: @@ -169,3 +188,27 @@ def connect_using_okta_connection_and_execute_custom_command( if return_token: return token return False + + def connect_and_execute_simple_query_with_mfa_token(self, totp_codes): + # Try each TOTP code until one works + for i, totp_code in enumerate(totp_codes): + logging.info(f"Trying TOTP code {i + 1}/{len(totp_codes)}") + + self.configuration["passcode"] = totp_code + self.error_msg = "" + + connection_success = self.connect_and_execute_simple_query() + + if connection_success: + logging.info(f"Successfully connected with TOTP code {i + 1}") + return True + else: + last_error = str(self.error_msg) + logging.warning(f"TOTP code {i + 1} failed: {last_error}") + if "TOTP Invalid" in last_error: + logging.info("TOTP/MFA error detected.") + continue + else: + logging.error(f"Non-TOTP error detected: {last_error}") + break + return False diff --git a/test/auth/test_mfa.py b/test/auth/test_mfa.py new file mode 100644 index 000000000..e7304bc5a --- /dev/null +++ b/test/auth/test_mfa.py @@ -0,0 +1,38 @@ +import logging +from test.auth.authorization_parameters import AuthConnectionParameters +from test.auth.authorization_test_helper import AuthorizationTestHelper + +import pytest + + +@pytest.mark.auth +def test_mfa_successful(): + connection_parameters = AuthConnectionParameters().get_mfa_connection_parameters() + connection_parameters["client_request_mfa_token"] = True + test_helper = AuthorizationTestHelper(connection_parameters) + totp_codes = test_helper.get_totp() + logging.info(f"Got {len(totp_codes)} TOTP codes to try") + + connection_success = test_helper.connect_and_execute_simple_query_with_mfa_token( + totp_codes + ) + + assert ( + connection_success + ), f"Failed to connect with any of the {len(totp_codes)} TOTP codes. Last error: {test_helper.error_msg}" + assert ( + test_helper.error_msg == "" + ), f"Final error message should be empty but got: {test_helper.error_msg}" + + logging.info("Testing MFA token caching with second connection...") + + connection_parameters["passcode"] = None + cache_test_helper = AuthorizationTestHelper(connection_parameters) + cache_connection_success = cache_test_helper.connect_and_execute_simple_query() + + assert ( + cache_connection_success + ), f"Failed to connect with cached MFA token. Error: {cache_test_helper.error_msg}" + assert ( + cache_test_helper.error_msg == "" + ), f"Cache test error message should be empty but got: {cache_test_helper.error_msg}"