Skip to content

Commit 702607f

Browse files
SNOW-2026797 Adding tests for PAT (#2265)
1 parent c3cffb2 commit 702607f

File tree

4 files changed

+139
-19
lines changed

4 files changed

+139
-19
lines changed
Binary file not shown.

test/auth/authorization_parameters.py

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
1-
try:
2-
import os
3-
import sys
1+
import os
2+
import sys
3+
from typing import Union
44

5-
from cryptography.hazmat.backends import default_backend
6-
from cryptography.hazmat.primitives import serialization
7-
except ImportError:
8-
pass
5+
from cryptography.hazmat.backends import default_backend
6+
from cryptography.hazmat.primitives import serialization
97

108
sys.path.append(os.path.abspath(os.path.dirname(__file__)))
119

1210

13-
def get_oauth_token_parameters():
11+
def get_oauth_token_parameters() -> dict[str, str]:
1412
return {
1513
"auth_url": _get_env_variable("SNOWFLAKE_AUTH_TEST_OAUTH_URL"),
1614
"oauth_client_id": _get_env_variable("SNOWFLAKE_AUTH_TEST_OAUTH_CLIENT_ID"),
@@ -30,14 +28,14 @@ def _get_env_variable(name: str, required: bool = True) -> str:
3028
return value
3129

3230

33-
def get_okta_login_credentials():
31+
def get_okta_login_credentials() -> dict[str, str]:
3432
return {
3533
"login": _get_env_variable("SNOWFLAKE_AUTH_TEST_OKTA_USER"),
3634
"password": _get_env_variable("SNOWFLAKE_AUTH_TEST_OKTA_PASS"),
3735
}
3836

3937

40-
def get_soteria_okta_login_credentials():
38+
def get_soteria_okta_login_credentials() -> dict[str, str]:
4139
return {
4240
"login": _get_env_variable("SNOWFLAKE_AUTH_TEST_EXTERNAL_OAUTH_OKTA_CLIENT_ID"),
4341
"password": _get_env_variable(
@@ -56,6 +54,13 @@ def get_rsa_private_key_for_key_pair(
5654
return private_key
5755

5856

57+
def get_pat_setup_command_variables() -> dict[str, Union[str, bool, int]]:
58+
return {
59+
"snowflake_user": _get_env_variable("SNOWFLAKE_AUTH_TEST_SNOWFLAKE_USER"),
60+
"role": _get_env_variable("SNOWFLAKE_AUTH_TEST_INTERNAL_OAUTH_SNOWFLAKE_ROLE"),
61+
}
62+
63+
5964
class AuthConnectionParameters:
6065
def __init__(self):
6166
self.basic_config = {
@@ -69,7 +74,7 @@ def __init__(self):
6974
"CLIENT_STORE_TEMPORARY_CREDENTIAL": False,
7075
}
7176

72-
def get_base_connection_parameters(self):
77+
def get_base_connection_parameters(self) -> dict[str, Union[str, bool, int]]:
7378
return self.basic_config
7479

7580
def get_key_pair_connection_parameters(self):
@@ -79,15 +84,15 @@ def get_key_pair_connection_parameters(self):
7984

8085
return config
8186

82-
def get_external_browser_connection_parameters(self):
87+
def get_external_browser_connection_parameters(self) -> dict[str, str]:
8388
config = self.basic_config.copy()
8489

8590
config["user"] = _get_env_variable("SNOWFLAKE_AUTH_TEST_BROWSER_USER")
8691
config["authenticator"] = "externalbrowser"
8792

8893
return config
8994

90-
def get_store_id_token_connection_parameters(self):
95+
def get_store_id_token_connection_parameters(self) -> dict[str, str]:
9196
config = self.get_external_browser_connection_parameters()
9297

9398
config["CLIENT_STORE_TEMPORARY_CREDENTIAL"] = _get_env_variable(
@@ -96,7 +101,7 @@ def get_store_id_token_connection_parameters(self):
96101

97102
return config
98103

99-
def get_okta_connection_parameters(self):
104+
def get_okta_connection_parameters(self) -> dict[str, str]:
100105
config = self.basic_config.copy()
101106

102107
config["user"] = _get_env_variable("SNOWFLAKE_AUTH_TEST_BROWSER_USER")
@@ -105,15 +110,17 @@ def get_okta_connection_parameters(self):
105110

106111
return config
107112

108-
def get_oauth_connection_parameters(self, token: str):
113+
def get_oauth_connection_parameters(self, token: str) -> dict[str, str]:
109114
config = self.basic_config.copy()
110115

111116
config["user"] = _get_env_variable("SNOWFLAKE_AUTH_TEST_BROWSER_USER")
112117
config["authenticator"] = "OAUTH"
113118
config["token"] = token
114119
return config
115120

116-
def get_oauth_external_authorization_code_connection_parameters(self):
121+
def get_oauth_external_authorization_code_connection_parameters(
122+
self,
123+
) -> dict[str, Union[str, bool, int]]:
117124
config = self.basic_config.copy()
118125

119126
config["authenticator"] = "OAUTH_AUTHORIZATION_CODE"
@@ -136,7 +143,9 @@ def get_oauth_external_authorization_code_connection_parameters(self):
136143

137144
return config
138145

139-
def get_snowflake_authorization_code_connection_parameters(self):
146+
def get_snowflake_authorization_code_connection_parameters(
147+
self,
148+
) -> dict[str, Union[str, bool, int]]:
140149
config = self.basic_config.copy()
141150

142151
config["authenticator"] = "OAUTH_AUTHORIZATION_CODE"
@@ -158,7 +167,9 @@ def get_snowflake_authorization_code_connection_parameters(self):
158167

159168
return config
160169

161-
def get_snowflake_wildcard_external_authorization_code_connection_parameters(self):
170+
def get_snowflake_wildcard_external_authorization_code_connection_parameters(
171+
self,
172+
) -> dict[str, Union[str, bool, int]]:
162173
config = self.basic_config.copy()
163174

164175
config["authenticator"] = "OAUTH_AUTHORIZATION_CODE"
@@ -177,7 +188,9 @@ def get_snowflake_wildcard_external_authorization_code_connection_parameters(sel
177188

178189
return config
179190

180-
def get_oauth_external_client_credential_connection_parameters(self):
191+
def get_oauth_external_client_credential_connection_parameters(
192+
self,
193+
) -> dict[str, str]:
181194
config = self.basic_config.copy()
182195

183196
config["authenticator"] = "OAUTH_CLIENT_CREDENTIALS"
@@ -195,3 +208,11 @@ def get_oauth_external_client_credential_connection_parameters(self):
195208
)
196209

197210
return config
211+
212+
def get_pat_connection_parameters(self) -> dict[str, str]:
213+
config = self.basic_config.copy()
214+
215+
config["authenticator"] = "PROGRAMMATIC_ACCESS_TOKEN"
216+
config["user"] = _get_env_variable("SNOWFLAKE_AUTH_TEST_BROWSER_USER")
217+
218+
return config

test/auth/authorization_test_helper.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import threading
55
import webbrowser
66
from enum import Enum
7+
from typing import Union
78

89
import requests
910

@@ -125,3 +126,19 @@ def _provide_credentials(self, scenario: Scenario, login: str, password: str):
125126
except Exception as e:
126127
self.error_msg = e
127128
raise RuntimeError(e)
129+
130+
def connect_using_okta_connection_and_execute_custom_command(
131+
self, command: str, return_token: bool = False
132+
) -> Union[bool, str]:
133+
try:
134+
logger.info("Setup PAT")
135+
with snowflake.connector.connect(**self.configuration) as con:
136+
result = con.cursor().execute(command)
137+
token = result.fetchall()[0][1]
138+
except Exception as e:
139+
self.error_msg = e
140+
logger.error(e)
141+
return False
142+
if return_token:
143+
return token
144+
return False

test/auth/test_pat.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
from datetime import datetime
2+
from test.auth.authorization_parameters import (
3+
AuthConnectionParameters,
4+
get_pat_setup_command_variables,
5+
)
6+
from typing import Union
7+
8+
import pytest
9+
from authorization_test_helper import AuthorizationTestHelper
10+
11+
12+
@pytest.mark.auth
13+
def test_authenticate_with_pat_successful() -> None:
14+
pat_command_variables = get_pat_setup_command_variables()
15+
connection_parameters = AuthConnectionParameters().get_pat_connection_parameters()
16+
test_helper = AuthorizationTestHelper(connection_parameters)
17+
try:
18+
pat_command_variables = get_pat_token(pat_command_variables)
19+
connection_parameters["token"] = pat_command_variables["token"]
20+
test_helper.connect_and_execute_simple_query()
21+
finally:
22+
remove_pat_token(pat_command_variables)
23+
assert test_helper.get_error_msg() == "", "Error message should be empty"
24+
25+
26+
@pytest.mark.auth
27+
def test_authenticate_with_pat_mismatched_user() -> None:
28+
pat_command_variables = get_pat_setup_command_variables()
29+
connection_parameters = AuthConnectionParameters().get_pat_connection_parameters()
30+
connection_parameters["user"] = "differentUsername"
31+
test_helper = AuthorizationTestHelper(connection_parameters)
32+
try:
33+
pat_command_variables = get_pat_token(pat_command_variables)
34+
connection_parameters["token"] = pat_command_variables["token"]
35+
test_helper.connect_and_execute_simple_query()
36+
finally:
37+
remove_pat_token(pat_command_variables)
38+
39+
assert "Programmatic access token is invalid" in test_helper.get_error_msg()
40+
41+
42+
@pytest.mark.auth
43+
def test_authenticate_with_pat_invalid_token() -> None:
44+
connection_parameters = AuthConnectionParameters().get_pat_connection_parameters()
45+
connection_parameters["token"] = "invalidToken"
46+
test_helper = AuthorizationTestHelper(connection_parameters)
47+
test_helper.connect_and_execute_simple_query()
48+
assert "Programmatic access token is invalid" in test_helper.get_error_msg()
49+
50+
51+
def get_pat_token(pat_command_variables) -> dict[str, Union[str, bool]]:
52+
okta_connection_parameters = (
53+
AuthConnectionParameters().get_okta_connection_parameters()
54+
)
55+
56+
pat_name = "PAT_PYTHON_" + generate_random_suffix()
57+
pat_command_variables["pat_name"] = pat_name
58+
command = (
59+
f"alter user {pat_command_variables['snowflake_user']} add programmatic access token {pat_name} "
60+
f"ROLE_RESTRICTION = '{pat_command_variables['role']}' DAYS_TO_EXPIRY=1;"
61+
)
62+
test_helper = AuthorizationTestHelper(okta_connection_parameters)
63+
pat_command_variables["token"] = (
64+
test_helper.connect_using_okta_connection_and_execute_custom_command(
65+
command, True
66+
)
67+
)
68+
return pat_command_variables
69+
70+
71+
def remove_pat_token(pat_command_variables: dict[str, Union[str, bool]]) -> None:
72+
okta_connection_parameters = (
73+
AuthConnectionParameters().get_okta_connection_parameters()
74+
)
75+
76+
command = f"alter user {pat_command_variables['snowflake_user']} remove programmatic access token {pat_command_variables['pat_name']};"
77+
test_helper = AuthorizationTestHelper(okta_connection_parameters)
78+
test_helper.connect_using_okta_connection_and_execute_custom_command(command)
79+
80+
81+
def generate_random_suffix() -> str:
82+
return datetime.now().strftime("%Y%m%d%H%M%S%f")

0 commit comments

Comments
 (0)