Skip to content

Commit b5b85dc

Browse files
authored
Replaced ratelimit and tenacity dependencies with simpler implementations (#195)
This PR simplifies rate-limiting down to a single `@rate_limited` decorator that can be added to any method requiring throttling.
1 parent 11dd789 commit b5b85dc

File tree

10 files changed

+107
-27
lines changed

10 files changed

+107
-27
lines changed

pyproject.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,6 @@ classifiers = [
2828
dependencies = [
2929
"databricks-sdk~=0.8.0",
3030
"PyYAML>=6.0.0,<7.0.0",
31-
32-
# TODO: remove later
33-
"ratelimit>=2.2.1,<3.0.0",
34-
"tenacity>=8.2.2,<9.0.0",
3531
]
3632

3733
[project.optional-dependencies]

src/databricks/labs/ucx/config.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ class ConnectConfig:
4141
profile: str | None = None
4242
debug_headers: bool | None = False
4343
rate_limit: int | None = None
44+
max_connections_per_pool: int | None = None
45+
max_connection_pools: int | None = None
4446

4547
@staticmethod
4648
def from_databricks_config(cfg: Config) -> "ConnectConfig":
@@ -57,6 +59,8 @@ def from_databricks_config(cfg: Config) -> "ConnectConfig":
5759
profile=cfg.profile,
5860
debug_headers=cfg.debug_headers,
5961
rate_limit=cfg.rate_limit,
62+
max_connection_pools=cfg.max_connection_pools,
63+
max_connections_per_pool=cfg.max_connections_per_pool,
6064
)
6165

6266
@classmethod
@@ -136,7 +140,7 @@ def from_dict(cls, raw: dict) -> "MigrationConfig":
136140
groups=GroupsConfig.from_dict(raw.get("groups", {})),
137141
connect=ConnectConfig.from_dict(raw.get("connect", {})),
138142
instance_pool_id=raw.get("instance_pool_id", None),
139-
num_threads=raw.get("num_threads", 4),
143+
num_threads=raw.get("num_threads", 8),
140144
log_level=raw.get("log_level", "INFO"),
141145
)
142146

@@ -170,6 +174,8 @@ def to_databricks_config(self) -> Config:
170174
profile=connect.profile,
171175
debug_headers=connect.debug_headers,
172176
rate_limit=connect.rate_limit,
177+
max_connection_pools=connect.max_connection_pools,
178+
max_connections_per_pool=connect.max_connections_per_pool,
173179
product="ucx",
174180
product_version=__version__,
175181
)

src/databricks/labs/ucx/managers/group.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55

66
from databricks.sdk import WorkspaceClient
77
from databricks.sdk.service import iam
8-
from ratelimit import limits, sleep_and_retry
98

109
from databricks.labs.ucx.config import GroupsConfig
1110
from databricks.labs.ucx.generic import StrEnum
1211
from databricks.labs.ucx.providers.groups_info import (
1312
GroupMigrationState,
1413
MigrationGroupInfo,
1514
)
15+
from databricks.labs.ucx.providers.mixins.hardening import rate_limited
1616
from databricks.labs.ucx.utils import ThreadedExecution
1717

1818
logger = logging.getLogger(__name__)
@@ -117,8 +117,7 @@ def _replace_group(self, migration_info: MigrationGroupInfo):
117117

118118
self._reflect_account_group_to_workspace(migration_info.account)
119119

120-
@sleep_and_retry
121-
@limits(calls=5, period=1) # assumption
120+
@rate_limited(max_requests=5) # assumption
122121
def _reflect_account_group_to_workspace(self, acc_group: iam.Group) -> None:
123122
logger.info(f"Reflecting group {acc_group.display_name} to workspace")
124123

