Skip to content

Commit 2d83694

Browse files
authored
fix: Improve STS credential refresh and logging (#327)
Refactored credential refresh logic to handle expired STS credentials and avoid unnecessary AssumeRole calls. Reduced verbose logging and clarified cache handling for AssumeRole credentials. Refactor credential refresh logic in IdentityClient Simplifies and restructures the credential refresh flow to prioritize constructor/environment credentials, handle expired session tokens, and streamline VeFaaS IAM and AssumeRole fallback logic. Improves readability and maintainability by removing redundant checks and helper functions.
1 parent 39712f5 commit 2d83694

File tree

1 file changed

+45
-70
lines changed

1 file changed

+45
-70
lines changed

veadk/integrations/ve_identity/identity_client.py

Lines changed: 45 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -56,67 +56,56 @@ def refresh_credentials(func):
5656
"""
5757
import asyncio
5858

59+
def _try_get_vefaas_credentials():
60+
"""Attempt to retrieve credentials from VeFaaS IAM."""
61+
try:
62+
ve_iam_cred = get_credential_from_vefaas_iam()
63+
return (
64+
ve_iam_cred.access_key_id,
65+
ve_iam_cred.secret_access_key,
66+
ve_iam_cred.session_token,
67+
)
68+
except FileNotFoundError:
69+
pass # VeFaaS IAM file not found, ignore
70+
except Exception as e:
71+
logger.warning(f"Failed to retrieve credentials from VeFaaS IAM: {e}")
72+
return None
73+
5974
@wraps(func)
6075
def _refresh_creds(self: IdentityClient):
6176
"""Helper to refresh credentials."""
62-
# Try to get credentials from environment variables first
77+
# Step 1: Get initial credentials from constructor or environment variables
6378
ak = self._initial_access_key or os.getenv("VOLCENGINE_ACCESS_KEY", "")
6479
sk = self._initial_secret_key or os.getenv("VOLCENGINE_SECRET_KEY", "")
6580
session_token = self._initial_session_token or os.getenv(
6681
"VOLCENGINE_SESSION_TOKEN", ""
6782
)
6883

69-
# Helper function to attempt VeFaaS IAM credential retrieval
70-
def try_get_vefaas_credentials():
71-
"""Attempt to retrieve credentials from VeFaaS IAM."""
72-
try:
73-
logger.info("Attempting to fetch credentials from VeFaaS IAM...")
74-
ve_iam_cred = get_credential_from_vefaas_iam()
75-
return (
76-
ve_iam_cred.access_key_id,
77-
ve_iam_cred.secret_access_key,
78-
ve_iam_cred.session_token,
79-
)
80-
except FileNotFoundError as e:
81-
logger.warning(f"VeFaaS IAM credentials not available: {e}")
82-
except Exception as e:
83-
logger.warning(f"Failed to retrieve credentials from VeFaaS IAM: {e}")
84-
return None
84+
# Step 2: Clear expired session_token
85+
if self._is_sts_credential_expired():
86+
logger.info("STS credentials expired, clearing...")
87+
session_token = ""
8588

86-
# If no AK/SK, try to get from VeFaaS IAM
87-
if not (ak and sk):
88-
logger.info(
89-
"Credentials not found in environment, attempting to fetch from VeFaaS IAM..."
90-
)
91-
credentials = try_get_vefaas_credentials()
92-
if credentials:
89+
# Step 3: Try VeFaaS IAM if no credentials or no session_token
90+
# VeFaaS IAM provides complete credentials (ak, sk, session_token)
91+
if not (ak and sk) or (ak and sk and not session_token):
92+
if credentials := _try_get_vefaas_credentials():
9393
ak, sk, session_token = credentials
9494

95-
# If we have AK/SK but no session token, try to get complete credentials
95+
# Step 4: If still no session_token, try AssumeRole
9696
if ak and sk and not session_token:
97-
# First attempt: try VeFaaS IAM
98-
credentials = try_get_vefaas_credentials()
99-
if credentials:
100-
ak, sk, session_token = credentials
101-
102-
# Second attempt: if still no session token, try AssumeRole
103-
if not session_token:
104-
role_trn = self._get_iam_role_trn_from_vefaas_iam() or os.getenv(
105-
"RUNTIME_IAM_ROLE_TRN", ""
106-
)
97+
if role_trn := self._get_iam_role_trn_from_vefaas_iam() or os.getenv(
98+
"RUNTIME_IAM_ROLE_TRN", ""
99+
):
100+
try:
101+
sts_cred = self._assume_role(ak, sk, role_trn)
102+
ak = sts_cred.access_key_id
103+
sk = sts_cred.secret_access_key
104+
session_token = sts_cred.session_token
105+
except Exception as e:
106+
logger.warning(f"Failed to assume role: {e}")
107107

108-
if role_trn:
109-
try:
110-
logger.info(f"Attempting AssumeRole with role: {role_trn}")
111-
sts_credentials = self._assume_role(ak, sk, role_trn)
112-
ak = sts_credentials.access_key_id
113-
sk = sts_credentials.secret_access_key
114-
session_token = sts_credentials.session_token
115-
logger.info("Successfully obtained credentials via AssumeRole")
116-
except Exception as e:
117-
logger.warning(f"Failed to assume role: {e}")
118-
119-
# Update configuration with the credentials
108+
# Step 5: Update configuration with the credentials
120109
self._api_client.api_client.configuration.ak = ak
121110
self._api_client.api_client.configuration.sk = sk
122111
self._api_client.api_client.configuration.session_token = session_token
@@ -192,16 +181,9 @@ def __init__(
192181
self._sts_credential_expires_at: Optional[int] = None
193182

194183
def _get_iam_role_trn_from_vefaas_iam(self) -> Optional[str]:
195-
logger.info(
196-
f"Try to get IAM Role TRN from VeFaaS IAM file (path={VEFAAS_IAM_CRIDENTIAL_PATH})."
197-
)
198-
199184
path = Path(VEFAAS_IAM_CRIDENTIAL_PATH)
200185

201186
if not path.exists():
202-
logger.error(
203-
f"Get IAM Role TRN from IAM file failed, and VeFaaS IAM file (path={VEFAAS_IAM_CRIDENTIAL_PATH}) not exists. Please check your configuration."
204-
)
205187
return None
206188

207189
with open(VEFAAS_IAM_CRIDENTIAL_PATH, "r") as f:
@@ -233,6 +215,9 @@ def _assume_role(
233215
) -> AssumeRoleCredential:
234216
"""Execute AssumeRole to get STS temporary credentials.
235217
218+
This method performs the AssumeRole operation and caches the result.
219+
Cache validation is handled by the caller (refresh_credentials decorator).
220+
236221
Args:
237222
access_key: VolcEngine access key
238223
secret_key: VolcEngine secret key
@@ -244,16 +229,9 @@ def _assume_role(
244229
Raises:
245230
Exception: If AssumeRole fails
246231
"""
247-
# Check if the cached credentials are still valid
248-
if (
249-
self._cached_sts_credential is not None
250-
and not self._is_sts_credential_expired()
251-
):
252-
logger.info("Using cached STS credentials")
253-
return self._cached_sts_credential
254-
255232
logger.info(
256-
"Cached STS credentials expired or not found, requesting new credentials..."
233+
f"Requesting new STS credentials for role: {role_trn}, "
234+
f"session: {settings.veidentity.role_session_name}"
257235
)
258236

259237
# Create STS client configuration
@@ -272,11 +250,7 @@ def _assume_role(
272250
role_session_name=settings.veidentity.role_session_name,
273251
)
274252

275-
logger.info(
276-
f"Executing AssumeRole for role: {role_trn}, "
277-
f"session: {settings.veidentity.role_session_name}"
278-
)
279-
253+
# Execute AssumeRole
280254
response: volcenginesdksts.AssumeRoleResponse = sts_client.assume_role(
281255
assume_role_request
282256
)
@@ -298,18 +272,19 @@ def _assume_role(
298272
expires_at_timestamp = calendar.timegm(dt.timetuple())
299273
except Exception as e:
300274
logger.warning(f"Failed to parse STS credential expiration time: {e}")
301-
# Expires in 1 hour by default
275+
# Default to 1 hour expiration
302276
import time
303277

304278
expires_at_timestamp = int(time.time()) + 3600
305279

280+
# Create credential object
306281
sts_credential = AssumeRoleCredential(
307282
access_key_id=credentials.access_key_id,
308283
secret_access_key=credentials.secret_access_key,
309284
session_token=credentials.session_token,
310285
)
311286

312-
# Cached credentials and expiration time
287+
# Cache credentials and expiration time
313288
self._cached_sts_credential = sts_credential
314289
self._sts_credential_expires_at = expires_at_timestamp
315290

0 commit comments

Comments
 (0)