Skip to content

Commit 45f1efd

Browse files
committed
Merge branch 'm-kovalsky/surge_protection'
2 parents 8cbbbd7 + ad93108 commit 45f1efd

File tree

2 files changed

+379
-0
lines changed

2 files changed

+379
-0
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Public surface of the surge-protection subpackage: re-export the
# capacity rule helpers implemented in the private ._items module.
from ._items import (
    get_workspace_consumption_rules,
    get_background_operation_rules,
    set_background_operation_rules,
    set_workspace_consumption_rules,
    delete_workspace_consumption_rules,
    delete_background_operation_rules,
)

# Explicit public API for `from <package> import *`.
__all__ = [
    "get_workspace_consumption_rules",
    "get_background_operation_rules",
    "set_background_operation_rules",
    "set_workspace_consumption_rules",
    "delete_workspace_consumption_rules",
    "delete_background_operation_rules",
]
Lines changed: 362 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,362 @@
1+
import requests
2+
import pandas as pd
3+
from uuid import UUID
4+
from sempy_labs._helper_functions import (
5+
get_pbi_token_headers,
6+
_get_url_prefix,
7+
resolve_capacity_id,
8+
_create_dataframe,
9+
_update_dataframe_datatypes,
10+
)
11+
import sempy_labs._icons as icons
12+
from sempy._utils._log import log
13+
from sempy.fabric.exceptions import FabricHTTPException
14+
15+
16+
def _surge_api(
    capacity, request, payload, method="get", status_code=200, return_json=True
):
    """
    Issue an HTTP request against a capacity's surge-protection endpoints.

    Parameters
    ----------
    capacity : str | uuid.UUID
        The capacity name or ID; resolved to an ID via ``resolve_capacity_id``.
    request : str
        The endpoint path segment appended after ``/capacities/{id}/``
        (e.g. ``"detectionRules"`` or ``"surgeProtectionRules"``).
    payload : dict | None
        JSON body to send; ``None`` sends no body.
    method : str, default="get"
        The HTTP method to use.
    status_code : int, default=200
        The expected success status code; any other code raises.
    return_json : bool, default=True
        If True, return the decoded JSON response; otherwise return ``None``.

    Returns
    -------
    dict | None
        The JSON response payload, or ``None`` when ``return_json`` is False.

    Raises
    ------
    FabricHTTPException
        If the response status code differs from ``status_code``.
    """
    headers = get_pbi_token_headers()
    prefix = _get_url_prefix()
    capacity_id = resolve_capacity_id(capacity)

    response = requests.request(
        method=method,
        url=f"{prefix}/capacities/{capacity_id}/{request}",
        headers=headers,
        json=payload,
    )

    if response.status_code != status_code:
        raise FabricHTTPException(response)
    # Fixed: removed the dead `# response` remnant; a single conditional
    # return expresses the same behavior.
    return response.json() if return_json else None
37+
38+
39+
@log
def get_workspace_consumption_rules(
    capacity: str | UUID = None, return_dataframe: bool = True
) -> pd.DataFrame | dict:
    """
    Retrieves the workspace consumption surge protection rules for the specified capacity.

    Workspace Consumption
    When CU consumption by a single workspace reaches the rejection threshold, reject new operation requests and block the workspace for the specified amount of time.

    Parameters
    ----------
    capacity : str | uuid.UUID, default=None
        The capacity name or ID.
        Defaults to None which resolves to the capacity id of the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the capacity name of the workspace of the notebook.
    return_dataframe : bool, default=True
        If True, return a pandas dataframe; otherwise return the raw response dict.

    Returns
    -------
    pandas.DataFrame | dict
        A pandas dataframe showing the workspace consumption surge protection rules for the specified capacity,
        or a dictionary if return_dataframe is set to False.
    """

    response_json = _surge_api(
        capacity=capacity, request="detectionRules", payload=None
    )

    if not return_dataframe:
        return response_json

    columns = {
        "Usage Threshold": "float",
        "Blocked Duration": "str",
        "Blocked Duration Policy": "str",
        "Detection Rule Id": "str",
        "Detection Rule Type": "str",
    }
    # Fixed dead code: the original built an empty frame via _create_dataframe
    # and guarded with `if rows:`, but `rows` always received exactly one
    # entry, so that frame was unconditionally discarded. Build the single-row
    # frame directly instead.
    workspace_action = response_json.get("blockWorkspaceAction", {})
    df = pd.DataFrame(
        [
            {
                "Usage Threshold": response_json.get(
                    "highWorkspaceUsageCondition", {}
                ).get("usageThreshold"),
                "Blocked Duration": workspace_action.get("blockedDuration"),
                "Blocked Duration Policy": workspace_action.get(
                    "blockedDurationPolicy"
                ),
                "Detection Rule Id": response_json.get("detectionRuleId"),
                "Detection Rule Type": response_json.get("detectionRuleType"),
            }
        ]
    )
    _update_dataframe_datatypes(dataframe=df, column_map=columns)
    return df
