Skip to content

Commit 87fadcb

Browse files
committed
Add tests
1 parent 29f4c73 commit 87fadcb

File tree

7 files changed

+405
-19
lines changed

7 files changed

+405
-19
lines changed

tests/test_session.py

Lines changed: 333 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
import pytest
2+
from unittest.mock import Mock, patch
3+
import jwt
4+
from jwt import PyJWKClient
5+
from datetime import datetime, timezone
6+
7+
from workos.session import SessionModule
8+
from workos.types.user_management.authentication_response import RefreshTokenAuthenticationResponse
9+
from workos.types.user_management.session import (
10+
AuthenticateWithSessionCookieFailureReason,
11+
AuthenticateWithSessionCookieSuccessResponse,
12+
RefreshWithSessionCookieErrorResponse,
13+
RefreshWithSessionCookieSuccessResponse,
14+
)
15+
from workos.types.user_management.user import User
16+
17+
from cryptography.hazmat.primitives import serialization
18+
from cryptography.hazmat.primitives.asymmetric import rsa
19+
20+
@pytest.fixture(scope="session")
21+
def TEST_CONSTANTS():
22+
# Generate RSA key pair for testing
23+
private_key = rsa.generate_private_key(
24+
public_exponent=65537,
25+
key_size=2048
26+
)
27+
28+
public_key = private_key.public_key()
29+
30+
# Get the private key in PEM format
31+
private_pem = private_key.private_bytes(
32+
encoding=serialization.Encoding.PEM,
33+
format=serialization.PrivateFormat.PKCS8,
34+
encryption_algorithm=serialization.NoEncryption()
35+
)
36+
37+
return {
38+
"COOKIE_PASSWORD": "pfSqwTFXUTGEBBD1RQh2kt/oNJYxBgaoZan4Z8sMrKU=",
39+
"SESSION_DATA": "session_data",
40+
"CLIENT_ID": "client_123",
41+
"USER_ID": "user_123",
42+
"SESSION_ID": "session_123",
43+
"ORGANIZATION_ID": "organization_123",
44+
"CURRENT_TIMESTAMP": str(datetime.now(timezone.utc)),
45+
"PRIVATE_KEY": private_pem,
46+
"PUBLIC_KEY": public_key,
47+
"TEST_TOKEN": jwt.encode(
48+
{
49+
"sid": "session_123",
50+
"org_id": "organization_123",
51+
"role": "admin",
52+
"permissions": ["read"],
53+
"entitlements": ["feature_1"],
54+
"exp": int(datetime.now(timezone.utc).timestamp()) + 3600,
55+
"iat": int(datetime.now(timezone.utc).timestamp()),
56+
},
57+
private_pem,
58+
algorithm="RS256"
59+
)
60+
}
61+
62+
@pytest.fixture
63+
def mock_user_management(TEST_CONSTANTS):
64+
mock_jwks = Mock(spec=PyJWKClient)
65+
mock_jwk_set = Mock()
66+
mock_jwk_set.keys = [Mock(Algorithm="RS256")]
67+
mock_jwks.get_jwk_set.return_value = mock_jwk_set
68+
69+
70+
71+
mock = Mock()
72+
mock.get_jwks_url.return_value = "https://api.workos.com/user_management/sso/jwks/client_123"
73+
mock.authenticate_with_refresh_token.return_value = RefreshTokenAuthenticationResponse(
74+
**{
75+
"access_token": "access_token_123",
76+
"refresh_token": "refresh_token_123",
77+
"user": {
78+
"object": "user",
79+
"id": TEST_CONSTANTS["USER_ID"],
80+
"email": "[email protected]",
81+
"first_name": "Test",
82+
"last_name": "User",
83+
"email_verified": True,
84+
"created_at": TEST_CONSTANTS["CURRENT_TIMESTAMP"],
85+
"updated_at": TEST_CONSTANTS["CURRENT_TIMESTAMP"],
86+
}
87+
}
88+
)
89+
return mock
90+
91+
def test_initialize_session_module(TEST_CONSTANTS, mock_user_management):
92+
session = SessionModule(
93+
user_management=mock_user_management,
94+
client_id=TEST_CONSTANTS["CLIENT_ID"],
95+
session_data=TEST_CONSTANTS["SESSION_DATA"],
96+
cookie_password=TEST_CONSTANTS["COOKIE_PASSWORD"]
97+
)
98+
99+
assert session.client_id == TEST_CONSTANTS["CLIENT_ID"]
100+
assert session.cookie_password is not None
101+
102+
def test_initialize_without_cookie_password(mock_user_management):
103+
with pytest.raises(ValueError, match="cookie_password is required"):
104+
SessionModule(
105+
user_management=mock_user_management,
106+
client_id="client_123",
107+
session_data="session_data",
108+
cookie_password=""
109+
)
110+
111+
def test_authenticate_no_session_cookie_provided(TEST_CONSTANTS, mock_user_management):
112+
session = SessionModule(
113+
user_management=mock_user_management,
114+
client_id=TEST_CONSTANTS["CLIENT_ID"],
115+
session_data=None,
116+
cookie_password=TEST_CONSTANTS["COOKIE_PASSWORD"]
117+
)
118+
119+
response = session.authenticate()
120+
121+
assert response.reason == AuthenticateWithSessionCookieFailureReason.NO_SESSION_COOKIE_PROVIDED
122+
123+
def test_authenticate_invalid_session_cookie(TEST_CONSTANTS, mock_user_management):
124+
session = SessionModule(
125+
user_management=mock_user_management,
126+
client_id=TEST_CONSTANTS["CLIENT_ID"],
127+
session_data="invalid_session_data",
128+
cookie_password=TEST_CONSTANTS["COOKIE_PASSWORD"]
129+
)
130+
131+
response = session.authenticate()
132+
133+
assert response.reason == AuthenticateWithSessionCookieFailureReason.INVALID_SESSION_COOKIE
134+
135+
def test_authenticate_invalid_jwt(TEST_CONSTANTS, mock_user_management):
136+
invalid_session_data = SessionModule.seal_data({ "access_token": "invalid_session_data" }, TEST_CONSTANTS["COOKIE_PASSWORD"])
137+
session = SessionModule(
138+
user_management=mock_user_management,
139+
client_id=TEST_CONSTANTS["CLIENT_ID"],
140+
session_data=invalid_session_data,
141+
cookie_password=TEST_CONSTANTS["COOKIE_PASSWORD"]
142+
)
143+
144+
response = session.authenticate()
145+
146+
assert response.reason == AuthenticateWithSessionCookieFailureReason.INVALID_JWT
147+
148+
def test_authenticate_success(TEST_CONSTANTS, mock_user_management):
149+
session = SessionModule(
150+
user_management=mock_user_management,
151+
client_id=TEST_CONSTANTS["CLIENT_ID"],
152+
session_data=TEST_CONSTANTS["SESSION_DATA"],
153+
cookie_password=TEST_CONSTANTS["COOKIE_PASSWORD"]
154+
)
155+
156+
# Mock the session data that would be unsealed
157+
mock_session = {
158+
"access_token": jwt.encode(
159+
{
160+
"sid": TEST_CONSTANTS["SESSION_ID"],
161+
"org_id": TEST_CONSTANTS["ORGANIZATION_ID"],
162+
"role": "admin",
163+
"permissions": ["read"],
164+
"entitlements": ["feature_1"],
165+
"exp": int(datetime.now(timezone.utc).timestamp()) + 3600,
166+
"iat": int(datetime.now(timezone.utc).timestamp()),
167+
},
168+
TEST_CONSTANTS["PRIVATE_KEY"],
169+
algorithm="RS256"
170+
),
171+
"user": {
172+
"object": "user",
173+
"id": TEST_CONSTANTS["USER_ID"],
174+
"email": "[email protected]",
175+
"email_verified": True,
176+
"created_at": TEST_CONSTANTS["CURRENT_TIMESTAMP"],
177+
"updated_at": TEST_CONSTANTS["CURRENT_TIMESTAMP"],
178+
},
179+
"impersonator": None
180+
}
181+
182+
# Mock the JWT payload that would be decoded
183+
mock_jwt_payload = {
184+
"sid": TEST_CONSTANTS["SESSION_ID"],
185+
"org_id": TEST_CONSTANTS["ORGANIZATION_ID"],
186+
"role": "admin",
187+
"permissions": ["read"],
188+
"entitlements": ["feature_1"]
189+
}
190+
191+
with (
192+
# Mock unsealing the session data
193+
patch.object(
194+
SessionModule,
195+
"unseal_data",
196+
return_value=mock_session
197+
),
198+
# Mock JWT validation
199+
patch.object(
200+
session,
201+
"is_valid_jwt",
202+
return_value=True
203+
),
204+
# Mock JWT decoding
205+
patch(
206+
"jwt.decode",
207+
return_value=mock_jwt_payload
208+
),
209+
# Mock JWT signing key retrieval
210+
patch.object(
211+
session.jwks,
212+
"get_signing_key_from_jwt",
213+
return_value=Mock(key=TEST_CONSTANTS["PUBLIC_KEY"])
214+
)
215+
):
216+
response = session.authenticate()
217+
218+
assert isinstance(response, AuthenticateWithSessionCookieSuccessResponse)
219+
assert response.authenticated is True
220+
assert response.session_id == TEST_CONSTANTS["SESSION_ID"]
221+
assert response.organization_id == TEST_CONSTANTS["ORGANIZATION_ID"]
222+
assert response.role == "admin"
223+
assert response.permissions == ["read"]
224+
assert response.entitlements == ["feature_1"]
225+
assert response.user.id == TEST_CONSTANTS["USER_ID"]
226+
assert response.impersonator is None
227+
228+
def test_refresh_invalid_session_cookie(TEST_CONSTANTS, mock_user_management):
229+
session = SessionModule(
230+
user_management=mock_user_management,
231+
client_id=TEST_CONSTANTS["CLIENT_ID"],
232+
session_data="invalid_session_data",
233+
cookie_password=TEST_CONSTANTS["COOKIE_PASSWORD"]
234+
)
235+
236+
response = session.refresh()
237+
238+
assert isinstance(response, RefreshWithSessionCookieErrorResponse)
239+
assert response.reason == AuthenticateWithSessionCookieFailureReason.INVALID_SESSION_COOKIE
240+
241+
def test_refresh_success(TEST_CONSTANTS, mock_user_management):
242+
# Create mock JWKS client
243+
mock_jwks = Mock(spec=PyJWKClient)
244+
mock_signing_key = Mock()
245+
mock_signing_key.key = TEST_CONSTANTS["PUBLIC_KEY"]
246+
mock_jwks.get_signing_key_from_jwt.return_value = mock_signing_key
247+
248+
test_user = {
249+
"object": "user",
250+
"id": TEST_CONSTANTS["USER_ID"],
251+
"email": "[email protected]",
252+
"first_name": "Test",
253+
"last_name": "User",
254+
"email_verified": True,
255+
"created_at": TEST_CONSTANTS["CURRENT_TIMESTAMP"],
256+
"updated_at": TEST_CONSTANTS["CURRENT_TIMESTAMP"],
257+
}
258+
259+
session_data = SessionModule.seal_data({
260+
"refresh_token": "refresh_token_12345",
261+
"user": test_user
262+
}, TEST_CONSTANTS["COOKIE_PASSWORD"])
263+
264+
mock_response = {
265+
"access_token": TEST_CONSTANTS["TEST_TOKEN"],
266+
"refresh_token": "refresh_token_123",
267+
"sealed_session": session_data,
268+
"user": test_user
269+
}
270+
271+
mock_user_management.authenticate_with_refresh_token.return_value = RefreshTokenAuthenticationResponse(
272+
**mock_response
273+
)
274+
275+
with (
276+
patch(
277+
'workos.session.PyJWKClient',
278+
return_value=mock_jwks
279+
),
280+
):
281+
session = SessionModule(
282+
user_management=mock_user_management,
283+
client_id=TEST_CONSTANTS["CLIENT_ID"],
284+
session_data=session_data,
285+
cookie_password=TEST_CONSTANTS["COOKIE_PASSWORD"]
286+
)
287+
288+
with (
289+
patch.object(
290+
session,
291+
"is_valid_jwt",
292+
return_value=True
293+
),
294+
patch(
295+
"jwt.decode",
296+
return_value={
297+
"sid": TEST_CONSTANTS["SESSION_ID"],
298+
"org_id": TEST_CONSTANTS["ORGANIZATION_ID"],
299+
"role": "admin",
300+
"permissions": ["read"],
301+
"entitlements": ["feature_1"]
302+
}
303+
)
304+
):
305+
response = session.refresh()
306+
307+
assert isinstance(response, RefreshWithSessionCookieSuccessResponse)
308+
assert response.authenticated is True
309+
assert response.user.id == test_user["id"]
310+
311+
# Verify the refresh token was used correctly
312+
mock_user_management.authenticate_with_refresh_token.assert_called_once_with(
313+
refresh_token="refresh_token_12345",
314+
organization_id=None,
315+
session={
316+
"seal_session": True,
317+
"cookie_password": TEST_CONSTANTS["COOKIE_PASSWORD"]
318+
}
319+
)
320+
321+
322+
def test_seal_data(TEST_CONSTANTS):
323+
test_data = {"test": "data"}
324+
sealed = SessionModule.seal_data(test_data, TEST_CONSTANTS["COOKIE_PASSWORD"])
325+
assert isinstance(sealed, str)
326+
327+
# Test unsealing
328+
unsealed = SessionModule.unseal_data(sealed, TEST_CONSTANTS["COOKIE_PASSWORD"])
329+
assert unsealed == test_data
330+
331+
def test_unseal_invalid_data(TEST_CONSTANTS):
332+
with pytest.raises(Exception): # Adjust exception type based on your implementation
333+
SessionModule.unseal_data("invalid_sealed_data", TEST_CONSTANTS["COOKIE_PASSWORD"])

tests/test_user_management.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,12 @@ def base_authentication_params(self):
6060

6161
@pytest.fixture
6262
def mock_auth_refresh_token_response(self):
63+
user = MockUser("user_01H7ZGXFP5C6BBQY6Z7277ZCT0").dict()
64+
6365
return {
6466
"access_token": "access_token_12345",
6567
"refresh_token": "refresh_token_12345",
68+
"user": user,
6669
}
6770

6871
@pytest.fixture

0 commit comments

Comments
 (0)