Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/examples/rate_limiter_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def init_limiter():
self.rate_limiter = self._shared.acquire(init_limiter)

def process(self, element):
self.rate_limiter.throttle()
self.rate_limiter.allow()

# Process the element mock API call
logging.info("Processing element: %s", element)
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/io/components/rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def __init__(self, namespace: str = ""):
self.rpc_latency = Metrics.distribution(namespace, 'RatelimitRpcLatencyMs')

@abc.abstractmethod
def throttle(self, **kwargs) -> bool:
def allow(self, **kwargs) -> bool:
"""Applies rate limiting to the request.

This method checks if the request is permitted by the rate limiting policy.
Expand Down Expand Up @@ -148,7 +148,7 @@ def init_connection(self):
channel = grpc.insecure_channel(self.service_address)
self._stub = EnvoyRateLimiter.RateLimitServiceStub(channel)

def throttle(self, hits_added: int = 1) -> bool:
def allow(self, hits_added: int = 1) -> bool:
"""Calls the Envoy RLS to apply rate limits.

Sends a rate limit request to the configured Envoy Rate Limit Service.
Expand Down
24 changes: 12 additions & 12 deletions sdks/python/apache_beam/io/components/rate_limiter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def setUp(self):
namespace='test_namespace')

@mock.patch('grpc.insecure_channel')
def test_throttle_allowed(self, mock_channel):
def test_allow_success(self, mock_channel):
# Mock successful OK response
mock_stub = mock.Mock()
mock_response = RateLimitResponse(overall_code=RateLimitResponseCode.OK)
Expand All @@ -51,13 +51,13 @@ def test_throttle_allowed(self, mock_channel):
# Inject mock stub
self.limiter._stub = mock_stub

throttled = self.limiter.throttle()
allowed = self.limiter.allow()

self.assertTrue(throttled)
self.assertTrue(allowed)
mock_stub.ShouldRateLimit.assert_called_once()

@mock.patch('grpc.insecure_channel')
def test_throttle_over_limit_retries_exceeded(self, mock_channel):
def test_allow_over_limit_retries_exceeded(self, mock_channel):
# Mock OVER_LIMIT response
mock_stub = mock.Mock()
mock_response = RateLimitResponse(
Expand All @@ -69,9 +69,9 @@ def test_throttle_over_limit_retries_exceeded(self, mock_channel):

# We mock time.sleep to run fast
with mock.patch('time.sleep'):
throttled = self.limiter.throttle()
allowed = self.limiter.allow()

self.assertFalse(throttled)
self.assertFalse(allowed)
# Should be called 1 (initial) + 2 (retries) + 1 (last check > retries
# logic depends on loop)
# Logic: attempt starts at 0.
Expand All @@ -83,7 +83,7 @@ def test_throttle_over_limit_retries_exceeded(self, mock_channel):
self.assertEqual(mock_stub.ShouldRateLimit.call_count, 3)

@mock.patch('grpc.insecure_channel')
def test_throttle_rpc_error_retry(self, mock_channel):
def test_allow_rpc_error_retry(self, mock_channel):
# Mock RpcError then Success
mock_stub = mock.Mock()
mock_response = RateLimitResponse(overall_code=RateLimitResponseCode.OK)
Expand All @@ -95,13 +95,13 @@ def test_throttle_rpc_error_retry(self, mock_channel):
self.limiter._stub = mock_stub

with mock.patch('time.sleep'):
throttled = self.limiter.throttle()
allowed = self.limiter.allow()

self.assertTrue(throttled)
self.assertTrue(allowed)
self.assertEqual(mock_stub.ShouldRateLimit.call_count, 3)

@mock.patch('grpc.insecure_channel')
def test_throttle_rpc_error_fail(self, mock_channel):
def test_allow_rpc_error_fail(self, mock_channel):
# Mock Persistent RpcError
mock_stub = mock.Mock()
error = grpc.RpcError()
Expand All @@ -111,7 +111,7 @@ def test_throttle_rpc_error_fail(self, mock_channel):

with mock.patch('time.sleep'):
with self.assertRaises(grpc.RpcError):
self.limiter.throttle()
self.limiter.allow()

# The inner loop tries 5 times for connection errors
self.assertEqual(mock_stub.ShouldRateLimit.call_count, 5)
Expand All @@ -134,7 +134,7 @@ def test_extract_duration_from_response(self, mock_random, mock_channel):
self.limiter.retries = 0 # Single attempt

with mock.patch('time.sleep') as mock_sleep:
self.limiter.throttle()
self.limiter.allow()
# Should sleep for 5 seconds (jitter is 0.0)
mock_sleep.assert_called_with(5.0)

Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/ml/inference/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ def init_limiter():

self._shared_rate_limiter = self._shared_handle.acquire(init_limiter)

if not self._shared_rate_limiter.throttle(hits_added=len(batch)):
if not self._shared_rate_limiter.allow(hits_added=len(batch)):
raise RateLimitExceeded(
"Rate Limit Exceeded, "
"Could not process this batch.")
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/ml/inference/base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2076,7 +2076,7 @@ class FakeRateLimiter(base.RateLimiter):
def __init__(self):
super().__init__(namespace='test_namespace')

def throttle(self, hits_added=1):
def allow(self, hits_added=1):
self.requests_counter.inc()
return True

Expand Down Expand Up @@ -2114,7 +2114,7 @@ class FakeRateLimiter(base.RateLimiter):
def __init__(self):
super().__init__(namespace='test_namespace')

def throttle(self, hits_added=1):
def allow(self, hits_added=1):
return False

class ConcreteRemoteModelHandler(base.RemoteModelHandler):
Expand Down
Loading