Skip to content

Commit f4527e3

Browse files
committed
Refactor credential refresh logic in IdentityClient
Simplifies and restructures the credential refresh flow to prioritize constructor/environment credentials, handle expired session tokens, and streamline VeFaaS IAM and AssumeRole fallback logic. Improves readability and maintainability by removing redundant checks and helper functions.
1 parent c744f93 commit f4527e3

File tree

1 file changed

+36
-57
lines changed

1 file changed

+36
-57
lines changed

veadk/integrations/ve_identity/identity_client.py

Lines changed: 36 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -56,76 +56,55 @@ def refresh_credentials(func):
5656
"""
5757
import asyncio
5858

59+
def _try_get_vefaas_credentials():
60+
"""Attempt to retrieve credentials from VeFaaS IAM."""
61+
try:
62+
ve_iam_cred = get_credential_from_vefaas_iam()
63+
return (
64+
ve_iam_cred.access_key_id,
65+
ve_iam_cred.secret_access_key,
66+
ve_iam_cred.session_token,
67+
)
68+
except FileNotFoundError:
69+
pass # VeFaaS IAM file not found, ignore
70+
except Exception as e:
71+
logger.warning(f"Failed to retrieve credentials from VeFaaS IAM: {e}")
72+
return None
73+
5974
@wraps(func)
6075
def _refresh_creds(self: IdentityClient):
6176
"""Helper to refresh credentials."""
62-
# Try to get credentials from environment variables first
77+
# Step 1: Get initial credentials from constructor or environment variables
6378
ak = self._initial_access_key or os.getenv("VOLCENGINE_ACCESS_KEY", "")
6479
sk = self._initial_secret_key or os.getenv("VOLCENGINE_SECRET_KEY", "")
6580
session_token = self._initial_session_token or os.getenv(
6681
"VOLCENGINE_SESSION_TOKEN", ""
6782
)
6883

69-
# Helper function to attempt VeFaaS IAM credential retrieval
70-
def try_get_vefaas_credentials():
71-
"""Attempt to retrieve credentials from VeFaaS IAM."""
72-
try:
73-
ve_iam_cred = get_credential_from_vefaas_iam()
74-
return (
75-
ve_iam_cred.access_key_id,
76-
ve_iam_cred.secret_access_key,
77-
ve_iam_cred.session_token,
78-
)
79-
except FileNotFoundError:
80-
pass # If VeFaaS IAM file not found, ignore
81-
except Exception as e:
82-
logger.warning(f"Failed to retrieve credentials from VeFaaS IAM: {e}")
83-
return None
84+
# Step 2: Clear expired session_token
85+
if self._is_sts_credential_expired():
86+
logger.info("STS credentials expired, clearing...")
87+
session_token = ""
8488

85-
# If no AK/SK, try to get from VeFaaS IAM
86-
if not (ak and sk):
87-
logger.info(
88-
"Credentials not found in environment, attempting to fetch from VeFaaS IAM..."
89-
)
90-
credentials = try_get_vefaas_credentials()
91-
if credentials:
92-
ak, sk, session_token = credentials
89+
# Step 3: Try VeFaaS IAM if no credentials or no session_token
90+
# VeFaaS IAM provides complete credentials (ak, sk, session_token)
91+
if not (ak and sk) or (ak and sk and not session_token):
92+
ak, sk, session_token = _try_get_vefaas_credentials()
9393

94-
# If we have AK/SK but no session token, or STS credentials are expired,
95-
# try to get complete credentials
96-
need_refresh = False
94+
# Step 4: If still no session_token, try AssumeRole
9795
if ak and sk and not session_token:
98-
need_refresh = True
99-
elif ak and sk and session_token:
100-
# Check if STS credentials are expired
101-
if self._is_sts_credential_expired():
102-
logger.info("STS credentials expired, refreshing...")
103-
need_refresh = True
104-
# Clear expired session token to force refresh
105-
session_token = ""
106-
107-
if need_refresh:
108-
# First attempt: try VeFaaS IAM
109-
credentials = try_get_vefaas_credentials()
110-
if credentials:
111-
ak, sk, session_token = credentials
112-
113-
# Second attempt: if still no session token, try AssumeRole
114-
if not session_token:
115-
role_trn = self._get_iam_role_trn_from_vefaas_iam() or os.getenv(
116-
"RUNTIME_IAM_ROLE_TRN", ""
117-
)
118-
119-
if role_trn:
120-
try:
121-
sts_credentials = self._assume_role(ak, sk, role_trn)
122-
ak = sts_credentials.access_key_id
123-
sk = sts_credentials.secret_access_key
124-
session_token = sts_credentials.session_token
125-
except Exception as e:
126-
logger.warning(f"Failed to assume role: {e}")
96+
if role_trn := self._get_iam_role_trn_from_vefaas_iam() or os.getenv(
97+
"RUNTIME_IAM_ROLE_TRN", ""
98+
):
99+
try:
100+
sts_cred = self._assume_role(ak, sk, role_trn)
101+
ak = sts_cred.access_key_id
102+
sk = sts_cred.secret_access_key
103+
session_token = sts_cred.session_token
104+
except Exception as e:
105+
logger.warning(f"Failed to assume role: {e}")
127106

128-
# Update configuration with the credentials
107+
# Step 5: Update configuration with the credentials
129108
self._api_client.api_client.configuration.ak = ak
130109
self._api_client.api_client.configuration.sk = sk
131110
self._api_client.api_client.configuration.session_token = session_token

0 commit comments

Comments
 (0)