diff --git a/sdks/python/apache_beam/examples/rate_limiter_simple.py b/sdks/python/apache_beam/examples/rate_limiter_simple.py index ea469006f2bb..8cdf1166aadc 100644 --- a/sdks/python/apache_beam/examples/rate_limiter_simple.py +++ b/sdks/python/apache_beam/examples/rate_limiter_simple.py @@ -53,7 +53,7 @@ def init_limiter(): self.rate_limiter = self._shared.acquire(init_limiter) def process(self, element): - self.rate_limiter.throttle() + self.rate_limiter.allow() # Process the element mock API call logging.info("Processing element: %s", element) diff --git a/sdks/python/apache_beam/io/components/rate_limiter.py b/sdks/python/apache_beam/io/components/rate_limiter.py index 5c3b36e8ab0a..2dc8a5340fdb 100644 --- a/sdks/python/apache_beam/io/components/rate_limiter.py +++ b/sdks/python/apache_beam/io/components/rate_limiter.py @@ -61,7 +61,7 @@ def __init__(self, namespace: str = ""): self.rpc_latency = Metrics.distribution(namespace, 'RatelimitRpcLatencyMs') @abc.abstractmethod - def throttle(self, **kwargs) -> bool: + def allow(self, **kwargs) -> bool: """Applies rate limiting to the request. This method checks if the request is permitted by the rate limiting policy. @@ -148,7 +148,7 @@ def init_connection(self): channel = grpc.insecure_channel(self.service_address) self._stub = EnvoyRateLimiter.RateLimitServiceStub(channel) - def throttle(self, hits_added: int = 1) -> bool: + def allow(self, hits_added: int = 1) -> bool: """Calls the Envoy RLS to apply rate limits. Sends a rate limit request to the configured Envoy Rate Limit Service. diff --git a/sdks/python/apache_beam/io/components/rate_limiter_test.py b/sdks/python/apache_beam/io/components/rate_limiter_test.py index 7c3e7b82aad7..24d30a1c5c93 100644 --- a/sdks/python/apache_beam/io/components/rate_limiter_test.py +++ b/sdks/python/apache_beam/io/components/rate_limiter_test.py @@ -42,7 +42,7 @@ def setUp(self): namespace='test_namespace') @mock.patch('grpc.insecure_channel') - def test_throttle_allowed(self, mock_channel): + def test_allow_success(self, mock_channel): # Mock successful OK response mock_stub = mock.Mock() mock_response = RateLimitResponse(overall_code=RateLimitResponseCode.OK) @@ -51,13 +51,13 @@ def test_throttle_allowed(self, mock_channel): # Inject mock stub self.limiter._stub = mock_stub - throttled = self.limiter.throttle() + allowed = self.limiter.allow() - self.assertTrue(throttled) + self.assertTrue(allowed) mock_stub.ShouldRateLimit.assert_called_once() @mock.patch('grpc.insecure_channel') - def test_throttle_over_limit_retries_exceeded(self, mock_channel): + def test_allow_over_limit_retries_exceeded(self, mock_channel): # Mock OVER_LIMIT response mock_stub = mock.Mock() mock_response = RateLimitResponse( @@ -69,9 +69,9 @@ def test_throttle_over_limit_retries_exceeded(self, mock_channel): # We mock time.sleep to run fast with mock.patch('time.sleep'): - throttled = self.limiter.throttle() + allowed = self.limiter.allow() - self.assertFalse(throttled) + self.assertFalse(allowed) # Should be called 1 (initial) + 2 (retries) + 1 (last check > retries # logic depends on loop) # Logic: attempt starts at 0. @@ -83,7 +83,7 @@ def test_throttle_over_limit_retries_exceeded(self, mock_channel): self.assertEqual(mock_stub.ShouldRateLimit.call_count, 3) @mock.patch('grpc.insecure_channel') - def test_throttle_rpc_error_retry(self, mock_channel): + def test_allow_rpc_error_retry(self, mock_channel): # Mock RpcError then Success mock_stub = mock.Mock() mock_response = RateLimitResponse(overall_code=RateLimitResponseCode.OK) @@ -95,13 +95,13 @@ def test_throttle_rpc_error_retry(self, mock_channel): self.limiter._stub = mock_stub with mock.patch('time.sleep'): - throttled = self.limiter.throttle() + allowed = self.limiter.allow() - self.assertTrue(throttled) + self.assertTrue(allowed) self.assertEqual(mock_stub.ShouldRateLimit.call_count, 3) @mock.patch('grpc.insecure_channel') - def test_throttle_rpc_error_fail(self, mock_channel): + def test_allow_rpc_error_fail(self, mock_channel): # Mock Persistent RpcError mock_stub = mock.Mock() error = grpc.RpcError() @@ -111,7 +111,7 @@ def test_throttle_rpc_error_fail(self, mock_channel): with mock.patch('time.sleep'): with self.assertRaises(grpc.RpcError): - self.limiter.throttle() + self.limiter.allow() # The inner loop tries 5 times for connection errors self.assertEqual(mock_stub.ShouldRateLimit.call_count, 5) @@ -134,7 +134,7 @@ def test_extract_duration_from_response(self, mock_random, mock_channel): self.limiter.retries = 0 # Single attempt with mock.patch('time.sleep') as mock_sleep: - self.limiter.throttle() + self.limiter.allow() # Should sleep for 5 seconds (jitter is 0.0) mock_sleep.assert_called_with(5.0) diff --git a/sdks/python/apache_beam/ml/inference/base.py b/sdks/python/apache_beam/ml/inference/base.py index ada7cb3237d4..da7b363052d7 100644 --- a/sdks/python/apache_beam/ml/inference/base.py +++ b/sdks/python/apache_beam/ml/inference/base.py @@ -450,7 +450,7 @@ def init_limiter(): self._shared_rate_limiter = self._shared_handle.acquire(init_limiter) - if not self._shared_rate_limiter.throttle(hits_added=len(batch)): + if not self._shared_rate_limiter.allow(hits_added=len(batch)): raise RateLimitExceeded( "Rate Limit Exceeded, " "Could not process this batch.") diff --git a/sdks/python/apache_beam/ml/inference/base_test.py b/sdks/python/apache_beam/ml/inference/base_test.py index e6865a13ef8f..381bf5456604 100644 --- a/sdks/python/apache_beam/ml/inference/base_test.py +++ b/sdks/python/apache_beam/ml/inference/base_test.py @@ -2076,7 +2076,7 @@ class FakeRateLimiter(base.RateLimiter): def __init__(self): super().__init__(namespace='test_namespace') - def throttle(self, hits_added=1): + def allow(self, hits_added=1): self.requests_counter.inc() return True @@ -2114,7 +2114,7 @@ class FakeRateLimiter(base.RateLimiter): def __init__(self): super().__init__(namespace='test_namespace') - def throttle(self, hits_added=1): + def allow(self, hits_added=1): return False class ConcreteRemoteModelHandler(base.RemoteModelHandler):