src/databricks/labs/ucx/providers/mixins/hardening.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import logging
2+
import threading
3+
import time
4+
from functools import wraps
5+
6+
logger = logging.getLogger(__name__)
7+
8+
9+
class RateLimiter:
    """Limit calls to at most ``max_requests`` per burst period (one second by default).

    Requests are counted within the current burst window. Once the window has
    elapsed the counter starts over; when the counter exceeds the capacity the
    calling thread sleeps for the remainder of the window. This keeps the flow
    of requests under control so that a service is not overloaded.

    It uses a variation of the Leaky Bucket algorithm
    (https://en.wikipedia.org/wiki/Leaky_bucket) for its simplicity.
    """

    def __init__(self, *, max_requests: int = 30, burst_period_seconds: int = 1):
        """Create a limiter.

        Args:
            max_requests: number of calls allowed per burst window before
                callers are put to sleep. Must be at least 1.
            burst_period_seconds: length of one burst window, in seconds.

        Raises:
            ValueError: if ``max_requests`` is less than 1. Such a limiter could
                never let a request through and would otherwise fail later, inside
                ``throttle``, with a negative sleep duration.
        """
        if max_requests < 1:
            msg = f"max_requests must be at least 1, got {max_requests}"
            raise ValueError(msg)
        self._capacity = max_requests
        self._burst_period_seconds = burst_period_seconds
        self._lock = threading.RLock()
        # Start of the current burst window (not the time of the last request).
        # NOTE(review): time.time() is wall-clock and may jump; time.monotonic()
        # would be more robust, but callers/tests may patch time.time - confirm
        # before switching.
        self._last = time.time()
        # Number of requests seen in the current burst window.
        self._bucket = 0

    def throttle(self):
        """Account for one request and sleep if the current window is over capacity.

        Intended to be called right before making a request. All state updates
        happen under the lock, and the sleep happens while the lock is held, so
        other threads calling ``throttle`` wait as well.

        The time remaining in the current burst window is computed as
        ``_burst_period_seconds`` minus the time elapsed since the window started.
        If that is negative the window has elapsed: the request counter is reset
        and a new window starts now. The request is then counted, and if the
        counter exceeds ``_capacity`` the thread sleeps for the remaining time.
        """
        with self._lock:
            now = time.time()
            delay = self._burst_period_seconds - (now - self._last)
            if delay < 0:
                # The window has elapsed: empty the bucket and start a new window.
                self._bucket = 0
                self._last = now
            self._bucket += 1
            if self._bucket > self._capacity:
                # Over capacity: start leaking by sleeping on the current thread.
                # delay cannot be negative here: a negative delay resets the
                # bucket to 1, which never exceeds a capacity of at least 1.
                logger.debug("Throttled for %ss", delay)
                time.sleep(delay)
48+
49+
50+
def rate_limited(*, max_requests: int = 30, burst_period_seconds: int = 1):
    """Build a decorator that throttles every call to the decorated function.

    Each decorated function gets its own ``RateLimiter``, created once at
    decoration time, so all calls to that function (from any caller) share
    the same limit.

    Args:
        max_requests: passed through to ``RateLimiter``.
        burst_period_seconds: passed through to ``RateLimiter``.

    Returns:
        A decorator; the function it produces calls ``throttle()`` on the
        limiter and then forwards all arguments to the original function,
        returning its result.
    """

    def decorator(func):
        limiter = RateLimiter(
            max_requests=max_requests,
            burst_period_seconds=burst_period_seconds,
        )

        @wraps(func)
        def throttled(*args, **kwargs):
            # May sleep before the wrapped function is invoked.
            limiter.throttle()
            return func(*args, **kwargs)

        return throttled

    return decorator

src/databricks/labs/ucx/support/group_level.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
from functools import partial
33

44
from databricks.sdk.service import iam
5-
from ratelimit import limits, sleep_and_retry
65

76
from databricks.labs.ucx.inventory.types import Destination, PermissionsInventoryItem
87
from databricks.labs.ucx.providers.groups_info import GroupMigrationState
8+
from databricks.labs.ucx.providers.mixins.hardening import rate_limited
99
from databricks.labs.ucx.support.base import BaseSupport
1010

1111

@@ -17,8 +17,7 @@ def _crawler_task(self, group: iam.Group, property_name: str):
1717
raw_object_permissions=json.dumps([e.as_dict() for e in getattr(group, property_name)]),
1818
)
1919

20-
@sleep_and_retry
21-
@limits(calls=10, period=1)
20+
@rate_limited(max_requests=10)
2221
def _applier_task(self, group_id: str, value: list[iam.ComplexValue], property_name: str):
2322
operations = [iam.Patch(op=iam.PatchOp.ADD, path=property_name, value=[e.as_dict() for e in value])]
2423
schemas = [iam.PatchSchema.URN_IETF_PARAMS_SCIM_API_MESSAGES_2_0_PATCH_OP]

src/databricks/labs/ucx/support/listing.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
from databricks.sdk import WorkspaceClient
88
from databricks.sdk.service import ml, workspace
99
from databricks.sdk.service.workspace import ObjectInfo, ObjectType
10-
from ratelimit import limits, sleep_and_retry
1110

1211
from databricks.labs.ucx.inventory.types import RequestObjectType
12+
from databricks.labs.ucx.providers.mixins.hardening import rate_limited
1313
from databricks.labs.ucx.support.permissions import GenericPermissionsInfo
1414

1515
logger = logging.getLogger(__name__)
@@ -44,8 +44,7 @@ def _progress_report(self, _):
4444
f" rps: {rps:.3f}/sec"
4545
)
4646

47-
@sleep_and_retry
48-
@limits(calls=45, period=1) # safety value, can be 50 actually
47+
@rate_limited(max_requests=45) # safety value, can be 50 actually
4948
def _list_workspace(self, path: str) -> Iterator[ObjectType]:
5049
# TODO: remove, use SDK
5150
return self._ws.workspace.list(path=path, recursive=False)

src/databricks/labs/ucx/support/permissions.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@
66
from databricks.sdk import WorkspaceClient
77
from databricks.sdk.core import DatabricksError
88
from databricks.sdk.service import iam
9-
from ratelimit import limits, sleep_and_retry
109

