33
33
tools = await toolbox.load_toolset()
34
34
"""
35
35
36
+ import asyncio
36
37
from datetime import datetime , timedelta , timezone
37
38
from typing import Any , Dict
39
+
38
40
import google .auth
39
41
from google .auth .exceptions import GoogleAuthError
40
- from google .auth .transport .requests import Request , AuthorizedSession
42
+ from google .auth .transport .requests import AuthorizedSession , Request
41
43
from google .oauth2 import id_token
42
- import asyncio
43
44
44
45
# --- Constants ---
45
46
BEARER_TOKEN_PREFIX = "Bearer "
46
47
CACHE_REFRESH_MARGIN = timedelta (seconds = 60 )
47
48
48
- _token_cache : Dict [str , Any ] = {"token" : None , "expires_at" : datetime .min .replace (tzinfo = timezone .utc )}
49
+ _token_cache : Dict [str , Any ] = {
50
+ "token" : None ,
51
+ "expires_at" : datetime .min .replace (tzinfo = timezone .utc ),
52
+ }
53
+
49
54
50
55
def _is_token_valid () -> bool :
51
56
"""Checks if the cached token exists and is not nearing expiry."""
52
57
if not _token_cache ["token" ]:
53
58
return False
54
- return datetime .now (timezone .utc ) < (_token_cache ["expires_at" ] - CACHE_REFRESH_MARGIN )
59
+ return datetime .now (timezone .utc ) < (
60
+ _token_cache ["expires_at" ] - CACHE_REFRESH_MARGIN
61
+ )
62
+
55
63
56
64
def _update_cache (new_token : str ) -> None :
57
65
"""
58
66
Validates a new token, extracts its expiry, and updates the cache.
59
-
67
+
60
68
Args:
61
69
new_token: The new JWT ID token string.
62
-
70
+
63
71
Raises:
64
72
ValueError: If the token is invalid or its expiry cannot be determined.
65
73
"""
@@ -68,13 +76,15 @@ def _update_cache(new_token: str) -> None:
68
76
# signature and claims against Google's public keys.
69
77
# It's a synchronous, CPU-bound operation, safe for async contexts.
70
78
claims = id_token .verify_oauth2_token (new_token , Request ())
71
-
79
+
72
80
expiry_timestamp = claims .get ("exp" )
73
81
if not expiry_timestamp :
74
82
raise ValueError ("Token does not contain an 'exp' claim." )
75
-
83
+
76
84
_token_cache ["token" ] = new_token
77
- _token_cache ["expires_at" ] = datetime .fromtimestamp (expiry_timestamp , tz = timezone .utc )
85
+ _token_cache ["expires_at" ] = datetime .fromtimestamp (
86
+ expiry_timestamp , tz = timezone .utc
87
+ )
78
88
79
89
except (ValueError , GoogleAuthError ) as e :
80
90
# Clear cache on failure to prevent using a stale or invalid token
@@ -83,13 +93,12 @@ def _update_cache(new_token: str) -> None:
83
93
raise ValueError (f"Failed to validate and cache the new token: { e } " ) from e
84
94
85
95
86
- # --- Public API Functions ---
87
-
88
96
def get_google_id_token (audience : str ) -> str :
89
97
"""
90
98
Synchronously fetches a Google ID token for a specific audience.
91
-
92
- This function uses Application Default Credentials and caches the token in memory.
99
+ This function uses Application Default Credentials for local systems
100
+ and standard google auth libraries for Google Cloud environments.
101
+ It caches the token in memory.
93
102
94
103
Args:
95
104
audience: The audience for the ID token (e.g., a service URL or client ID).
@@ -103,7 +112,7 @@ def get_google_id_token(audience: str) -> str:
103
112
"""
104
113
if _is_token_valid ():
105
114
return BEARER_TOKEN_PREFIX + _token_cache ["token" ]
106
-
115
+
107
116
# Get local user credentials
108
117
credentials , _ = google .auth .default ()
109
118
session = AuthorizedSession (credentials )
@@ -122,10 +131,29 @@ def get_google_id_token(audience: str) -> str:
122
131
new_token = id_token .fetch_id_token (request , audience )
123
132
_update_cache (new_token )
124
133
return BEARER_TOKEN_PREFIX + _token_cache ["token" ]
125
-
134
+
126
135
except GoogleAuthError as e :
127
- raise GoogleAuthError (f"Failed to fetch Google ID token for audience '{ audience } ': { e } " ) from e
128
-
136
+ raise GoogleAuthError (
137
+ f"Failed to fetch Google ID token for audience '{ audience } ': { e } "
138
+ ) from e
139
+
140
+
129
141
async def aget_google_id_token (audience : str ) -> str :
130
- token = await asyncio .to_thread (get_google_id_token , audience )
131
- return token
142
+ """
143
+ Asynchronously fetches a Google ID token for a specific audience.
144
+ This function uses Application Default Credentials for local systems
145
+ and standard google auth libraries for Google Cloud environments.
146
+ It caches the token in memory.
147
+
148
+ Args:
149
+ audience: The audience for the ID token (e.g., a service URL or client ID).
150
+
151
+ Returns:
152
+ A string in the format "Bearer <google_id_token>".
153
+
154
+ Raises:
155
+ GoogleAuthError: If fetching credentials or the token fails.
156
+ ValueError: If the fetched token is invalid.
157
+ """
158
+ token = await asyncio .to_thread (get_google_id_token , audience )
159
+ return token
0 commit comments