41
41
# --- Constants ---
42
42
BEARER_TOKEN_PREFIX = "Bearer "
43
43
CACHE_REFRESH_MARGIN = timedelta (seconds = 60 )
44
+ DEFAULT_CLOCK_SKEW = 0
44
45
45
46
_token_cache : Dict [str , Any ] = {
46
47
"token" : None ,
@@ -57,7 +58,7 @@ def _is_token_valid() -> bool:
57
58
)
58
59
59
60
60
- def _update_cache (new_token : str ) -> None :
61
+ def _update_cache (new_token : str , clock_skew_in_seconds : int ) -> None :
61
62
"""
62
63
Validates a new token, extracts its expiry, and updates the cache.
63
64
@@ -71,7 +72,9 @@ def _update_cache(new_token: str) -> None:
71
72
# verify_oauth2_token not only decodes but also validates the token's
72
73
# signature and claims against Google's public keys.
73
74
# It's a synchronous, CPU-bound operation, safe for async contexts.
74
- claims = id_token .verify_oauth2_token (new_token , Request ())
75
+ claims = id_token .verify_oauth2_token (
76
+ new_token , Request (), clock_skew_in_seconds = clock_skew_in_seconds
77
+ )
75
78
76
79
expiry_timestamp = claims .get ("exp" )
77
80
if not expiry_timestamp :
@@ -89,7 +92,15 @@ def _update_cache(new_token: str) -> None:
89
92
raise ValueError (f"Failed to validate and cache the new token: { e } " ) from e
90
93
91
94
92
- def get_google_token_from_aud (audience : Optional [str ] = None ) -> str :
95
+ def get_google_token_from_aud (
96
+ clock_skew_in_seconds : int = 0 , audience : Optional [str ] = None
97
+ ) -> str :
98
+ if clock_skew_in_seconds < 0 or clock_skew_in_seconds > 60 :
99
+ raise ValueError (
100
+ f"Illegal clock_skew_in_seconds value: { clock_skew_in_seconds } . Must be between 0 and 60"
101
+ ", inclusive."
102
+ )
103
+
93
104
if _is_token_valid ():
94
105
return BEARER_TOKEN_PREFIX + _token_cache ["token" ]
95
106
@@ -102,7 +113,7 @@ def get_google_token_from_aud(audience: Optional[str] = None) -> str:
102
113
if hasattr (credentials , "id_token" ):
103
114
new_id_token = getattr (credentials , "id_token" , None )
104
115
if new_id_token :
105
- _update_cache (new_id_token )
116
+ _update_cache (new_id_token , clock_skew_in_seconds )
106
117
return BEARER_TOKEN_PREFIX + new_id_token
107
118
108
119
if audience is None :
@@ -115,7 +126,7 @@ def get_google_token_from_aud(audience: Optional[str] = None) -> str:
115
126
try :
116
127
request = Request ()
117
128
new_token = id_token .fetch_id_token (request , audience )
118
- _update_cache (new_token )
129
+ _update_cache (new_token , clock_skew_in_seconds )
119
130
return BEARER_TOKEN_PREFIX + _token_cache ["token" ]
120
131
121
132
except GoogleAuthError as e :
@@ -124,15 +135,20 @@ def get_google_token_from_aud(audience: Optional[str] = None) -> str:
124
135
) from e
125
136
126
137
127
- def get_google_id_token (audience : Optional [str ] = None ) -> Callable [[], str ]:
138
+ def get_google_id_token (
139
+ audience : Optional [str ] = None , clock_skew_in_seconds : int = DEFAULT_CLOCK_SKEW
140
+ ) -> Callable [[], str ]:
128
141
"""
129
142
Returns a SYNC function that, when called, fetches a Google ID token.
130
143
This function uses Application Default Credentials for local systems
131
144
and standard google auth libraries for Google Cloud environments.
132
145
It caches the token in memory.
133
146
134
147
Args:
135
- audience: The audience for the ID token (e.g., a service URL or client ID).
148
+ audience: The audience for the ID token (e.g., a service URL or client
149
+ ID).
150
+ clock_skew_in_seconds: The number of seconds to tolerate when checking the token.
151
+ Must be between 0-60. Defaults to 0.
136
152
137
153
Returns:
138
154
A function that when executed returns string in the format "Bearer <google_id_token>".
@@ -143,13 +159,13 @@ def get_google_id_token(audience: Optional[str] = None) -> Callable[[], str]:
143
159
"""
144
160
145
161
def _token_getter () -> str :
146
- return get_google_token_from_aud (audience )
162
+ return get_google_token_from_aud (clock_skew_in_seconds , audience )
147
163
148
164
return _token_getter
149
165
150
166
151
167
def aget_google_id_token (
152
- audience : Optional [str ] = None ,
168
+ audience : Optional [str ] = None , clock_skew_in_seconds : int = DEFAULT_CLOCK_SKEW
153
169
) -> Callable [[], Coroutine [Any , Any , str ]]:
154
170
"""
155
171
Returns an ASYNC function that, when called, fetches a Google ID token.
@@ -158,7 +174,10 @@ def aget_google_id_token(
158
174
It caches the token in memory.
159
175
160
176
Args:
161
- audience: The audience for the ID token (e.g., a service URL or client ID).
177
+ audience: The audience for the ID token (e.g., a service URL or client
178
+ ID).
179
+ clock_skew_in_seconds: The number of seconds to tolerate when checking the token.
180
+ Must be between 0-60. Defaults to 0.
162
181
163
182
Returns:
164
183
An async function that when executed returns string in the format "Bearer <google_id_token>".
@@ -169,6 +188,8 @@ def aget_google_id_token(
169
188
"""
170
189
171
190
async def _token_getter () -> str :
172
- return await asyncio .to_thread (get_google_token_from_aud , audience )
191
+ return await asyncio .to_thread (
192
+ get_google_token_from_aud , clock_skew_in_seconds , audience
193
+ )
173
194
174
195
return _token_getter
0 commit comments