Skip to content

Commit 5c5b737

Browse files
SNOW-2235955: adding MFA test in Python (#2465)
1 parent 1190042 commit 5c5b737

File tree

5 files changed

+89
-1
lines changed

5 files changed

+89
-1
lines changed
Binary file not shown.

ci/test_authentication.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,5 @@ docker run \
2323
-v $(cd $THIS_DIR/.. && pwd):/mnt/host \
2424
-v $WORKSPACE:/mnt/workspace \
2525
--rm \
26-
nexus.int.snowflakecomputing.com:8086/docker/snowdrivers-test-external-browser-python:2 \
26+
nexus.int.snowflakecomputing.com:8086/docker/snowdrivers-test-external-browser-python:3 \
2727
"/mnt/host/ci/container/test_authentication.sh"

test/auth/authorization_parameters.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,13 @@ def __init__(self):
7777
def get_base_connection_parameters(self) -> dict[str, Union[str, bool, int]]:
7878
return self.basic_config
7979

80+
def get_mfa_connection_parameters(self) -> dict[str, Union[str, bool, int]]:
81+
config = self.basic_config.copy()
82+
config["user"] = _get_env_variable("SNOWFLAKE_AUTH_TEST_MFA_USER")
83+
config["password"] = _get_env_variable("SNOWFLAKE_AUTH_TEST_MFA_PASSWORD")
84+
config["authenticator"] = "USERNAME_PASSWORD_MFA"
85+
return config
86+
8087
def get_key_pair_connection_parameters(self):
8188
config = self.basic_config.copy()
8289
config["authenticator"] = "SNOWFLAKE_JWT"

test/auth/authorization_test_helper.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,25 @@ def _provide_credentials(self, scenario: Scenario, login: str, password: str):
154154
self.error_msg = e
155155
raise RuntimeError(e)
156156

157+
def get_totp(self, seed: str = "") -> []:
158+
if self.auth_test_env == "docker":
159+
try:
160+
provide_totp_generator_path = "/externalbrowser/totpGenerator.js"
161+
process = subprocess.run(
162+
["node", provide_totp_generator_path, seed],
163+
timeout=40,
164+
capture_output=True,
165+
text=True,
166+
)
167+
logger.debug(f"OUTPUT: {process.stdout}, ERRORS: {process.stderr}")
168+
return process.stdout.strip().split()
169+
except Exception as e:
170+
self.error_msg = e
171+
raise RuntimeError(e)
172+
else:
173+
logger.info("TOTP generation is not supported in this environment")
174+
return ""
175+
157176
def connect_using_okta_connection_and_execute_custom_command(
158177
self, command: str, return_token: bool = False
159178
) -> Union[bool, str]:
@@ -169,3 +188,27 @@ def connect_using_okta_connection_and_execute_custom_command(
169188
if return_token:
170189
return token
171190
return False
191+
192+
def connect_and_execute_simple_query_with_mfa_token(self, totp_codes):
193+
# Try each TOTP code until one works
194+
for i, totp_code in enumerate(totp_codes):
195+
logging.info(f"Trying TOTP code {i + 1}/{len(totp_codes)}")
196+
197+
self.configuration["passcode"] = totp_code
198+
self.error_msg = ""
199+
200+
connection_success = self.connect_and_execute_simple_query()
201+
202+
if connection_success:
203+
logging.info(f"Successfully connected with TOTP code {i + 1}")
204+
return True
205+
else:
206+
last_error = str(self.error_msg)
207+
logging.warning(f"TOTP code {i + 1} failed: {last_error}")
208+
if "TOTP Invalid" in last_error:
209+
logging.info("TOTP/MFA error detected.")
210+
continue
211+
else:
212+
logging.error(f"Non-TOTP error detected: {last_error}")
213+
break
214+
return False

test/auth/test_mfa.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import logging
2+
from test.auth.authorization_parameters import AuthConnectionParameters
3+
from test.auth.authorization_test_helper import AuthorizationTestHelper
4+
5+
import pytest
6+
7+
8+
@pytest.mark.auth
9+
def test_mfa_successful():
10+
connection_parameters = AuthConnectionParameters().get_mfa_connection_parameters()
11+
connection_parameters["client_request_mfa_token"] = True
12+
test_helper = AuthorizationTestHelper(connection_parameters)
13+
totp_codes = test_helper.get_totp()
14+
logging.info(f"Got {len(totp_codes)} TOTP codes to try")
15+
16+
connection_success = test_helper.connect_and_execute_simple_query_with_mfa_token(
17+
totp_codes
18+
)
19+
20+
assert (
21+
connection_success
22+
), f"Failed to connect with any of the {len(totp_codes)} TOTP codes. Last error: {test_helper.error_msg}"
23+
assert (
24+
test_helper.error_msg == ""
25+
), f"Final error message should be empty but got: {test_helper.error_msg}"
26+
27+
logging.info("Testing MFA token caching with second connection...")
28+
29+
connection_parameters["passcode"] = None
30+
cache_test_helper = AuthorizationTestHelper(connection_parameters)
31+
cache_connection_success = cache_test_helper.connect_and_execute_simple_query()
32+
33+
assert (
34+
cache_connection_success
35+
), f"Failed to connect with cached MFA token. Error: {cache_test_helper.error_msg}"
36+
assert (
37+
cache_test_helper.error_msg == ""
38+
), f"Cache test error message should be empty but got: {cache_test_helper.error_msg}"

0 commit comments

Comments
 (0)