forked from BasedHardware/omi
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdependencies.py
More file actions
134 lines (98 loc) · 5 KB
/
dependencies.py
File metadata and controls
134 lines (98 loc) · 5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from typing import List, Optional
from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader, HTTPAuthorizationCredentials, HTTPBearer
from firebase_admin import auth
import database.mcp_api_key as mcp_api_key_db
import database.dev_api_key as dev_api_key_db
from utils.scopes import Scopes, has_scope
import logging
logger = logging.getLogger(__name__)
bearer_scheme = HTTPBearer()
async def get_current_user_id(
credentials: HTTPAuthorizationCredentials = Security(bearer_scheme),
) -> str:
if not credentials:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
id_token = credentials.credentials
decoded_token = auth.verify_id_token(id_token)
return decoded_token["uid"]
except Exception as e:
logger.error(f"Error verifying Firebase ID token: {e}")
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
api_key_header = APIKeyHeader(name="Authorization", auto_error=False)
async def get_uid_from_mcp_api_key(api_key: str = Security(api_key_header)) -> str:
if not api_key or not api_key.startswith("Bearer "):
raise HTTPException(
status_code=401,
detail="Missing or invalid Authorization header. Must be 'Bearer API_KEY'",
)
token = api_key.replace("Bearer ", "")
user_id = mcp_api_key_db.get_user_id_by_api_key(token)
if not user_id:
raise HTTPException(status_code=401, detail="Invalid API Key")
return user_id
# Data structure to return from auth
class ApiKeyAuth:
def __init__(self, uid: str, scopes: Optional[List[str]]):
self.uid = uid
self.scopes = scopes
async def get_api_key_auth(api_key: str = Security(api_key_header)) -> ApiKeyAuth:
"""Extract user ID and scopes from API key"""
if not api_key or not api_key.startswith("Bearer "):
raise HTTPException(
status_code=401,
detail="Missing or invalid Authorization header. Must be 'Bearer API_KEY'",
)
token = api_key.replace("Bearer ", "")
user_data = dev_api_key_db.get_user_and_scopes_by_api_key(token)
if not user_data:
raise HTTPException(status_code=401, detail="Invalid API Key")
return ApiKeyAuth(uid=user_data["user_id"], scopes=user_data.get("scopes"))
async def get_uid_from_dev_api_key(api_key: str = Security(api_key_header)) -> str:
"""Legacy function for backward compatibility. Use scope-specific dependencies instead."""
auth_data = await get_api_key_auth(api_key)
return auth_data.uid
# Scope-specific dependencies
async def get_uid_with_conversations_read(auth: ApiKeyAuth = Depends(get_api_key_auth)) -> str:
if not has_scope(auth.scopes, Scopes.CONVERSATIONS_READ):
raise HTTPException(
status_code=403, detail=f"Insufficient permissions. Required scope: {Scopes.CONVERSATIONS_READ}"
)
return auth.uid
async def get_uid_with_conversations_write(auth: ApiKeyAuth = Depends(get_api_key_auth)) -> str:
if not has_scope(auth.scopes, Scopes.CONVERSATIONS_WRITE):
raise HTTPException(
status_code=403, detail=f"Insufficient permissions. Required scope: {Scopes.CONVERSATIONS_WRITE}"
)
return auth.uid
async def get_uid_with_memories_read(auth: ApiKeyAuth = Depends(get_api_key_auth)) -> str:
if not has_scope(auth.scopes, Scopes.MEMORIES_READ):
raise HTTPException(status_code=403, detail=f"Insufficient permissions. Required scope: {Scopes.MEMORIES_READ}")
return auth.uid
async def get_uid_with_memories_write(auth: ApiKeyAuth = Depends(get_api_key_auth)) -> str:
if not has_scope(auth.scopes, Scopes.MEMORIES_WRITE):
raise HTTPException(
status_code=403, detail=f"Insufficient permissions. Required scope: {Scopes.MEMORIES_WRITE}"
)
return auth.uid
async def get_uid_with_action_items_read(auth: ApiKeyAuth = Depends(get_api_key_auth)) -> str:
if not has_scope(auth.scopes, Scopes.ACTION_ITEMS_READ):
raise HTTPException(
status_code=403, detail=f"Insufficient permissions. Required scope: {Scopes.ACTION_ITEMS_READ}"
)
return auth.uid
async def get_uid_with_action_items_write(auth: ApiKeyAuth = Depends(get_api_key_auth)) -> str:
if not has_scope(auth.scopes, Scopes.ACTION_ITEMS_WRITE):
raise HTTPException(
status_code=403, detail=f"Insufficient permissions. Required scope: {Scopes.ACTION_ITEMS_WRITE}"
)
return auth.uid
async def get_uid_with_goals_read(auth: ApiKeyAuth = Depends(get_api_key_auth)) -> str:
if not has_scope(auth.scopes, Scopes.GOALS_READ):
raise HTTPException(status_code=403, detail=f"Insufficient permissions. Required scope: {Scopes.GOALS_READ}")
return auth.uid
async def get_uid_with_goals_write(auth: ApiKeyAuth = Depends(get_api_key_auth)) -> str:
if not has_scope(auth.scopes, Scopes.GOALS_WRITE):
raise HTTPException(status_code=403, detail=f"Insufficient permissions. Required scope: {Scopes.GOALS_WRITE}")
return auth.uid