1110
from databricks.labs.ucx.inventory.types import (
1211
Destination,
1312
PermissionsInventoryItem,
1413
RequestObjectType,
1514
)
1615
from databricks.labs.ucx.providers.groups_info import GroupMigrationState
16+
from databricks.labs.ucx.providers.mixins.hardening import rate_limited
1717
from databricks.labs.ucx.support.base import BaseSupport, logger
1818

1919

@@ -79,15 +79,13 @@ def _prepare_new_acl(
7979

8080
return acl_requests
8181

82-
@sleep_and_retry
83-
@limits(calls=30, period=1)
82+
@rate_limited(max_requests=30)
8483
def _applier_task(
8584
self, ws: WorkspaceClient, object_id: str, acl: list[iam.AccessControlRequest], request_type: RequestObjectType
8685
):
8786
ws.permissions.update(request_object_type=request_type, request_object_id=object_id, access_control_list=acl)
8887

89-
@sleep_and_retry
90-
@limits(calls=100, period=1)
88+
@rate_limited(max_requests=100)
9189
def _crawler_task(
9290
self,
9391
ws: WorkspaceClient,

src/databricks/labs/ucx/support/secrets.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55

66
from databricks.sdk import WorkspaceClient
77
from databricks.sdk.service import iam, workspace
8-
from ratelimit import limits, sleep_and_retry
98

109
from databricks.labs.ucx.inventory.types import Destination, PermissionsInventoryItem
1110
from databricks.labs.ucx.providers.groups_info import GroupMigrationState
11+
from databricks.labs.ucx.providers.mixins.hardening import rate_limited
1212
from databricks.labs.ucx.support.base import BaseSupport
1313

1414

@@ -66,8 +66,7 @@ def _inflight_check(
6666
msg = f"Failed to apply permissions for {group_name} on scope {scope_name} in {num_retries} retries"
6767
raise ValueError(msg)
6868

69-
@sleep_and_retry
70-
@limits(calls=30, period=1)
69+
@rate_limited(max_requests=30)
7170
def _rate_limited_put_acl(self, object_id: str, principal: str, permission: workspace.AclPermission):
7271
self._ws.secrets.put_acl(object_id, principal, permission)
7372
self._inflight_check(scope_name=object_id, group_name=principal, expected_permission=permission)

src/databricks/labs/ucx/support/sql.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77
from databricks.sdk import WorkspaceClient
88
from databricks.sdk.core import DatabricksError
99
from databricks.sdk.service import iam, sql
10-
from ratelimit import limits, sleep_and_retry
1110

1211
from databricks.labs.ucx.inventory.types import Destination, PermissionsInventoryItem
1312
from databricks.labs.ucx.providers.groups_info import GroupMigrationState
13+
from databricks.labs.ucx.providers.mixins.hardening import rate_limited
1414
from databricks.labs.ucx.support.base import BaseSupport, logger
1515

1616

@@ -46,8 +46,7 @@ def _safe_get_dbsql_permissions(self, object_type: sql.ObjectTypePlural, object_
4646
else:
4747
raise e
4848

49-
@sleep_and_retry
50-
@limits(calls=100, period=1)
49+
@rate_limited(max_requests=100)
5150
def _crawler_task(self, object_id: str, object_type: sql.ObjectTypePlural) -> PermissionsInventoryItem | None:
5251
permissions = self._safe_get_dbsql_permissions(object_type=object_type, object_id=object_id)
5352
if permissions:
@@ -57,8 +56,7 @@ def _crawler_task(self, object_id: str, object_type: sql.ObjectTypePlural) -> Pe
5756
raw_object_permissions=json.dumps(permissions.as_dict()),
5857
)
5958

60-
@sleep_and_retry
61-
@limits(calls=30, period=1)
59+
@rate_limited(max_requests=30)
6260
def _applier_task(self, object_type: sql.ObjectTypePlural, object_id: str, acl: list[sql.AccessControl]):
6361
"""
6462
Please note that we only have SET option (DBSQL Permissions API doesn't support UPDATE operation).

tests/unit/test_ratelimit.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import pytest
2+
3+
from databricks.labs.ucx.providers.mixins.hardening import rate_limited
4+
5+
6+
def test_ratelimiting(mocker):
    """Check that a rate-limited function sleeps once it exceeds its budget."""
    fake_clock = 0

    def advance_clock():
        # Every read of the patched clock moves time forward by 0.2 seconds.
        nonlocal fake_clock
        fake_clock += 0.2
        return fake_clock

    mocker.patch("time.time", side_effect=advance_clock)
    sleep = mocker.patch("time.sleep")

    @rate_limited(max_requests=5)
    def try_something():
        return 1

    for _ in range(20):
        try_something()

    sleep.assert_called()
    # The clock is read once when the limiter is created and once per call:
    # 21 reads * 0.2s = 4.2s.
    assert 4.2 == pytest.approx(fake_clock)

0 commit comments

Comments
 (0)