Skip to content

Commit 85d51b6

Browse files
committed
tests: Add unit tests for auth service
1 parent be4f352 commit 85d51b6

File tree

1 file changed

+346
-0
lines changed

1 file changed

+346
-0
lines changed

tests/unit/test_auth_service.py

Lines changed: 346 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,346 @@
1+
"""Unit tests for Privy authentication service."""
2+
3+
import json
4+
import time
5+
from pathlib import Path
6+
from unittest.mock import Mock, patch
7+
8+
import pytest
9+
10+
from src.amp.auth.models import AuthStorage, RefreshTokenResponse, RefreshTokenResponseUser
11+
from src.amp.auth.service import AuthService
12+
13+
14+
@pytest.mark.unit
15+
class TestAuthService:
16+
"""Test AuthService functionality."""
17+
18+
def test_is_authenticated_when_no_file(self, tmp_path):
19+
"""Test is_authenticated returns False when config file doesn't exist."""
20+
config_path = tmp_path / 'nonexistent.json'
21+
auth = AuthService(config_path=config_path)
22+
23+
assert auth.is_authenticated() is False
24+
25+
def test_is_authenticated_when_no_auth_data(self, tmp_path):
26+
"""Test is_authenticated returns False when config exists but has no auth data."""
27+
config_path = tmp_path / 'config.json'
28+
config_path.write_text('{}')
29+
30+
auth = AuthService(config_path=config_path)
31+
32+
assert auth.is_authenticated() is False
33+
34+
def test_is_authenticated_when_auth_exists(self, tmp_path):
35+
"""Test is_authenticated returns True when valid auth exists."""
36+
config_path = tmp_path / 'amp_cli_auth'
37+
auth_data = {
38+
'accessToken': 'test-access-token',
39+
'refreshToken': 'test-refresh-token',
40+
'userId': 'did:privy:test123',
41+
'accounts': ['0x123'],
42+
'expiry': int(time.time() * 1000) + 3600000, # 1 hour from now
43+
}
44+
config_path.write_text(json.dumps(auth_data))
45+
46+
auth = AuthService(config_path=config_path)
47+
48+
assert auth.is_authenticated() is True
49+
50+
def test_load_auth_success(self, tmp_path):
51+
"""Test loading auth from config file."""
52+
config_path = tmp_path / 'amp_cli_auth'
53+
auth_data = {
54+
'accessToken': 'test-access-token',
55+
'refreshToken': 'test-refresh-token',
56+
'userId': 'did:privy:test123',
57+
'accounts': ['0x123'],
58+
'expiry': 1234567890000,
59+
}
60+
config_path.write_text(json.dumps(auth_data))
61+
62+
auth = AuthService(config_path=config_path)
63+
result = auth.load_auth()
64+
65+
assert result is not None
66+
assert result.accessToken == 'test-access-token'
67+
assert result.refreshToken == 'test-refresh-token'
68+
assert result.userId == 'did:privy:test123'
69+
assert result.accounts == ['0x123']
70+
assert result.expiry == 1234567890000
71+
72+
def test_load_auth_returns_none_when_missing(self, tmp_path):
73+
"""Test load_auth returns None when config doesn't exist."""
74+
config_path = tmp_path / 'nonexistent.json'
75+
auth = AuthService(config_path=config_path)
76+
77+
result = auth.load_auth()
78+
79+
assert result is None
80+
81+
def test_save_auth(self, tmp_path):
82+
"""Test saving auth to config file."""
83+
config_path = tmp_path / 'amp_cli_auth'
84+
auth = AuthService(config_path=config_path)
85+
86+
auth_storage = AuthStorage(
87+
accessToken='new-access-token',
88+
refreshToken='new-refresh-token',
89+
userId='did:privy:test456',
90+
accounts=['0x456'],
91+
expiry=9876543210000,
92+
)
93+
94+
auth.save_auth(auth_storage)
95+
96+
# Verify file was created
97+
assert config_path.exists()
98+
99+
# Verify content
100+
with open(config_path) as f:
101+
saved_data = json.load(f)
102+
103+
assert saved_data['accessToken'] == 'new-access-token'
104+
assert saved_data['userId'] == 'did:privy:test456'
105+
106+
def test_save_auth_overwrites_existing(self, tmp_path):
107+
"""Test saving auth overwrites existing auth data."""
108+
config_path = tmp_path / 'amp_cli_auth'
109+
initial_data = {
110+
'accessToken': 'old-token',
111+
'refreshToken': 'old-refresh',
112+
'userId': 'did:privy:old',
113+
'accounts': [],
114+
'expiry': 12345,
115+
}
116+
config_path.write_text(json.dumps(initial_data))
117+
118+
auth = AuthService(config_path=config_path)
119+
auth_storage = AuthStorage(
120+
accessToken='new-token',
121+
refreshToken='new-refresh',
122+
userId='did:privy:new',
123+
accounts=['0x123'],
124+
expiry=67890,
125+
)
126+
127+
auth.save_auth(auth_storage)
128+
129+
# Verify new data saved
130+
with open(config_path) as f:
131+
saved_data = json.load(f)
132+
133+
assert saved_data['accessToken'] == 'new-token'
134+
assert saved_data['userId'] == 'did:privy:new'
135+
assert saved_data['accounts'] == ['0x123']
136+
137+
def test_get_token_raises_when_not_authenticated(self, tmp_path):
138+
"""Test get_token raises error when not authenticated."""
139+
config_path = tmp_path / 'nonexistent.json'
140+
auth = AuthService(config_path=config_path)
141+
142+
with pytest.raises(FileNotFoundError, match='Not authenticated'):
143+
auth.get_token()
144+
145+
def test_get_token_returns_valid_token(self, tmp_path):
146+
"""Test get_token returns token when valid and not expired."""
147+
config_path = tmp_path / 'amp_cli_auth'
148+
future_expiry = int(time.time() * 1000) + 3600000 # 1 hour from now
149+
auth_data = {
150+
'accessToken': 'valid-token',
151+
'refreshToken': 'refresh-token',
152+
'userId': 'did:privy:test',
153+
'accounts': ['0x123'],
154+
'expiry': future_expiry,
155+
}
156+
config_path.write_text(json.dumps(auth_data))
157+
158+
auth = AuthService(config_path=config_path)
159+
token = auth.get_token()
160+
161+
assert token == 'valid-token'
162+
163+
164+
@pytest.mark.unit
165+
class TestAuthServiceRefresh:
166+
"""Test token refresh functionality."""
167+
168+
def test_needs_refresh_when_missing_expiry(self):
169+
"""Test needs refresh when expiry field is missing."""
170+
auth = AuthService()
171+
auth_storage = AuthStorage(
172+
accessToken='token',
173+
refreshToken='refresh',
174+
userId='did:privy:test',
175+
accounts=['0x123'],
176+
expiry=None, # Missing expiry
177+
)
178+
179+
assert auth._needs_refresh(auth_storage) is True
180+
181+
def test_needs_refresh_when_missing_accounts(self):
182+
"""Test needs refresh when accounts field is missing."""
183+
auth = AuthService()
184+
auth_storage = AuthStorage(
185+
accessToken='token',
186+
refreshToken='refresh',
187+
userId='did:privy:test',
188+
accounts=None, # Missing accounts
189+
expiry=int(time.time() * 1000) + 3600000,
190+
)
191+
192+
assert auth._needs_refresh(auth_storage) is True
193+
194+
def test_needs_refresh_when_expired(self):
195+
"""Test needs refresh when token is expired."""
196+
auth = AuthService()
197+
past_expiry = int(time.time() * 1000) - 1000 # 1 second ago
198+
auth_storage = AuthStorage(
199+
accessToken='token',
200+
refreshToken='refresh',
201+
userId='did:privy:test',
202+
accounts=['0x123'],
203+
expiry=past_expiry,
204+
)
205+
206+
assert auth._needs_refresh(auth_storage) is True
207+
208+
def test_needs_refresh_when_expiring_soon(self):
209+
"""Test needs refresh when token expires within 5 minutes."""
210+
auth = AuthService()
211+
soon_expiry = int(time.time() * 1000) + (4 * 60 * 1000) # 4 minutes from now
212+
auth_storage = AuthStorage(
213+
accessToken='token',
214+
refreshToken='refresh',
215+
userId='did:privy:test',
216+
accounts=['0x123'],
217+
expiry=soon_expiry,
218+
)
219+
220+
assert auth._needs_refresh(auth_storage) is True
221+
222+
def test_does_not_need_refresh_when_valid(self):
223+
"""Test does not need refresh when token is valid and not expiring soon."""
224+
auth = AuthService()
225+
future_expiry = int(time.time() * 1000) + (10 * 60 * 1000) # 10 minutes from now
226+
auth_storage = AuthStorage(
227+
accessToken='token',
228+
refreshToken='refresh',
229+
userId='did:privy:test',
230+
accounts=['0x123'],
231+
expiry=future_expiry,
232+
)
233+
234+
assert auth._needs_refresh(auth_storage) is False
235+
236+
@patch('httpx.Client.post')
237+
def test_refresh_token_success(self, mock_post, tmp_path):
238+
"""Test successful token refresh."""
239+
config_path = tmp_path / 'config.json'
240+
241+
# Mock HTTP response
242+
mock_response = Mock()
243+
mock_response.status_code = 200
244+
mock_response.json.return_value = {
245+
'token': 'new-access-token',
246+
'refresh_token': 'new-refresh-token',
247+
'session_update_action': 'update',
248+
'expires_in': 3600,
249+
'user': {'id': 'did:privy:test', 'accounts': ['0x123', '0x456']},
250+
}
251+
mock_post.return_value = mock_response
252+
253+
auth = AuthService(config_path=config_path)
254+
old_auth = AuthStorage(
255+
accessToken='old-token',
256+
refreshToken='old-refresh',
257+
userId='did:privy:test',
258+
accounts=['0x123'],
259+
expiry=12345,
260+
)
261+
262+
new_auth = auth.refresh_token(old_auth)
263+
264+
# Verify new tokens
265+
assert new_auth.accessToken == 'new-access-token'
266+
assert new_auth.refreshToken == 'new-refresh-token'
267+
assert new_auth.userId == 'did:privy:test'
268+
assert new_auth.accounts == ['0x123', '0x456']
269+
assert new_auth.expiry > int(time.time() * 1000)
270+
271+
# Verify saved to file
272+
assert config_path.exists()
273+
274+
@patch('httpx.Client.post')
275+
def test_refresh_token_user_id_mismatch(self, mock_post, tmp_path):
276+
"""Test refresh fails when user ID doesn't match."""
277+
config_path = tmp_path / 'config.json'
278+
279+
# Mock HTTP response with different user ID
280+
mock_response = Mock()
281+
mock_response.status_code = 200
282+
mock_response.json.return_value = {
283+
'token': 'new-token',
284+
'refresh_token': 'new-refresh',
285+
'session_update_action': 'update',
286+
'expires_in': 3600,
287+
'user': {'id': 'did:privy:different-user', 'accounts': []},
288+
}
289+
mock_post.return_value = mock_response
290+
291+
auth = AuthService(config_path=config_path)
292+
old_auth = AuthStorage(
293+
accessToken='old-token',
294+
refreshToken='old-refresh',
295+
userId='did:privy:original-user',
296+
accounts=[],
297+
expiry=12345,
298+
)
299+
300+
with pytest.raises(ValueError, match='User ID mismatch'):
301+
auth.refresh_token(old_auth)
302+
303+
@patch('httpx.Client.post')
304+
def test_refresh_token_401_error(self, mock_post, tmp_path):
305+
"""Test refresh handles 401 authentication error."""
306+
config_path = tmp_path / 'config.json'
307+
308+
# Mock 401 response
309+
mock_response = Mock()
310+
mock_response.status_code = 401
311+
mock_post.return_value = mock_response
312+
313+
auth = AuthService(config_path=config_path)
314+
old_auth = AuthStorage(
315+
accessToken='old-token',
316+
refreshToken='old-refresh',
317+
userId='did:privy:test',
318+
accounts=[],
319+
expiry=12345,
320+
)
321+
322+
with pytest.raises(ValueError, match='Authentication expired'):
323+
auth.refresh_token(old_auth)
324+
325+
@patch('httpx.Client.post')
326+
def test_refresh_token_429_rate_limit(self, mock_post, tmp_path):
327+
"""Test refresh handles 429 rate limit error."""
328+
config_path = tmp_path / 'config.json'
329+
330+
# Mock 429 response
331+
mock_response = Mock()
332+
mock_response.status_code = 429
333+
mock_response.headers = {'retry-after': '120'}
334+
mock_post.return_value = mock_response
335+
336+
auth = AuthService(config_path=config_path)
337+
old_auth = AuthStorage(
338+
accessToken='old-token',
339+
refreshToken='old-refresh',
340+
userId='did:privy:test',
341+
accounts=[],
342+
expiry=12345,
343+
)
344+
345+
with pytest.raises(ValueError, match='rate limited'):
346+
auth.refresh_token(old_auth)

0 commit comments

Comments
 (0)