4
4
# ------------------------------------
5
5
import logging
6
6
import os
7
- from typing import Optional , Any , Mapping , cast
7
+ from typing import Optional , Any , Mapping , cast , Tuple
8
8
9
9
from azure .core .credentials import AccessToken , AccessTokenInfo , TokenRequestOptions , TokenCredential , SupportsTokenInfo
10
10
from .. import CredentialUnavailableError
15
15
_LOGGER = logging .getLogger (__name__ )
16
16
17
17
18
- def validate_identity_config (client_id : Optional [str ], identity_config : Optional [Mapping [str , str ]]) -> None :
18
+ def validate_identity_config (
19
+ client_id : Optional [str ], identity_config : Optional [Mapping [str , str ]]
20
+ ) -> Optional [Tuple [str , str ]]:
19
21
if identity_config :
22
+ valid_keys = {"object_id" , "resource_id" , "client_id" }
20
23
if client_id :
21
- if any (key in identity_config for key in ( "object_id" , "resource_id" , "client_id" ) ):
24
+ if any (key in identity_config for key in valid_keys ):
22
25
raise ValueError (
23
- "identity_config must not contain 'object_id' , 'resource_id', or 'client_id' when 'client_id' is "
24
- "provided as a keyword argument. "
26
+ "When 'client_id' is provided as a keyword argument , 'identity_config' must not contain any of the "
27
+ f"following keys: { ', ' . join ( valid_keys ) } "
25
28
)
26
- # Only one of these keys should be present if one is present.
27
- valid_keys = {"object_id" , "resource_id" , "client_id" }
28
- if len (identity_config .keys () & valid_keys ) > 1 :
29
- raise ValueError (
30
- f"identity_config must not contain more than one of the following keys: { ', ' .join (valid_keys )} "
31
- )
29
+ return "client_id" , client_id
30
+
31
+ # Only one of the valid keys should be present if one is present.
32
+ result = None
33
+ for key in valid_keys :
34
+ if key in identity_config :
35
+ if result :
36
+ raise ValueError (
37
+ "identity_config must not contain more than one of the following keys: "
38
+ f"{ ', ' .join (valid_keys )} "
39
+ )
40
+ result = key , identity_config [key ]
41
+ return result
42
+
43
+ if client_id :
44
+ return "client_id" , client_id
45
+ return None
32
46
33
47
34
48
class ManagedIdentityCredential :
@@ -59,51 +73,58 @@ class ManagedIdentityCredential:
59
73
def __init__ (
60
74
self , * , client_id : Optional [str ] = None , identity_config : Optional [Mapping [str , str ]] = None , ** kwargs : Any
61
75
) -> None :
62
- validate_identity_config (client_id , identity_config )
76
+ user_identity_info = validate_identity_config (client_id , identity_config )
63
77
self ._credential : Optional [SupportsTokenInfo ] = None
64
78
exclude_workload_identity = kwargs .pop ("_exclude_workload_identity_credential" , False )
79
+ managed_identity_type = None
80
+
65
81
if os .environ .get (EnvironmentVariables .IDENTITY_ENDPOINT ):
66
82
if os .environ .get (EnvironmentVariables .IDENTITY_HEADER ):
67
83
if os .environ .get (EnvironmentVariables .IDENTITY_SERVER_THUMBPRINT ):
68
- _LOGGER . info ( "%s will use Service Fabric managed identity", self . __class__ . __name__ )
84
+ managed_identity_type = " Service Fabric managed identity"
69
85
from .service_fabric import ServiceFabricCredential
70
86
71
87
self ._credential = ServiceFabricCredential (
72
88
client_id = client_id , identity_config = identity_config , ** kwargs
73
89
)
74
90
else :
75
- _LOGGER . info ( "%s will use App Service managed identity", self . __class__ . __name__ )
91
+ managed_identity_type = " App Service managed identity"
76
92
from .app_service import AppServiceCredential
77
93
78
94
self ._credential = AppServiceCredential (
79
95
client_id = client_id , identity_config = identity_config , ** kwargs
80
96
)
81
97
elif os .environ .get (EnvironmentVariables .IMDS_ENDPOINT ):
82
- _LOGGER . info ( "%s will use Azure Arc managed identity", self . __class__ . __name__ )
98
+ managed_identity_type = " Azure Arc managed identity"
83
99
from .azure_arc import AzureArcCredential
84
100
85
101
self ._credential = AzureArcCredential (client_id = client_id , identity_config = identity_config , ** kwargs )
86
102
elif os .environ .get (EnvironmentVariables .MSI_ENDPOINT ):
87
103
if os .environ .get (EnvironmentVariables .MSI_SECRET ):
88
- _LOGGER . info ( "%s will use Azure ML managed identity", self . __class__ . __name__ )
104
+ managed_identity_type = " Azure ML managed identity"
89
105
from .azure_ml import AzureMLCredential
90
106
91
107
self ._credential = AzureMLCredential (client_id = client_id , identity_config = identity_config , ** kwargs )
92
108
else :
93
- _LOGGER . info ( "%s will use Cloud Shell managed identity", self . __class__ . __name__ )
109
+ managed_identity_type = " Cloud Shell managed identity"
94
110
from .cloud_shell import CloudShellCredential
95
111
96
112
self ._credential = CloudShellCredential (client_id = client_id , identity_config = identity_config , ** kwargs )
97
113
elif (
98
114
all (os .environ .get (var ) for var in EnvironmentVariables .WORKLOAD_IDENTITY_VARS )
99
115
and not exclude_workload_identity
100
116
):
101
- _LOGGER .info ("%s will use workload identity" , self .__class__ .__name__ )
102
117
from .workload_identity import WorkloadIdentityCredential
103
118
104
119
workload_client_id = client_id or os .environ .get (EnvironmentVariables .AZURE_CLIENT_ID )
105
120
if not workload_client_id :
106
- raise ValueError ('Configure the environment with a client ID or pass a value for "client_id" argument' )
121
+ raise ValueError (
122
+ "Workload identity was selected but no client ID was provided. "
123
+ 'Configure the environment with a client ID or pass a value for "client_id" argument'
124
+ )
125
+
126
+ managed_identity_type = "workload identity"
127
+ user_identity_info = ("client_id" , workload_client_id )
107
128
108
129
self ._credential = WorkloadIdentityCredential (
109
130
tenant_id = os .environ [EnvironmentVariables .AZURE_TENANT_ID ],
@@ -112,11 +133,17 @@ def __init__(
112
133
** kwargs ,
113
134
)
114
135
else :
136
+ managed_identity_type = "IMDS"
115
137
from .imds import ImdsCredential
116
138
117
- _LOGGER .info ("%s will use IMDS" , self .__class__ .__name__ )
118
139
self ._credential = ImdsCredential (client_id = client_id , identity_config = identity_config , ** kwargs )
119
140
141
+ if managed_identity_type :
142
+ log_msg = f"{ self .__class__ .__name__ } will use { managed_identity_type } "
143
+ if user_identity_info :
144
+ log_msg += f" with { user_identity_info [0 ]} : { user_identity_info [1 ]} "
145
+ _LOGGER .info (log_msg )
146
+
120
147
def __enter__ (self ) -> "ManagedIdentityCredential" :
121
148
if self ._credential :
122
149
self ._credential .__enter__ () # type: ignore
0 commit comments