97+
98+
99+
@log
def get_background_operation_rules(
    capacity: str | UUID = None, return_dataframe: bool = True
) -> pd.DataFrame | dict:
    """
    Retrieves the background operation surge protection rules for the specified capacity.

    Background Operations
    When total CU consumption reaches the rejection threshold, reject new background operation requests. When total CU consumption drops below the recovery threshold, accept new background operation requests.

    Parameters
    ----------
    capacity : str | uuid.UUID, default=None
        The capacity name or ID.
        Defaults to None which resolves to the capacity id of the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the capacity name of the workspace of the notebook.
    return_dataframe : bool, default=True
        If True, return a pandas dataframe; otherwise return the raw response dict.

    Returns
    -------
    pandas.DataFrame | dict
        A pandas dataframe showing the background operation surge protection rules for the specified capacity,
        or a dictionary if return_dataframe is set to False.
    """

    response_json = _surge_api(
        capacity=capacity, request="surgeProtectionRules", payload=None
    )

    if not return_dataframe:
        return response_json

    columns = {
        "Rule Instance Id": "str",
        "Rule Type Id": "int",
        "Rule Id": "str",
        "Rejection Threshold Percentage": "int",
        "Recovery Threshold Percentage": "int",
        "Utilization Type": "str",
        "Throttling Level": "str",
    }

    # Start from an empty, typed frame so an empty rule list still yields
    # the expected columns.
    df = _create_dataframe(columns=columns)

    records = []
    for entry in response_json.get("rules", []):
        level = entry.get("triggeredThrottlingLevel", {})
        criteria = entry.get("usageThrottlingCriteria", {})
        records.append(
            {
                "Rule Instance Id": entry.get("ruleInstanceId"),
                "Rule Type Id": entry.get("ruleTypeId"),
                "Rule Id": entry.get("ruleId"),
                "Rejection Threshold Percentage": criteria.get(
                    "triggerThresholdPercentage"
                ),
                "Recovery Threshold Percentage": criteria.get(
                    "recoveryThresholdPercentage"
                ),
                "Utilization Type": level.get("utilizationType"),
                "Throttling Level": level.get("throttlingLevel"),
            }
        )

    if records:
        df = pd.DataFrame(records)
        _update_dataframe_datatypes(dataframe=df, column_map=columns)
    return df
164+
165+
166+
@log
def set_workspace_consumption_rules(
    capacity: str | UUID = None,
    rejection_threshold: int = 75,
    block_duration_hours: int = 24,
    block_indefinitely: bool = False,
) -> dict:
    """
    Sets the workspace consumption surge protection rules for the specified capacity.

    Workspace Consumption
    When total CU consumption by a single workspace reaches the rejection threshold, reject new operation requests and block the workspace for the specified amount of time.

    Parameters
    ----------
    capacity : str | uuid.UUID, default=None
        The capacity name or ID.
        Defaults to None which resolves to the capacity id of the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the capacity name of the workspace of the notebook.
    rejection_threshold : int, default=75
        The CU consumption percentage threshold (1-100) at which new operation requests will be rejected.
    block_duration_hours : int, default=24
        The duration in hours to block the workspace when the rejection threshold is reached. Must be at least 1 hour.
        Ignored if block_indefinitely is set to True.
    block_indefinitely : bool, default=False
        If True, the workspace will be blocked indefinitely when the rejection threshold is reached.

    Returns
    -------
    dict
        A dictionary showing the workspace consumption surge protection rules for the specified capacity.
    """

    # Validate inputs before touching the API (duration first, then threshold,
    # matching the documented precedence of error messages).
    if not block_indefinitely and block_duration_hours < 1:
        raise ValueError(
            f"{icons.red_dot} The block_duration_hours must be at least 1 hour."
        )

    if not 1 <= rejection_threshold <= 100:
        raise ValueError(
            f"{icons.red_dot} The rejection_threshold must be between 1 and 100."
        )

    # The blocking action is either indefinite (no duration) or a fixed
    # ISO 8601 duration of whole hours.
    if block_indefinitely:
        block_action = {
            "blockedDurationPolicy": "indefinite",
            "blockedDuration": None,
        }
    else:
        block_action = {
            "blockedDurationPolicy": "fixedDuration",
            "blockedDuration": f"PT{block_duration_hours}H",
        }

    payload = {
        "detectionRuleId": None,
        "detectionRuleType": "detectHighWorkspaceUsageAndBlockWorkspace",
        "blockWorkspaceAction": block_action,
        "highWorkspaceUsageCondition": {
            "usageThreshold": rejection_threshold,
        },
    }

    return _surge_api(
        capacity=capacity, request="detectionRules", payload=payload, method="post"
    )
