Skip to content

Commit c71edca

Browse files
committed
resolve comments
1 parent a548fef commit c71edca

File tree

1 file changed

+7
-6
lines changed

1 file changed

+7
-6
lines changed

sdks/python/apache_beam/io/components/rate_limiter.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import abc
2323
import logging
24+
import math
2425
import random
2526
import threading
2627
import time
@@ -39,8 +40,8 @@
3940

4041
_LOGGER = logging.getLogger(__name__)
4142

42-
_MAX_RETRIES = 5
43-
_RETRY_DELAY_SECONDS = 10
43+
_RPC_MAX_RETRIES = 5
44+
_RPC_RETRY_DELAY_SECONDS = 10
4445

4546

4647
class RateLimiter(abc.ABC):
@@ -171,14 +172,14 @@ def throttle(self, hits_added: int = 1) -> bool:
171172
break
172173

173174
# retry loop
174-
for retry_attempt in range(_MAX_RETRIES):
175+
for retry_attempt in range(_RPC_MAX_RETRIES):
175176
try:
176177
start_time = time.time()
177178
response = self._stub.ShouldRateLimit(request, timeout=self.timeout)
178179
self.rpc_latency.update(int((time.time() - start_time) * 1000))
179180
break
180181
except grpc.RpcError as e:
181-
if retry_attempt == _MAX_RETRIES - 1:
182+
if retry_attempt == _RPC_MAX_RETRIES - 1:
182183
_LOGGER.error(
183184
"[EnvoyRateLimiter] ratelimit service call failed: %s", e)
184185
self.rpc_errors.inc()
@@ -187,7 +188,7 @@ def throttle(self, hits_added: int = 1) -> bool:
187188
_LOGGER.warning(
188189
"[EnvoyRateLimiter] ratelimit service call failed, retrying: %s",
189190
e)
190-
time.sleep(_RETRY_DELAY_SECONDS)
191+
time.sleep(_RPC_RETRY_DELAY_SECONDS)
191192

192193
if response.overall_code == RateLimitResponseCode.OK:
193194
self.requests_allowed.inc()
@@ -214,7 +215,7 @@ def throttle(self, hits_added: int = 1) -> bool:
214215

215216
_LOGGER.warning("[EnvoyRateLimiter] Throttled for %s seconds", sleep_s)
216217
# signal throttled time to backend
217-
self.throttling_signaler.signal_throttled(int(sleep_s))
218+
self.throttling_signaler.signal_throttled(math.ceil(sleep_s))
218219
time.sleep(sleep_s)
219220
attempt += 1
220221
else:

0 commit comments

Comments
 (0)