2525
2626logger  =  logging .getLogger (__name__ )
2727
28+ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL  =  (
29+     "http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default" 
30+ )
31+ 
2832
2933async  def  get_aws_region () ->  str :
3034    """Get the current AWS workload's region.""" 
@@ -81,30 +85,108 @@ async def create_aws_attestation() -> WorkloadIdentityAttestation:
8185    )
8286
8387
84- async  def  create_gcp_attestation (
85-     session_manager : SessionManager  |  None  =  None ,
86- ) ->  WorkloadIdentityAttestation :
87-     """Tries to create a workload identity attestation for GCP. 
88+ async  def  get_gcp_access_token (session_manager : SessionManager ) ->  str :
89+     """Gets a GCP access token from the metadata server. 
90+ 
91+     If the application isn't running on GCP or no credentials were found, raises an error. 
92+     """ 
93+     try :
94+         res  =  await  session_manager .request (
95+             method = "GET" ,
96+             url = f"{ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL }  ,
97+             headers = {
98+                 "Metadata-Flavor" : "Google" ,
99+             },
100+         )
101+ 
102+         content  =  await  res .content .read ()
103+         response_text  =  content .decode ("utf-8" )
104+         return  json .loads (response_text )["access_token" ]
105+     except  Exception  as  e :
106+         raise  ProgrammingError (
107+             msg = f"Error fetching GCP access token: { e }  ,
108+             errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
109+         )
110+ 
111+ 
112+ async  def  get_gcp_identity_token_via_impersonation (
113+     impersonation_path : list [str ], session_manager : SessionManager 
114+ ) ->  str :
115+     """Gets a GCP identity token from the metadata server. 
116+ 
117+     If the application isn't running on GCP or no credentials were found, raises an error. 
118+     """ 
119+     if  not  impersonation_path :
120+         raise  ProgrammingError (
121+             msg = "Error: impersonation_path cannot be empty." ,
122+             errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
123+         )
124+ 
125+     current_sa_token  =  await  get_gcp_access_token (session_manager )
126+     impersonation_path  =  [
127+         f"projects/-/serviceAccounts/{ client_id }   for  client_id  in  impersonation_path 
128+     ]
129+     try :
130+         res  =  await  session_manager .post (
131+             url = f"https://iamcredentials.googleapis.com/v1/{ impersonation_path [- 1 ]}  ,
132+             headers = {
133+                 "Authorization" : f"Bearer { current_sa_token }  ,
134+                 "Content-Type" : "application/json" ,
135+             },
136+             json = {
137+                 "delegates" : impersonation_path [:- 1 ],
138+                 "audience" : SNOWFLAKE_AUDIENCE ,
139+             },
140+         )
141+ 
142+         content  =  await  res .content .read ()
143+         response_text  =  content .decode ("utf-8" )
144+         return  json .loads (response_text )["token" ]
145+     except  Exception  as  e :
146+         raise  ProgrammingError (
147+             msg = f"Error fetching GCP identity token for impersonated GCP service account '{ impersonation_path [- 1 ]} { e }  ,
148+             errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
149+         )
150+ 
151+ 
152+ async  def  get_gcp_identity_token (session_manager : SessionManager ) ->  str :
153+     """Gets a GCP identity token from the metadata server. 
88154
89155    If the application isn't running on GCP or no credentials were found, raises an error. 
90156    """ 
91157    try :
92158        res  =  await  session_manager .request (
93159            method = "GET" ,
94-             url = f"http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default /identity?audience={ SNOWFLAKE_AUDIENCE }  ,
160+             url = f"{ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL } { SNOWFLAKE_AUDIENCE }  ,
95161            headers = {
96162                "Metadata-Flavor" : "Google" ,
97163            },
98164        )
99165
100166        content  =  await  res .content .read ()
101-         jwt_str   =  content .decode ("utf-8" )
167+         return  content .decode ("utf-8" )
102168    except  Exception  as  e :
103169        raise  ProgrammingError (
104-             msg = f"Error fetching GCP metadata : { e }  ,
170+             msg = f"Error fetching GCP identity token : { e }  ,
105171            errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
106172        )
107173
174+ 
175+ async  def  create_gcp_attestation (
176+     session_manager : SessionManager ,
177+     impersonation_path : list [str ] |  None  =  None ,
178+ ) ->  WorkloadIdentityAttestation :
179+     """Tries to create a workload identity attestation for GCP. 
180+ 
181+     If the application isn't running on GCP or no credentials were found, raises an error. 
182+     """ 
183+     if  impersonation_path :
184+         jwt_str  =  await  get_gcp_identity_token_via_impersonation (
185+             impersonation_path , session_manager 
186+         )
187+     else :
188+         jwt_str  =  await  get_gcp_identity_token (session_manager )
189+ 
108190    _ , subject  =  extract_iss_and_sub_without_signature_verification (jwt_str )
109191    return  WorkloadIdentityAttestation (
110192        AttestationProvider .GCP , jwt_str , {"sub" : subject }
@@ -179,6 +261,7 @@ async def create_attestation(
179261    provider : AttestationProvider  |  None ,
180262    entra_resource : str  |  None  =  None ,
181263    token : str  |  None  =  None ,
264+     impersonation_path : list [str ] |  None  =  None ,
182265    session_manager : SessionManager  |  None  =  None ,
183266) ->  WorkloadIdentityAttestation :
184267    """Entry point to create an attestation using the given provider. 
@@ -195,7 +278,7 @@ async def create_attestation(
195278    elif  provider  ==  AttestationProvider .AZURE :
196279        return  await  create_azure_attestation (entra_resource , session_manager )
197280    elif  provider  ==  AttestationProvider .GCP :
198-         return  await  create_gcp_attestation (session_manager )
281+         return  await  create_gcp_attestation (session_manager ,  impersonation_path )
199282    elif  provider  ==  AttestationProvider .OIDC :
200283        return  create_oidc_attestation (token )
201284    else :
0 commit comments