228+
229+
230+
@log
def set_background_operation_rules(
    capacity: str | UUID = None,
    rejection_threshold: int = 75,
    recovery_threshold: int = 25,
) -> dict:
    """
    Sets the background operation surge protection rules for the specified capacity.

    Background Operations
    When total CU consumption reaches the rejection threshold, reject new background operation requests. When total CU consumption drops below the recovery threshold, accept new background operation requests.

    Parameters
    ----------
    capacity : str | uuid.UUID, default=None
        The capacity name or ID.
        Defaults to None which resolves to the capacity id of the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the capacity name of the workspace of the notebook.
    rejection_threshold : int, default=75
        The CU consumption percentage threshold (1-100) at which new background operation requests will be rejected.
    recovery_threshold : int, default=25
        The CU consumption percentage threshold (5-100) at which new background operation requests will be accepted.

    Returns
    -------
    dict
        A dictionary showing the background operation surge protection rules for the specified capacity.
    """

    # Validate the thresholds and their relative ordering up front.
    if not 1 <= rejection_threshold <= 100:
        raise ValueError(
            f"{icons.red_dot} The rejection_threshold must be between 1 and 100."
        )
    if not 5 <= recovery_threshold <= 100:
        raise ValueError(
            f"{icons.red_dot} The recovery_threshold must be between 5 and 100."
        )
    if recovery_threshold >= rejection_threshold:
        raise ValueError(
            f"{icons.red_dot} The recovery_threshold must be less than the rejection_threshold."
        )

    rule = {
        "ruleTypeId": 1,
        "ruleInstanceId": 1,
        "triggeredThrottlingLevel": {
            "utilizationType": "Background",
            "throttlingLevel": "Extreme",
        },
        "usageThrottlingCriteria": {
            "triggerThresholdPercentage": rejection_threshold,
            "recoveryThresholdPercentage": recovery_threshold,
        },
    }

    # The PUT returns no useful body, so apply the rules and then re-read them.
    _surge_api(
        capacity=capacity,
        request="surgeProtectionRules",
        payload={"rules": [rule]},
        method="put",
        return_json=False,
    )

    return get_background_operation_rules(capacity=capacity, return_dataframe=False)
298+
299+
300+
@log
def delete_workspace_consumption_rules(capacity: str | UUID = None):
    """
    Deletes the workspace consumption surge protection rules for the specified capacity.

    Workspace Consumption
    When total CU consumption by a single workspace reaches the rejection threshold, reject new operation requests and block the workspace for the specified amount of time.

    Parameters
    ----------
    capacity : str | uuid.UUID, default=None
        The capacity name or ID.
        Defaults to None which resolves to the capacity id of the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the capacity name of the workspace of the notebook.
    """

    rules = get_workspace_consumption_rules(capacity=capacity, return_dataframe=False)
    if not rules:
        print(f"{icons.yellow_dot} No workspace consumption rules found to delete.")
        return

    # Fixed: the original looped over rules["value"] and broke after the first
    # item; when "value" was missing or empty, rule_id was never bound and the
    # request below raised UnboundLocalError. Take the first entry if present,
    # otherwise fall back to the top-level detectionRuleId — the shape that
    # get_workspace_consumption_rules itself parses.
    rule_entries = rules.get("value", [])
    if rule_entries:
        rule_id = rule_entries[0].get("detectionRuleId")
    else:
        rule_id = rules.get("detectionRuleId")

    if rule_id is None:
        print(f"{icons.yellow_dot} No workspace consumption rules found to delete.")
        return

    _surge_api(
        capacity=capacity,
        request=f"detectionRules/{rule_id}",
        payload=None,
        method="delete",
        status_code=204,
        return_json=False,
    )

    print(f"{icons.green_dot} The workspace consumption rules deleted successfully.")
335+
336+
337+
@log
def delete_background_operation_rules(capacity: str | UUID = None):
    """
    Deletes the background operation surge protection rules for the specified capacity.

    Background Operations
    When total CU consumption reaches the rejection threshold, reject new background operation requests. When total CU consumption drops below the recovery threshold, accept new background operation requests.

    Parameters
    ----------
    capacity : str | uuid.UUID, default=None
        The capacity name or ID.
        Defaults to None which resolves to the capacity id of the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the capacity name of the workspace of the notebook.
    """

    # A single DELETE against the surge protection endpoint removes all
    # background operation rules; no rule id is required.
    request_kwargs = {
        "capacity": capacity,
        "request": "surgeProtectionRules",
        "payload": None,
        "method": "delete",
        "status_code": 200,
        "return_json": False,
    }
    _surge_api(**request_kwargs)

    print(f"{icons.green_dot} The background operation rules deleted successfully.")

0 commit comments

Comments
 (0)