1+ import os
12from datetime import datetime
2- from typing import Any , Dict , Optional , List
3+ from typing import Any , Dict , Optional
4+ import logging
35
46import requests
57import boto3
68from botocore .auth import S3SigV4Auth
79from botocore .awsrequest import AWSRequest
810from botocore .credentials import Credentials
911from prometheus_api_client import PrometheusApiClientException
12+ from botocore .exceptions import BotoCoreError , ClientError
1013
1114from prometrix .connect .custom_connect import CustomPrometheusConnect
1215
16+ SA_TOKEN_PATH = os .environ .get ("SA_TOKEN_PATH" , "/var/run/secrets/eks.amazonaws.com/serviceaccount/token" )
17+ AWS_ASSUME_ROLE = os .environ .get ("AWS_ASSUME_ROLE" )
1318
1419class AWSPrometheusConnect (CustomPrometheusConnect ):
1520 def __init__ (
@@ -19,6 +24,7 @@ def __init__(
1924 region : str ,
2025 service_name : str ,
2126 token : Optional [str ] = None ,
27+ assume_role_arn : Optional [str ] = None ,
2228 ** kwargs ,
2329 ):
2430 super ().__init__ (** kwargs )
@@ -36,6 +42,45 @@ def __init__(
3642 raise RuntimeError ("No AWS credentials found (neither static keys nor IRSA)" )
3743 self ._credentials = creds
3844
45+ role_to_assume = assume_role_arn or AWS_ASSUME_ROLE
46+ if role_to_assume :
47+ self ._assume_role_with_web_identity (role_to_assume )
48+
49+ def _assume_role_with_web_identity (self , role_arn : str ) -> None :
50+ """Assume the given role using the pod's service-account web identity token."""
51+ try :
52+ with open (SA_TOKEN_PATH , "r" , encoding = "utf-8" ) as f :
53+ web_identity_token = f .read ().strip ()
54+ except FileNotFoundError :
55+ raise RuntimeError (f"Service Account token not found at { SA_TOKEN_PATH } " )
56+
57+ try :
58+ sts = boto3 .client ("sts" , region_name = self .region )
59+ resp = sts .assume_role_with_web_identity (
60+ RoleArn = role_arn ,
61+ RoleSessionName = "amp-auto" ,
62+ WebIdentityToken = web_identity_token ,
63+ )
64+
65+ credentials = resp .get ("Credentials" )
66+ if not credentials :
67+ logging .error ("Invalid assume role response {resp}" )
68+ return
69+
70+ required_fields = ["AccessKeyId" , "SecretAccessKey" , "SessionToken" ]
71+ missing = [f for f in required_fields if not credentials .get (f )]
72+ if missing :
73+ logging .error ("Missing required credential fields: {missing}. Raw response: {resp}" )
74+ raise Exception (f"Failed to assume role: missing fields { missing } " )
75+
76+ self ._credentials = Credentials (
77+ credentials ["AccessKeyId" ],credentials ["SecretAccessKey" ], credentials ["SessionToken" ]
78+ )
79+ except (ClientError , BotoCoreError , Exception ) as e :
80+ raise Exception (
81+ f"Failed to assume role { role_arn } with web identity: { str (e )} "
82+ )
83+
3984 def _build_auth (self ) -> S3SigV4Auth :
4085 """Builds fresh SigV4 auth with current credentials (handles rotation)."""
4186 frozen = self ._credentials .get_frozen_credentials ()
@@ -53,6 +98,7 @@ def signed_request(
5398 headers = dict (request .headers ),
5499 verify = verify ,
55100 data = data ,
101+ params = params ,
56102 )
57103
58104 def _custom_query (self , query : str , params : dict = None ):
0 commit comments