forked from Azure/azure-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcommands.py
More file actions
224 lines (193 loc) · 12.8 KB
/
commands.py
File metadata and controls
224 lines (193 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from azure.cli.core.commands import LongRunningOperation, CliCommandType
from azure.cli.command_modules.iot._client_factory import (iot_hub_service_factory,
iot_central_service_factory,
iot_service_provisioning_factory)
from azure.cli.command_modules.iot._utils import _dps_certificate_response_transform
# Redirect targets handed to ``self.deprecate(redirect=...)`` in ``load_command_table`` for the
# commands / command groups that are superseded by the IoT extension (azure-iot).
CS_DEPRECATION_INFO = (
    'IoT Extension (azure-iot) connection-string command (az iot hub connection-string show)'
)
ROUTE_DEPRECATION_INFO = (
    'IoT Extension (azure-iot) message-route command group (az iot hub message-route)'
)
ENDPOINT_DEPRECATION_INFO = (
    'IoT Extension (azure-iot) message-endpoint command group (az iot hub message-endpoint)'
)
class PolicyUpdateResultTransform(LongRunningOperation):  # pylint: disable=too-few-public-methods
    """Transform that reduces a long-running-operation result to its authorization policies."""

    def __call__(self, poller):
        """Delegate to ``LongRunningOperation.__call__`` and return the
        ``properties.authorization_policies`` attribute of what it produced.
        """
        completed = super().__call__(poller)
        hub_properties = completed.properties
        return hub_properties.authorization_policies
class EndpointUpdateResultTransform(LongRunningOperation):  # pylint: disable=too-few-public-methods
    """Transform that reduces a long-running-operation result to its routing endpoints."""

    def __call__(self, poller):
        """Delegate to ``LongRunningOperation.__call__`` and return the
        ``properties.routing.endpoints`` attribute of what it produced.
        """
        completed = super().__call__(poller)
        routing = completed.properties.routing
        return routing.endpoints
class RouteUpdateResultTransform(LongRunningOperation):  # pylint: disable=too-few-public-methods
    """Transform that reduces a long-running-operation result to its routes."""

    def __call__(self, poller):
        """Delegate to ``LongRunningOperation.__call__`` and return the
        ``properties.routing.routes`` attribute of what it produced.
        """
        completed = super().__call__(poller)
        routing = completed.properties.routing
        return routing.routes
# Deleting an IoT Hub is a long-running operation. Due to an API implementation issue, a 404 error is thrown
# during the deletion of an IoT Hub.
# This is a workaround to suppress the 404 error. It should be removed once the API is fixed.
class HubDeleteResultTransform(LongRunningOperation):  # pylint: disable=too-few-public-methods
    """Transform for hub deletion that tolerates a 'not found' error from the operation."""

    def __call__(self, poller):
        """Run the base long-running-operation handling, ignoring 'not found' failures.

        A falsy ``poller`` is handed back unchanged. Otherwise the base class handling is
        invoked and its result discarded; a ``CLIError`` whose text contains ``'not found'``
        is swallowed, any other ``CLIError`` is re-raised. Returns ``None`` in both the
        success and the swallowed-error case.
        """
        from azure.cli.core.util import CLIError

        # Nothing to wait on (e.g. the command was run without waiting): return as-is.
        if not poller:
            return poller

        try:
            super().__call__(poller)
        except CLIError as err:
            # Only the 'not found' failure is expected here; everything else must surface.
            if 'not found' in str(err):
                return None
            raise err
        return None
def load_command_table(self, _):  # pylint: disable=too-many-statements
    """Register the IoT command groups (DPS, IoT Hub and IoT Central) on the command loader.

    ``self`` is the command loader; this function relies on its ``command_group``,
    ``deprecate`` and ``cli_ctx`` members. The second positional argument is ignored.
    """
    from azure.cli.command_modules.iot._validators import validate_private_endpoint_connection_id
    from azure.cli.core.commands.transform import gen_dict_to_list_transform

    # Command types: where the handlers named below are looked up.
    iot_custom_type = CliCommandType(operations_tmpl='azure.cli.command_modules.iot.custom#{}')
    central_app_type = CliCommandType(
        operations_tmpl='azure.mgmt.iotcentral.operations#IoTCentralOperations.{}')
    # The two private-link command types carry no client factory of their own; the command
    # groups that use them pass ``iot_central_service_factory`` explicitly.
    central_private_endpoint_type = CliCommandType(
        operations_tmpl='azure.mgmt.iotcentral.operations#PrivateEndpointConnectionsOperations.{}')
    central_private_link_type = CliCommandType(
        operations_tmpl='azure.mgmt.iotcentral.operations#PrivateLinksOperations.{}')

    # Table-output query shared by the DPS and IoT Hub ``certificate list`` commands.
    certificate_table_query = (
        "value[*].{Name:name,ResourceGroup:resourceGroup,Created:properties.created,Expiry:properties.expiry,"
        "Subject:properties.subject,Thumbprint:properties.thumbprint,IsVerified:properties.isVerified}"
    )

    # ---- Device Provisioning Service -----------------------------------------------------
    with self.command_group('iot dps', client_factory=iot_service_provisioning_factory) as group:
        group.custom_command('list', 'iot_dps_list')
        group.custom_show_command('show', 'iot_dps_get')
        group.custom_command('create', 'iot_dps_create')
        group.custom_command('delete', 'iot_dps_delete')
        group.generic_update_command(
            'update',
            getter_name='iot_dps_get',
            setter_name='iot_dps_update',
            command_type=iot_custom_type,
        )

    with self.command_group('iot dps linked-hub', client_factory=iot_service_provisioning_factory) as group:
        group.custom_command('list', 'iot_dps_linked_hub_list')
        group.custom_show_command('show', 'iot_dps_linked_hub_get')
        group.custom_command('create', 'iot_dps_linked_hub_create', supports_no_wait=True)
        group.custom_command('update', 'iot_dps_linked_hub_update', supports_no_wait=True)
        group.custom_command('delete', 'iot_dps_linked_hub_delete', supports_no_wait=True)

    with self.command_group(
            'iot dps certificate',
            client_factory=iot_service_provisioning_factory,
            transform=_dps_certificate_response_transform) as group:
        group.custom_command('list', 'iot_dps_certificate_list', table_transformer=certificate_table_query)
        group.custom_show_command('show', 'iot_dps_certificate_get')
        group.custom_command('create', 'iot_dps_certificate_create')
        group.custom_command('delete', 'iot_dps_certificate_delete')
        group.custom_command('generate-verification-code', 'iot_dps_certificate_gen_code')
        group.custom_command('verify', 'iot_dps_certificate_verify')
        group.custom_command('update', 'iot_dps_certificate_update')

    with self.command_group('iot dps policy', client_factory=iot_service_provisioning_factory) as group:
        group.custom_command('list', 'iot_dps_policy_list')
        group.custom_show_command('show', 'iot_dps_policy_get')
        group.custom_command('create', 'iot_dps_policy_create', supports_no_wait=True)
        group.custom_command('update', 'iot_dps_policy_update', supports_no_wait=True)
        group.custom_command('delete', 'iot_dps_policy_delete', supports_no_wait=True)

    # ---- IoT Hub -------------------------------------------------------------------------
    with self.command_group('iot hub certificate', client_factory=iot_hub_service_factory) as group:
        group.custom_command('list', 'iot_hub_certificate_list', table_transformer=certificate_table_query)
        group.custom_show_command('show', 'iot_hub_certificate_get')
        group.custom_command('create', 'iot_hub_certificate_create')
        group.custom_command('delete', 'iot_hub_certificate_delete')
        group.custom_command('generate-verification-code', 'iot_hub_certificate_gen_code')
        group.custom_command('verify', 'iot_hub_certificate_verify')
        group.custom_command('update', 'iot_hub_certificate_update')

    with self.command_group('iot hub', client_factory=iot_hub_service_factory) as group:
        group.custom_command('create', 'iot_hub_create', supports_no_wait=True)
        group.custom_command('list', 'iot_hub_list')
        # Hidden and deprecated in favour of the IoT extension's connection-string command.
        group.custom_command(
            'show-connection-string', 'iot_hub_show_connection_string',
            deprecate_info=self.deprecate(redirect=CS_DEPRECATION_INFO, hide=True),
        )
        group.custom_show_command('show', 'iot_hub_get')
        group.generic_update_command(
            'update',
            getter_name='iot_hub_get',
            setter_name='iot_hub_update',
            command_type=iot_custom_type,
            custom_func_name='update_iot_hub_custom',
        )
        group.custom_command(
            'delete', 'iot_hub_delete',
            transform=HubDeleteResultTransform(self.cli_ctx),
            supports_no_wait=True,
        )
        group.custom_wait_command('wait', 'iot_hub_get')
        group.custom_command('list-skus', 'iot_hub_sku_list')
        group.custom_command('show-quota-metrics', 'iot_hub_get_quota_metrics')
        group.custom_command('show-stats', 'iot_hub_get_stats')
        group.custom_command('manual-failover', 'iot_hub_manual_failover', supports_no_wait=True)

    with self.command_group('iot hub consumer-group', client_factory=iot_hub_service_factory) as group:
        group.custom_command('create', 'iot_hub_consumer_group_create')
        group.custom_command('list', 'iot_hub_consumer_group_list')
        group.custom_show_command('show', 'iot_hub_consumer_group_get')
        group.custom_command('delete', 'iot_hub_consumer_group_delete')

    with self.command_group('iot hub identity', client_factory=iot_hub_service_factory) as group:
        group.custom_command('assign', 'iot_hub_identity_assign')
        group.custom_show_command('show', 'iot_hub_identity_show')
        group.custom_command('remove', 'iot_hub_identity_remove')

    with self.command_group('iot hub policy', client_factory=iot_hub_service_factory) as group:
        group.custom_command('list', 'iot_hub_policy_list')
        group.custom_show_command('show', 'iot_hub_policy_get')
        group.custom_command(
            'create', 'iot_hub_policy_create',
            transform=PolicyUpdateResultTransform(self.cli_ctx),
        )
        group.custom_command(
            'delete', 'iot_hub_policy_delete',
            transform=PolicyUpdateResultTransform(self.cli_ctx),
        )
        group.custom_command('renew-key', 'iot_hub_policy_key_renew', supports_no_wait=True)

    # Whole group is hidden and deprecated in favour of the IoT extension's message-endpoint group.
    with self.command_group(
            'iot hub routing-endpoint',
            client_factory=iot_hub_service_factory,
            deprecate_info=self.deprecate(redirect=ENDPOINT_DEPRECATION_INFO, hide=True)) as group:
        group.custom_command(
            'create', 'iot_hub_routing_endpoint_create',
            transform=EndpointUpdateResultTransform(self.cli_ctx),
        )
        group.custom_show_command('show', 'iot_hub_routing_endpoint_show')
        group.custom_command('list', 'iot_hub_routing_endpoint_list')
        group.custom_command(
            'delete', 'iot_hub_routing_endpoint_delete',
            transform=EndpointUpdateResultTransform(self.cli_ctx),
        )

    with self.command_group('iot hub message-enrichment', client_factory=iot_hub_service_factory) as group:
        group.custom_command('create', 'iot_message_enrichment_create')
        group.custom_command('list', 'iot_message_enrichment_list')
        group.custom_command('delete', 'iot_message_enrichment_delete')
        group.custom_command('update', 'iot_message_enrichment_update')

    # Whole group is hidden and deprecated in favour of the IoT extension's message-route group.
    with self.command_group(
            'iot hub route',
            client_factory=iot_hub_service_factory,
            deprecate_info=self.deprecate(redirect=ROUTE_DEPRECATION_INFO, hide=True)) as group:
        group.custom_command(
            'create', 'iot_hub_route_create',
            transform=RouteUpdateResultTransform(self.cli_ctx),
        )
        group.custom_show_command('show', 'iot_hub_route_show')
        group.custom_command('list', 'iot_hub_route_list')
        group.custom_command(
            'delete', 'iot_hub_route_delete',
            transform=RouteUpdateResultTransform(self.cli_ctx),
        )
        group.custom_command(
            'update', 'iot_hub_route_update',
            transform=RouteUpdateResultTransform(self.cli_ctx),
        )
        group.custom_command('test', 'iot_hub_route_test')

    # ---- IoT Central ---------------------------------------------------------------------
    with self.command_group(
            'iot central app', central_app_type,
            client_factory=iot_central_service_factory) as group:
        group.custom_command('create', 'iot_central_app_create', supports_no_wait=True)
        group.custom_command('list', 'iot_central_app_list')
        group.custom_show_command('show', 'iot_central_app_get')
        group.generic_update_command(
            'update',
            getter_name='iot_central_app_get',
            setter_name='iot_central_app_update',
            command_type=iot_custom_type,
        )
        group.custom_command('delete', 'iot_central_app_delete', confirmation=True, supports_no_wait=True)
        group.custom_command('identity assign', 'iot_central_app_assign_identity')
        group.custom_command('identity remove', 'iot_central_app_remove_identity')
        group.custom_show_command('identity show', 'iot_central_app_show_identity')

    with self.command_group(
            'iot central app private-endpoint-connection', central_private_endpoint_type,
            client_factory=iot_central_service_factory) as group:
        group.custom_command(
            'delete', 'delete_private_endpoint_connection',
            confirmation=True,
            validator=validate_private_endpoint_connection_id,
        )
        group.custom_show_command(
            'show', 'show_private_endpoint_connection',
            validator=validate_private_endpoint_connection_id,
        )
        group.custom_command(
            'approve', 'approve_private_endpoint_connection',
            validator=validate_private_endpoint_connection_id,
        )
        group.custom_command(
            'reject', 'reject_private_endpoint_connection',
            validator=validate_private_endpoint_connection_id,
        )
        group.custom_command('list', 'list_private_endpoint_connection')

    with self.command_group(
            'iot central app private-link-resource', central_private_link_type,
            client_factory=iot_central_service_factory) as group:
        group.custom_command(
            'list', 'list_private_link_resource',
            transform=gen_dict_to_list_transform(key="value"),
        )
        group.custom_show_command('show', 'get_private_link_resource')