|
2 | 2 | from typing import Any, Dict, Optional, List |
3 | 3 |
|
4 | 4 | import requests |
| 5 | +import boto3 |
5 | 6 | from botocore.auth import S3SigV4Auth |
6 | 7 | from botocore.awsrequest import AWSRequest |
7 | 8 | from botocore.credentials import Credentials |
|
12 | 13 |
|
13 | 14 | class AWSPrometheusConnect(CustomPrometheusConnect): |
14 | 15 | def __init__( |
15 | | - self, access_key: str, secret_key: str, region: str, service_name: str, token: Optional[str] = None, **kwargs |
| 16 | + self, |
| 17 | + access_key: Optional[str], |
| 18 | + secret_key: Optional[str], |
| 19 | + region: str, |
| 20 | + service_name: str, |
| 21 | + token: Optional[str] = None, |
| 22 | + **kwargs, |
16 | 23 | ): |
17 | 24 | super().__init__(**kwargs) |
18 | | - self._credentials = Credentials(access_key, secret_key, token) |
19 | | - self._sigv4auth = S3SigV4Auth(self._credentials, service_name, region) |
| 25 | + self.region = region |
| 26 | + self.service_name = service_name |
| 27 | + |
| 28 | + if access_key and secret_key: |
| 29 | + # Backwards compatibility: use static keys |
| 30 | + self._credentials = Credentials(access_key, secret_key, token) |
| 31 | + else: |
| 32 | + # IRSA |
| 33 | + session = boto3.Session() |
| 34 | + creds = session.get_credentials() |
| 35 | + if not creds: |
| 36 | + raise RuntimeError("No AWS credentials found (neither static keys nor IRSA)") |
| 37 | + self._credentials = creds |
| 38 | + |
| 39 | + def _build_auth(self) -> S3SigV4Auth: |
| 40 | + """Builds fresh SigV4 auth with current credentials (handles rotation).""" |
| 41 | + frozen = self._credentials.get_frozen_credentials() |
| 42 | + return S3SigV4Auth(frozen, self.service_name, self.region) |
20 | 43 |
|
21 | 44 | def signed_request( |
22 | 45 | self, method, url, data=None, params=None, verify=False, headers=None |
23 | 46 | ): |
24 | | - request = AWSRequest( |
25 | | - method=method, url=url, data=data, params=params, headers=headers |
26 | | - ) |
27 | | - self._sigv4auth.add_auth(request) |
| 47 | + request = AWSRequest(method=method, url=url, data=data, params=params, headers=headers) |
| 48 | + auth = self._build_auth() |
| 49 | + auth.add_auth(request) |
28 | 50 | return requests.request( |
29 | 51 | method=method, |
30 | 52 | url=url, |
|
0 commit comments