|
| 1 | +import json |
1 | 2 | import os |
2 | 3 | from typing import Optional |
3 | 4 |
|
4 | 5 | from .constants import DEFAULT_CREDENTIALS_PATH |
| 6 | +from .models import UserCredentials |
5 | 7 |
|
6 | 8 |
|
7 | | -def save_api_key_credentials(api_key: str): |
8 | | - """Save an API key to the credentials file. |
| 9 | +def save_credentials(credentials: UserCredentials) -> None: |
| 10 | + """Save user credentials to the credentials file. |
9 | 11 |
|
10 | 12 | Args: |
11 | | - api_key: API key to persist |
| 13 | + credentials: UserCredentials object to persist |
12 | 14 |
|
13 | 15 | Returns: |
14 | 16 | None |
15 | 17 | """ |
16 | 18 | credentials_path = os.path.expanduser(DEFAULT_CREDENTIALS_PATH) |
17 | 19 | os.makedirs(os.path.dirname(credentials_path), exist_ok=True) |
| 20 | + |
| 21 | + # Create file with restricted permissions (0600) to prevent leakage |
18 | 22 | with open(credentials_path, "w", encoding="utf-8") as f: |
19 | | - f.write(api_key) |
| 23 | + f.write(credentials.to_json()) |
| 24 | + os.chmod(credentials_path, 0o600) |
20 | 25 |
|
21 | 26 |
|
22 | | -def load_api_key_credentials() -> Optional[str]: |
23 | | - """Load an API key from the credentials file. |
| 27 | +def load_credentials() -> Optional[UserCredentials]: |
| 28 | + """Load user credentials from the credentials file. |
24 | 29 |
|
25 | 30 | Returns: |
26 | | - String. API key if it exists, None otherwise |
| 31 | + UserCredentials object if it exists, None otherwise |
27 | 32 | """ |
28 | 33 | credentials_path = os.path.expanduser(DEFAULT_CREDENTIALS_PATH) |
29 | 34 | if os.path.exists(credentials_path): |
30 | | - with open(credentials_path, "r", encoding="utf-8") as f: |
31 | | - return f.read().strip() |
| 35 | + try: |
| 36 | + with open(credentials_path, "r", encoding="utf-8") as f: |
| 37 | + return UserCredentials.from_json(f.read()) |
| 38 | + except (json.JSONDecodeError, KeyError, ValueError): |
| 39 | + # Handle corrupted or old format credentials |
| 40 | + return None |
32 | 41 | return None |
| 42 | + |
| 43 | + |
| 44 | +def clear_credentials() -> bool: |
| 45 | + """Clear stored credentials. |
| 46 | +
|
| 47 | + Returns: |
| 48 | + bool: True if credentials were cleared, False if none existed |
| 49 | + """ |
| 50 | + credentials_path = os.path.expanduser(DEFAULT_CREDENTIALS_PATH) |
| 51 | + if os.path.exists(credentials_path): |
| 52 | + os.remove(credentials_path) |
| 53 | + return True |
| 54 | + return False |
| 55 | + |
| 56 | + |
| 57 | +def load_api_key_credentials() -> Optional[str]: |
| 58 | + """Load an API key from the credentials file (backward compatibility). |
| 59 | +
|
| 60 | + Returns: |
| 61 | + String. API key if it exists, None otherwise |
| 62 | + """ |
| 63 | + credentials = load_credentials() |
| 64 | + return credentials.api_key if credentials else None |
0 commit comments