2525
2626logger = logging .getLogger (__name__ )
2727
28+ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL = (
29+ "http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default"
30+ )
31+
2832
2933async def get_aws_region () -> str :
3034 """Get the current AWS workload's region."""
@@ -81,30 +85,108 @@ async def create_aws_attestation() -> WorkloadIdentityAttestation:
8185 )
8286
8387
84- async def create_gcp_attestation (
85- session_manager : SessionManager | None = None ,
86- ) -> WorkloadIdentityAttestation :
87- """Tries to create a workload identity attestation for GCP.
88+ async def get_gcp_access_token (session_manager : SessionManager ) -> str :
89+ """Gets a GCP access token from the metadata server.
90+
91+ If the application isn't running on GCP or no credentials were found, raises an error.
92+ """
93+ try :
94+ res = await session_manager .request (
95+ method = "GET" ,
96+ url = f"{ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL } /token" ,
97+ headers = {
98+ "Metadata-Flavor" : "Google" ,
99+ },
100+ )
101+
102+ content = await res .content .read ()
103+ response_text = content .decode ("utf-8" )
104+ return json .loads (response_text )["access_token" ]
105+ except Exception as e :
106+ raise ProgrammingError (
107+ msg = f"Error fetching GCP access token: { e } . Ensure the application is running on GCP." ,
108+ errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
109+ )
110+
111+
112+ async def get_gcp_identity_token_via_impersonation (
113+ impersonation_path : list [str ], session_manager : SessionManager
114+ ) -> str :
115+ """Gets a GCP identity token from the metadata server.
116+
117+ If the application isn't running on GCP or no credentials were found, raises an error.
118+ """
119+ if not impersonation_path :
120+ raise ProgrammingError (
121+ msg = "Error: impersonation_path cannot be empty." ,
122+ errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
123+ )
124+
125+ current_sa_token = await get_gcp_access_token (session_manager )
126+ impersonation_path = [
127+ f"projects/-/serviceAccounts/{ client_id } " for client_id in impersonation_path
128+ ]
129+ try :
130+ res = await session_manager .post (
131+ url = f"https://iamcredentials.googleapis.com/v1/{ impersonation_path [- 1 ]} :generateIdToken" ,
132+ headers = {
133+ "Authorization" : f"Bearer { current_sa_token } " ,
134+ "Content-Type" : "application/json" ,
135+ },
136+ json = {
137+ "delegates" : impersonation_path [:- 1 ],
138+ "audience" : SNOWFLAKE_AUDIENCE ,
139+ },
140+ )
141+
142+ content = await res .content .read ()
143+ response_text = content .decode ("utf-8" )
144+ return json .loads (response_text )["token" ]
145+ except Exception as e :
146+ raise ProgrammingError (
147+ msg = f"Error fetching GCP identity token for impersonated GCP service account '{ impersonation_path [- 1 ]} ': { e } . Ensure the application is running on GCP." ,
148+ errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
149+ )
150+
151+
152+ async def get_gcp_identity_token (session_manager : SessionManager ) -> str :
153+ """Gets a GCP identity token from the metadata server.
88154
89155 If the application isn't running on GCP or no credentials were found, raises an error.
90156 """
91157 try :
92158 res = await session_manager .request (
93159 method = "GET" ,
94- url = f"http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default /identity?audience={ SNOWFLAKE_AUDIENCE } " ,
160+ url = f"{ GCP_METADATA_SERVICE_ACCOUNT_BASE_URL } /identity?audience={ SNOWFLAKE_AUDIENCE } " ,
95161 headers = {
96162 "Metadata-Flavor" : "Google" ,
97163 },
98164 )
99165
100166 content = await res .content .read ()
101- jwt_str = content .decode ("utf-8" )
167+ return content .decode ("utf-8" )
102168 except Exception as e :
103169 raise ProgrammingError (
104- msg = f"Error fetching GCP metadata : { e } . Ensure the application is running on GCP." ,
170+ msg = f"Error fetching GCP identity token : { e } . Ensure the application is running on GCP." ,
105171 errno = ER_WIF_CREDENTIALS_NOT_FOUND ,
106172 )
107173
174+
175+ async def create_gcp_attestation (
176+ session_manager : SessionManager ,
177+ impersonation_path : list [str ] | None = None ,
178+ ) -> WorkloadIdentityAttestation :
179+ """Tries to create a workload identity attestation for GCP.
180+
181+ If the application isn't running on GCP or no credentials were found, raises an error.
182+ """
183+ if impersonation_path :
184+ jwt_str = await get_gcp_identity_token_via_impersonation (
185+ impersonation_path , session_manager
186+ )
187+ else :
188+ jwt_str = await get_gcp_identity_token (session_manager )
189+
108190 _ , subject = extract_iss_and_sub_without_signature_verification (jwt_str )
109191 return WorkloadIdentityAttestation (
110192 AttestationProvider .GCP , jwt_str , {"sub" : subject }
@@ -179,6 +261,7 @@ async def create_attestation(
179261 provider : AttestationProvider | None ,
180262 entra_resource : str | None = None ,
181263 token : str | None = None ,
264+ impersonation_path : list [str ] | None = None ,
182265 session_manager : SessionManager | None = None ,
183266) -> WorkloadIdentityAttestation :
184267 """Entry point to create an attestation using the given provider.
@@ -197,7 +280,7 @@ async def create_attestation(
197280 elif provider == AttestationProvider .AZURE :
198281 return await create_azure_attestation (entra_resource , session_manager )
199282 elif provider == AttestationProvider .GCP :
200- return await create_gcp_attestation (session_manager )
283+ return await create_gcp_attestation (session_manager , impersonation_path )
201284 elif provider == AttestationProvider .OIDC :
202285 return create_oidc_attestation (token )
203286 else :
0 commit comments