4
4
5
5
from __future__ import annotations
6
6
7
- from enum import Enum , unique
8
7
from typing import Any
9
8
10
- from ...auth .by_plugin import AuthType
11
- from ...network import WORKLOAD_IDENTITY_AUTHENTICATOR
9
+ from ...auth .workload_identity import (
10
+ AuthByWorkloadIdentity as AuthByWorkloadIdentitySync ,
11
+ )
12
12
from .._wif_util import AttestationProvider , create_attestation
13
13
from ._by_plugin import AuthByPlugin as AuthByPluginAsync
14
14
15
15
16
- @unique
17
- class ApiFederatedAuthenticationType (Enum ):
18
- """An API-specific enum of the WIF authentication type."""
19
-
20
- AWS = "AWS"
21
- AZURE = "AZURE"
22
- GCP = "GCP"
23
- OIDC = "OIDC"
24
-
25
- @staticmethod
26
- def from_attestation (attestation ) -> ApiFederatedAuthenticationType :
27
- """Maps the internal / driver-specific attestation providers to API authenticator types."""
28
- if attestation .provider == AttestationProvider .AWS :
29
- return ApiFederatedAuthenticationType .AWS
30
- if attestation .provider == AttestationProvider .AZURE :
31
- return ApiFederatedAuthenticationType .AZURE
32
- if attestation .provider == AttestationProvider .GCP :
33
- return ApiFederatedAuthenticationType .GCP
34
- if attestation .provider == AttestationProvider .OIDC :
35
- return ApiFederatedAuthenticationType .OIDC
36
- raise ValueError (f"Unknown attestation provider '{ attestation .provider } '" )
37
-
38
-
39
- class AuthByWorkloadIdentity (AuthByPluginAsync ):
16
+ class AuthByWorkloadIdentity (AuthByWorkloadIdentitySync , AuthByPluginAsync ):
40
17
"""Plugin to authenticate via workload identity."""
41
18
42
19
def __init__ (
@@ -48,17 +25,16 @@ def __init__(
48
25
** kwargs ,
49
26
) -> None :
50
27
"""Initializes an instance with workload identity authentication."""
51
- super ().__init__ (** kwargs )
52
- self .provider = provider
53
- self .token = token
54
- self .entra_resource = entra_resource
55
- self .attestation = None
56
-
57
- def type_ (self ) -> AuthType :
58
- return AuthType .WORKLOAD_IDENTITY
28
+ AuthByWorkloadIdentitySync .__init__ (
29
+ self ,
30
+ provider = provider ,
31
+ token = token ,
32
+ entra_resource = entra_resource ,
33
+ ** kwargs ,
34
+ )
59
35
60
36
async def reset_secrets (self ) -> None :
61
- self . attestation = None
37
+ AuthByWorkloadIdentitySync . reset_secrets ( self )
62
38
63
39
async def prepare (self , ** kwargs : Any ) -> None :
64
40
"""Fetch the token using async wif_util."""
@@ -71,19 +47,4 @@ async def reauthenticate(self, **kwargs: Any) -> dict[str, bool]:
71
47
return {"success" : False }
72
48
73
49
async def update_body (self , body : dict [Any , Any ]) -> None :
74
- body ["data" ]["AUTHENTICATOR" ] = WORKLOAD_IDENTITY_AUTHENTICATOR
75
- body ["data" ]["PROVIDER" ] = ApiFederatedAuthenticationType .from_attestation (
76
- self .attestation
77
- ).value
78
- body ["data" ]["TOKEN" ] = self .attestation .credential
79
-
80
- @property
81
- def assertion_content (self ) -> str :
82
- """Returns the CSP provider name and an identifier. Used for logging purposes."""
83
- if not self .attestation :
84
- return ""
85
- properties = self .attestation .user_identifier_components
86
- properties ["_provider" ] = self .attestation .provider .value
87
- import json
88
-
89
- return json .dumps (properties , sort_keys = True , separators = ("," , ":" ))
50
+ AuthByWorkloadIdentitySync .update_body (self , body )
0 commit comments