Skip to content

Commit 3760962

Browse files
committed
feat(auth): Add interactive device auth flow via browser
1 parent cd64269 commit 3760962

File tree

3 files changed

+320
-1
lines changed

3 files changed

+320
-1
lines changed

src/amp/auth/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
Reads and manages auth tokens from ~/.amp-cli-config.
55
"""
66

7+
from .device_flow import interactive_device_login
78
from .models import AuthStorage, RefreshTokenResponse
89
from .service import AuthService
910

10-
__all__ = ['AuthService', 'AuthStorage', 'RefreshTokenResponse']
11+
__all__ = ['AuthService', 'AuthStorage', 'RefreshTokenResponse', 'interactive_device_login']

src/amp/auth/device_flow.py

Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
"""OAuth2 Device Authorization Flow for Privy authentication.
2+
3+
Implements the device authorization grant flow with PKCE for secure authentication.
4+
Matches the TypeScript CLI implementation.
5+
"""
6+
7+
import base64
8+
import hashlib
9+
import secrets
10+
import time
11+
import webbrowser
12+
from typing import Optional, Tuple
13+
14+
import httpx
15+
from pydantic import BaseModel, Field
16+
17+
from .models import AuthStorage
18+
from .service import AUTH_PLATFORM_URL
19+
20+
21+
class DeviceAuthorizationResponse(BaseModel):
22+
"""Response from device authorization endpoint."""
23+
24+
device_code: str = Field(..., description='Device verification code for polling')
25+
user_code: str = Field(..., description='Code for user to enter in browser')
26+
verification_uri: str = Field(..., description='URL where user enters the code')
27+
expires_in: int = Field(..., description='Seconds until device code expires')
28+
interval: int = Field(..., description='Minimum polling interval in seconds')
29+
30+
31+
class DeviceTokenResponse(BaseModel):
32+
"""Response from device token endpoint (success case)."""
33+
34+
access_token: str = Field(..., description='Access token for authenticated requests')
35+
refresh_token: str = Field(..., description='Refresh token for renewing access')
36+
user_id: str = Field(..., description='Authenticated user ID')
37+
user_accounts: list[str] = Field(..., description='List of user accounts/wallets')
38+
expires_in: int = Field(..., description='Seconds until token expires')
39+
40+
41+
class DeviceTokenPendingResponse(BaseModel):
42+
"""Response when authorization is still pending."""
43+
44+
error: str = Field('authorization_pending', description='Error code')
45+
46+
47+
class DeviceTokenExpiredResponse(BaseModel):
48+
"""Response when device code has expired."""
49+
50+
error: str = Field('expired_token', description='Error code')
51+
52+
53+
def generate_pkce_pair() -> Tuple[str, str]:
54+
"""Generate PKCE code_verifier and code_challenge.
55+
56+
Returns:
57+
Tuple of (code_verifier, code_challenge)
58+
"""
59+
# Generate cryptographically random code_verifier
60+
# Must be 43-128 characters using unreserved chars [A-Za-z0-9-._~]
61+
code_verifier_bytes = secrets.token_bytes(32)
62+
code_verifier = base64.urlsafe_b64encode(code_verifier_bytes).decode('utf-8').rstrip('=')
63+
64+
# Generate code_challenge = BASE64URL(SHA256(code_verifier))
65+
challenge_bytes = hashlib.sha256(code_verifier.encode('utf-8')).digest()
66+
code_challenge = base64.urlsafe_b64encode(challenge_bytes).decode('utf-8').rstrip('=')
67+
68+
return code_verifier, code_challenge
69+
70+
71+
def request_device_authorization(http_client: httpx.Client) -> Tuple[DeviceAuthorizationResponse, str]:
72+
"""Request device authorization from auth platform.
73+
74+
Args:
75+
http_client: HTTP client to use for request
76+
77+
Returns:
78+
Tuple of (DeviceAuthorizationResponse, code_verifier)
79+
80+
Raises:
81+
httpx.HTTPStatusError: If request fails
82+
ValueError: If response is invalid
83+
"""
84+
# Generate PKCE parameters
85+
code_verifier, code_challenge = generate_pkce_pair()
86+
87+
# Request device authorization
88+
url = f'{AUTH_PLATFORM_URL}api/v1/device/authorize'
89+
response = http_client.post(
90+
url, json={'code_challenge': code_challenge, 'code_challenge_method': 'S256'}, timeout=30.0
91+
)
92+
93+
if response.status_code != 200:
94+
raise ValueError(f'Device authorization failed: {response.status_code} - {response.text}')
95+
96+
device_auth = DeviceAuthorizationResponse.model_validate(response.json())
97+
return device_auth, code_verifier
98+
99+
100+
def poll_for_token(
101+
http_client: httpx.Client, device_code: str, code_verifier: str
102+
) -> Optional[DeviceTokenResponse]:
103+
"""Poll device token endpoint once.
104+
105+
Args:
106+
http_client: HTTP client to use for request
107+
device_code: Device code from authorization response
108+
code_verifier: PKCE code verifier
109+
110+
Returns:
111+
DeviceTokenResponse if auth complete, None if still pending
112+
113+
Raises:
114+
ValueError: If device code expired or other error
115+
"""
116+
url = f'{AUTH_PLATFORM_URL}api/v1/device/token'
117+
response = http_client.get(
118+
url, params={'device_code': device_code, 'code_verifier': code_verifier}, timeout=10.0
119+
)
120+
121+
data = response.json()
122+
123+
# Check for error responses
124+
if 'error' in data:
125+
error = data['error']
126+
if error == 'authorization_pending':
127+
return None # Still pending
128+
elif error == 'expired_token':
129+
raise ValueError('Device code expired. Please try again.')
130+
else:
131+
raise ValueError(f'Token polling error: {error}')
132+
133+
# Success - parse token response
134+
return DeviceTokenResponse.model_validate(data)
135+
136+
137+
def poll_until_authenticated(
138+
http_client: httpx.Client,
139+
device_code: str,
140+
code_verifier: str,
141+
interval: int,
142+
expires_in: int,
143+
on_poll: Optional[callable] = None,
144+
verbose: bool = False,
145+
) -> DeviceTokenResponse:
146+
"""Poll for token until authenticated or timeout.
147+
148+
Args:
149+
http_client: HTTP client to use for requests
150+
device_code: Device code from authorization
151+
code_verifier: PKCE code verifier
152+
interval: Minimum polling interval in seconds
153+
expires_in: Seconds until device code expires
154+
on_poll: Optional callback called on each poll attempt
155+
156+
Returns:
157+
DeviceTokenResponse when authentication completes
158+
159+
Raises:
160+
ValueError: If authentication times out or fails
161+
"""
162+
start_time = time.time()
163+
poll_count = 0
164+
max_polls = int(expires_in / interval) + 5 # Add some buffer
165+
166+
while poll_count < max_polls:
167+
elapsed = time.time() - start_time
168+
if elapsed > expires_in:
169+
raise ValueError('Authentication timed out. Please try again.')
170+
171+
if on_poll:
172+
on_poll(poll_count, elapsed)
173+
174+
# Poll for token
175+
try:
176+
token_response = poll_for_token(http_client, device_code, code_verifier)
177+
if token_response:
178+
return token_response
179+
except ValueError as e:
180+
if 'expired' in str(e).lower():
181+
raise
182+
# Other errors, log and continue polling
183+
if verbose:
184+
print(f'\n⚠ Polling error (will retry): {e}')
185+
pass
186+
except Exception as e:
187+
# Log unexpected errors
188+
if verbose:
189+
print(f'\n⚠ Unexpected error (will retry): {type(e).__name__}: {e}')
190+
pass
191+
192+
# Wait before next poll
193+
time.sleep(interval)
194+
poll_count += 1
195+
196+
raise ValueError('Authentication timed out. Please try again.')
197+
198+
199+
def open_browser(url: str) -> bool:
200+
"""Open URL in browser.
201+
202+
Args:
203+
url: URL to open
204+
205+
Returns:
206+
True if browser opened successfully
207+
"""
208+
try:
209+
webbrowser.open(url)
210+
return True
211+
except Exception:
212+
return False
213+
214+
215+
def interactive_device_login(
216+
verbose: bool = True, auto_open_browser: bool = True
217+
) -> AuthStorage:
218+
"""Perform interactive device authorization flow.
219+
220+
Args:
221+
verbose: Print progress messages
222+
auto_open_browser: Automatically open browser for user
223+
224+
Returns:
225+
AuthStorage with tokens
226+
227+
Raises:
228+
ValueError: If authentication fails
229+
"""
230+
http_client = httpx.Client()
231+
232+
try:
233+
# Step 1: Request device authorization
234+
if verbose:
235+
print('🔐 Starting authentication...\n')
236+
237+
device_auth, code_verifier = request_device_authorization(http_client)
238+
239+
# Step 2: Display user code and open browser
240+
if verbose:
241+
print(f'📱 Verification Code: {device_auth.user_code}')
242+
print(f'🌐 Verification URL: {device_auth.verification_uri}\n')
243+
244+
if auto_open_browser:
245+
if verbose:
246+
print('Opening browser...')
247+
if open_browser(device_auth.verification_uri):
248+
if verbose:
249+
print('✓ Browser opened')
250+
else:
251+
if verbose:
252+
print('✗ Could not open browser automatically')
253+
print(f' Please open: {device_auth.verification_uri}')
254+
255+
if verbose:
256+
print(f'\n⏳ Waiting for authentication (expires in {device_auth.expires_in}s)...')
257+
print(' Complete the authentication in your browser.\n')
258+
259+
# Step 3: Poll for token
260+
spinner_frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']
261+
262+
def poll_callback(count: int, elapsed: float):
263+
if verbose:
264+
spinner = spinner_frames[count % len(spinner_frames)]
265+
print(f'\r{spinner} Polling... ({int(elapsed)}s elapsed)', end='', flush=True)
266+
267+
token_response = poll_until_authenticated(
268+
http_client, device_auth.device_code, code_verifier, device_auth.interval, device_auth.expires_in, poll_callback, verbose=verbose
269+
)
270+
271+
if verbose:
272+
print('\r✓ Authentication successful! \n')
273+
274+
# Step 4: Create auth storage
275+
now_ms = int(time.time() * 1000)
276+
expiry_ms = now_ms + (token_response.expires_in * 1000)
277+
278+
auth_storage = AuthStorage(
279+
accessToken=token_response.access_token,
280+
refreshToken=token_response.refresh_token,
281+
userId=token_response.user_id,
282+
accounts=token_response.user_accounts,
283+
expiry=expiry_ms,
284+
)
285+
286+
return auth_storage
287+
288+
finally:
289+
http_client.close()

src/amp/auth/service.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,3 +225,32 @@ def __enter__(self):
225225
def __exit__(self, exc_type, exc_val, exc_tb):
226226
"""Context manager exit."""
227227
self._http.close()
228+
229+
def login(self, verbose: bool = True, auto_open_browser: bool = True) -> None:
230+
"""Perform interactive browser-based login.
231+
232+
Opens browser for OAuth2 device authorization flow with PKCE.
233+
Saves authentication tokens to ~/.amp-cli-config/amp_cli_auth.
234+
235+
Args:
236+
verbose: Print progress messages
237+
auto_open_browser: Automatically open browser
238+
239+
Raises:
240+
ValueError: If authentication fails
241+
242+
Example:
243+
>>> auth = AuthService()
244+
>>> auth.login() # Opens browser for authentication
245+
>>> # Auth tokens saved to ~/.amp-cli-config/amp_cli_auth
246+
"""
247+
from .device_flow import interactive_device_login
248+
249+
# Perform device authorization flow
250+
auth_storage = interactive_device_login(verbose=verbose, auto_open_browser=auto_open_browser)
251+
252+
# Save to config file
253+
self.save_auth(auth_storage)
254+
255+
if verbose:
256+
print(f'✓ Authentication saved to {self.config_path}')

0 commit comments

Comments
 (0)