2020logger  =  logging .getLogger (__name__ )
2121SNOWFLAKE_AUDIENCE  =  "snowflakecomputing.com" 
2222DEFAULT_ENTRA_SNOWFLAKE_RESOURCE  =  "api://fd3f753b-eed3-462c-b6a7-a4b5bb650aad" 
23+ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL  =  (
24+     "http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default" 
25+ )
2326
2427
2528@unique  
@@ -184,29 +187,103 @@ def create_aws_attestation(
184187    )
185188
186189
187- def  create_gcp_attestation (
188-     session_manager : SessionManager  |  None  =  None ,
189- ) ->  WorkloadIdentityAttestation :
190-     """Tries to create a workload identity attestation for GCP. 
190+ def  get_gcp_access_token (session_manager : SessionManager ) ->  str :
191+     """Gets a GCP access token from the metadata server. 
192+ 
193+     If the application isn't running on GCP or no credentials were found, raises an error. 
194+     """ 
195+     try :
196+         res  =  session_manager .request (
197+             method = "GET" ,
198+             url = f"{ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL }  ,
199+             headers = {
200+                 "Metadata-Flavor" : "Google" ,
201+             },
202+         )
203+         res .raise_for_status ()
204+         return  res .json ()["access_token" ]
205+     except  Exception  as  e :
206+         raise  ProgrammingError (
207+             msg = f"Error fetching GCP access token: { e }  ,
208+             errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
209+         )
210+ 
211+ 
212+ def  get_gcp_identity_token_via_impersonation (
213+     impersonation_path : list [str ], session_manager : SessionManager 
214+ ) ->  str :
215+     """Gets a GCP identity token from the metadata server. 
216+ 
217+     If the application isn't running on GCP or no credentials were found, raises an error. 
218+     """ 
219+     if  not  impersonation_path :
220+         raise  ProgrammingError (
221+             msg = "Error: impersonation_path cannot be empty." ,
222+             errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
223+         )
224+ 
225+     current_sa_token  =  get_gcp_access_token (session_manager )
226+     impersonation_path  =  [
227+         f"projects/-/serviceAccounts/{ client_id }   for  client_id  in  impersonation_path 
228+     ]
229+     try :
230+         res  =  session_manager .post (
231+             url = f"https://iamcredentials.googleapis.com/v1/{ impersonation_path [- 1 ]}  ,
232+             headers = {
233+                 "Authorization" : f"Bearer { current_sa_token }  ,
234+                 "Content-Type" : "application/json" ,
235+             },
236+             json = {
237+                 "delegates" : impersonation_path [:- 1 ],
238+                 "audience" : SNOWFLAKE_AUDIENCE ,
239+             },
240+         )
241+         res .raise_for_status ()
242+         return  res .json ()["token" ]
243+     except  Exception  as  e :
244+         raise  ProgrammingError (
245+             msg = f"Error fetching GCP identity token for impersonated GCP service account '{ impersonation_path [- 1 ]} { e }  ,
246+             errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
247+         )
248+ 
249+ 
250+ def  get_gcp_identity_token (session_manager : SessionManager ) ->  str :
251+     """Gets a GCP identity token from the metadata server. 
191252
192253    If the application isn't running on GCP or no credentials were found, raises an error. 
193254    """ 
194255    try :
195256        res  =  session_manager .request (
196257            method = "GET" ,
197-             url = f"http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default /identity?audience={ SNOWFLAKE_AUDIENCE }  ,
258+             url = f"{ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL } { SNOWFLAKE_AUDIENCE }  ,
198259            headers = {
199260                "Metadata-Flavor" : "Google" ,
200261            },
201262        )
202263        res .raise_for_status ()
264+         return  res .content .decode ("utf-8" )
203265    except  Exception  as  e :
204266        raise  ProgrammingError (
205-             msg = f"Error fetching GCP metadata : { e }  ,
267+             msg = f"Error fetching GCP identity token : { e }  ,
206268            errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
207269        )
208270
209-     jwt_str  =  res .content .decode ("utf-8" )
271+ 
272+ def  create_gcp_attestation (
273+     session_manager : SessionManager ,
274+     impersonation_path : list [str ] |  None  =  None ,
275+ ) ->  WorkloadIdentityAttestation :
276+     """Tries to create a workload identity attestation for GCP. 
277+ 
278+     If the application isn't running on GCP or no credentials were found, raises an error. 
279+     """ 
280+     if  impersonation_path :
281+         jwt_str  =  get_gcp_identity_token_via_impersonation (
282+             impersonation_path , session_manager 
283+         )
284+     else :
285+         jwt_str  =  get_gcp_identity_token (session_manager )
286+ 
210287    _ , subject  =  extract_iss_and_sub_without_signature_verification (jwt_str )
211288    return  WorkloadIdentityAttestation (
212289        AttestationProvider .GCP , jwt_str , {"sub" : subject }
@@ -295,6 +372,7 @@ def create_attestation(
295372    provider : AttestationProvider ,
296373    entra_resource : str  |  None  =  None ,
297374    token : str  |  None  =  None ,
375+     impersonation_path : list [str ] |  None  =  None ,
298376    session_manager : SessionManager  |  None  =  None ,
299377) ->  WorkloadIdentityAttestation :
300378    """Entry point to create an attestation using the given provider. 
@@ -311,7 +389,7 @@ def create_attestation(
311389    elif  provider  ==  AttestationProvider .AZURE :
312390        return  create_azure_attestation (entra_resource , session_manager )
313391    elif  provider  ==  AttestationProvider .GCP :
314-         return  create_gcp_attestation (session_manager )
392+         return  create_gcp_attestation (session_manager ,  impersonation_path )
315393    elif  provider  ==  AttestationProvider .OIDC :
316394        return  create_oidc_attestation (token )
317395    else :
0 commit comments