This repository was archived by the owner on May 13, 2025. It is now read-only.
forked from Azure/azure-cli-extensions
-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathEntraWorkloadIAM.py
More file actions
196 lines (157 loc) · 8.48 KB
/
EntraWorkloadIAM.py
File metadata and controls
196 lines (157 loc) · 8.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import os
from knack.log import get_logger
from knack.util import CLIError
from azure.cli.core import get_default_cli
from azure.cli.core.azclierror import InvalidArgumentValueError
from ..vendored_sdks.models import (Extension, Scope, ScopeCluster)
from .DefaultExtension import DefaultExtension
logger = get_logger(__name__)
# The user settings are case-insensitive
CONFIG_SETTINGS_USER_TRUST_DOMAIN = 'trustdomain'
CONFIG_SETTINGS_USER_LOCAL_AUTHORITY = 'localauthority'
CONFIG_SETTINGS_USER_TENANT_ID = 'tenantid'
CONFIG_SETTINGS_USER_JOIN_TOKEN = 'jointoken'
CONFIG_SETTINGS_HELM_TRUST_DOMAIN = 'global.workload-iam.trustDomain'
CONFIG_SETTINGS_HELM_TENANT_ID = 'global.workload-iam.tenantID'
CONFIG_SETTINGS_HELM_JOIN_TOKEN = 'workload-iam-local-authority.localAuthorityArgs.joinToken'
def settings_dict_to_lowercase(settings):
"""
Create new dictionary where the keys of the known user settings are all lowercase (but leave the
others as they were in case they are specific settings that have to be passed to the Helm chart).
"""
all_user_settings = [CONFIG_SETTINGS_USER_TRUST_DOMAIN, CONFIG_SETTINGS_USER_TENANT_ID,
CONFIG_SETTINGS_USER_LOCAL_AUTHORITY, CONFIG_SETTINGS_USER_JOIN_TOKEN]
if settings is None:
return dict()
validated_settings = dict()
for key, value in settings.items():
if key.lower() in all_user_settings:
validated_settings[key.lower()] = value
else:
validated_settings[key] = value
return validated_settings
def settings_dict_to_lowercase(settings):
"""
Create new dictionary where the keys of the known user settings are all lowercase (but leave the
others as they were in case they are specific settings that have to be passed to the Helm chart).
"""
all_user_settings = [CONFIG_SETTINGS_USER_TRUST_DOMAIN, CONFIG_SETTINGS_USER_TENANT_ID,
CONFIG_SETTINGS_USER_LOCAL_AUTHORITY, CONFIG_SETTINGS_USER_JOIN_TOKEN]
if settings is None:
return dict()
validated_settings = dict()
for key, value in settings.items():
if key.lower() in all_user_settings:
validated_settings[key.lower()] = value
else:
validated_settings[key] = value
return validated_settings
class EntraWorkloadIAM(DefaultExtension):
def Create(self, cmd, client, resource_group_name, cluster_name, name, cluster_type, cluster_rp,
extension_type, scope, auto_upgrade_minor_version, release_train, version, target_namespace,
release_namespace, configuration_settings, configuration_protected_settings,
configuration_settings_file, configuration_protected_settings_file,
plan_name, plan_publisher, plan_product):
"""
Create method for ExtensionType 'microsoft.entraworkloadiam'.
"""
# Ensure that the values provided by the user for generic values of Arc extensions are
# valid, set sensible default values if not.
if release_train is None:
# TODO - Set this to 'stable' when the extension is ready
release_train = 'preview'
# The name is used as a base to generate Kubernetes labels for config maps, pods, etc, and
# their names are limited to 63 characters (RFC-1123). Instead of calculating the exact
# number of characters that we can allow users to specify, it's better to restrict that even
# more so that we have extra space in case the name of a resource changes and it pushes the
# total string length over the limit.
if len(name) > 20:
raise InvalidArgumentValueError(
f"Name '{name}' is too long, it must be 20 characters long max.")
scope = scope.lower()
if scope is None:
scope = 'cluster'
elif scope != 'cluster':
raise InvalidArgumentValueError(
f"Invalid scope '{scope}'. This extension can only be installed at 'cluster' scope.")
# Scope is always cluster
scope_cluster = ScopeCluster(release_namespace=release_namespace)
ext_scope = Scope(cluster=scope_cluster, namespace=None)
# Turn configuration setting keys into lowercase to make them case-insensitive
config_settings = settings_dict_to_lowercase(configuration_settings)
config_protected_settings = settings_dict_to_lowercase(configuration_protected_settings)
# Get user configuration values and remove them from the dictionary so that they aren't
# passed to the Helm chart
trust_domain = config_settings.pop(CONFIG_SETTINGS_USER_TRUST_DOMAIN, None)
tenant_id = config_settings.pop(CONFIG_SETTINGS_USER_TENANT_ID, None)
local_authority = config_settings.pop(CONFIG_SETTINGS_USER_LOCAL_AUTHORITY, None)
# The join token can also be provided by the user. However, we must prevent them from
# passing it as a regular configuration setting, it must be passed as a protected setting.
join_token = config_settings.pop(CONFIG_SETTINGS_USER_JOIN_TOKEN, None)
if join_token is not None:
raise InvalidArgumentValueError(
"'joinToken' must be provided with --config-protected-settings, not "
"--configuration-settings.")
join_token = config_protected_settings.pop(CONFIG_SETTINGS_USER_JOIN_TOKEN, None)
# A trust domain name is always required
if trust_domain is None:
raise InvalidArgumentValueError(
"Invalid configuration settings. Please provide a trust domain name.")
if tenant_id is None:
raise InvalidArgumentValueError(
"Invalid configuration settings. Please provide a tenant ID.")
# If the user hasn't provided a join token, create one
if join_token is None:
if local_authority is None:
raise InvalidArgumentValueError(
"Invalid configuration settings. One of 'joinToken' or 'localAuthority' "
"must be provided.")
join_token = self.get_join_token(trust_domain, local_authority)
else:
logger.info("Join token is provided")
# Save configuration setting values to overwrite values in the Helm chart
configuration_settings[CONFIG_SETTINGS_HELM_TRUST_DOMAIN] = trust_domain
configuration_settings[CONFIG_SETTINGS_HELM_TENANT_ID] = tenant_id
if configuration_protected_settings is None:
configuration_protected_settings = dict()
configuration_protected_settings[CONFIG_SETTINGS_HELM_JOIN_TOKEN] = join_token
logger.debug("Configuration settings: %s" % str(configuration_settings))
logger.debug("Configuration protected settings keys: %s" %
str(configuration_protected_settings.keys()))
create_identity = True
extension = Extension(
extension_type=extension_type,
auto_upgrade_minor_version=auto_upgrade_minor_version,
release_train=release_train,
version=version,
scope=ext_scope,
configuration_settings=configuration_settings,
configuration_protected_settings=configuration_protected_settings
)
return extension, name, create_identity
def get_join_token(self, trust_domain, local_authority):
"""
Invoke the az command to obtain a join token.
"""
logger.debug("Getting a join token from the control plane")
# Invoke az workload-iam command to obtain the join token
cmd = [
"workload-iam", "local-authority", "attestation-method", "create",
"--td", trust_domain,
"--la", local_authority,
"--type", "joinTokenAttestationMethod",
"--query", "singleUseToken",
"--dn", "myJoinToken",
]
cli = get_default_cli()
cli.invoke(cmd, out_file=open(os.devnull, 'w')) # Don't print output
if cli.result.result:
token = cli.result.result
elif cli.result.error:
cmd_str = "az " + " ".join(cmd)
raise CLIError(f"Error while generating a join token. Command: {cmd_str}")
return token