Skip to content

Commit 92fd0cd

Browse files
committed
fix format issues
1 parent 26f23ca commit 92fd0cd

File tree

2 files changed

+38
-31
lines changed

2 files changed

+38
-31
lines changed

sdks/python/apache_beam/examples/rate_limiter_simple.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929

3030
class SampleApiDoFn(beam.DoFn):
3131
"""A DoFn that simulates calling an external API with rate limiting."""
32-
3332
def __init__(self, rls_address, domain, descriptors):
3433
self.rls_address = rls_address
3534
self.domain = domain
@@ -42,23 +41,24 @@ def setup(self):
4241
# We use shared.Shared() to ensure only one RateLimiter instance is created
4342
# per worker and shared across threads.
4443
def init_limiter():
45-
logging.info(f"Connecting to Envoy RLS at {self.rls_address}")
46-
return EnvoyRateLimiter(
47-
service_address=self.rls_address,
48-
domain=self.domain,
49-
descriptors=self.descriptors,
50-
namespace='example_pipeline'
51-
)
44+
logging.info(f"Connecting to Envoy RLS at {self.rls_address}")
45+
return EnvoyRateLimiter(
46+
service_address=self.rls_address,
47+
domain=self.domain,
48+
descriptors=self.descriptors,
49+
namespace='example_pipeline')
50+
5251
self.rate_limiter = self._shared.acquire(init_limiter)
5352

5453
def process(self, element):
5554
self.rate_limiter.throttle()
5655

5756
# Process the element mock API call
5857
logging.info(f"Processing element: {element}")
59-
time.sleep(0.1)
58+
time.sleep(0.1)
6059
yield element
6160

61+
6262
def parse_known_args(argv):
6363
"""Parses args for the workflow."""
6464
parser = argparse.ArgumentParser()
@@ -77,11 +77,13 @@ def run(argv=None):
7777
(
7878
p
7979
| 'Create' >> beam.Create(range(100))
80-
| 'RateLimit' >> beam.ParDo(SampleApiDoFn(
81-
rls_address=known_args.rls_address,
82-
domain="mongo_cps",
83-
descriptors=[{"database": "users"}]))
84-
)
80+
| 'RateLimit' >> beam.ParDo(
81+
SampleApiDoFn(
82+
rls_address=known_args.rls_address,
83+
domain="mongo_cps",
84+
descriptors=[{
85+
"database": "users"
86+
}])))
8587

8688

8789
if __name__ == '__main__':

sdks/python/apache_beam/io/components/rate_limiter.py

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,19 @@
2020
"""
2121

2222
import abc
23-
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitRequest
24-
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitResponse
25-
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitResponseCode
26-
from envoy_data_plane.envoy.extensions.common.ratelimit.v3 import RateLimitDescriptor
27-
from envoy_data_plane.envoy.extensions.common.ratelimit.v3 import RateLimitDescriptorEntry
2823
import logging
29-
import time
3024
import threading
31-
import random
25+
import time
3226
from typing import Dict
3327
from typing import List
28+
3429
import grpc
35-
import time
30+
from envoy_data_plane.envoy.extensions.common.ratelimit.v3 import RateLimitDescriptor
31+
from envoy_data_plane.envoy.extensions.common.ratelimit.v3 import RateLimitDescriptorEntry
32+
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitRequest
33+
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitResponse
34+
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitResponseCode
35+
3636
from apache_beam.io.components import adaptive_throttler
3737
from apache_beam.metrics import Metrics
3838

@@ -41,19 +41,24 @@
4141
_MAX_CONNECTION_RETRIES = 5
4242
_RETRY_DELAY_SECONDS = 10
4343

44+
4445
class RateLimiter(abc.ABC):
4546
"""Abstract base class for RateLimiters."""
4647
def __init__(self, namespace: str = ""):
4748
# Metrics collected from the RateLimiter
4849
# Metric updates are thread safe
4950
self.throttling_signaler = adaptive_throttler.ThrottlingSignaler(
5051
namespace=namespace)
51-
self.requests_counter = Metrics.counter(namespace, 'envoyRatelimitRequestsTotal')
52-
self.requests_allowed = Metrics.counter(namespace, 'envoyRatelimitRequestsAllowed')
53-
self.requests_throttled = Metrics.counter(namespace, 'envoyRatelimitRequestsThrottled')
52+
self.requests_counter = Metrics.counter(
53+
namespace, 'envoyRatelimitRequestsTotal')
54+
self.requests_allowed = Metrics.counter(
55+
namespace, 'envoyRatelimitRequestsAllowed')
56+
self.requests_throttled = Metrics.counter(
57+
namespace, 'envoyRatelimitRequestsThrottled')
5458
self.rpc_errors = Metrics.counter(namespace, 'envoyRatelimitRpcErrors')
5559
self.rpc_retries = Metrics.counter(namespace, 'envoyRatelimitRpcRetries')
56-
self.rpc_latency = Metrics.distribution(namespace, 'envoyRatelimitRpcLatencyMs')
60+
self.rpc_latency = Metrics.distribution(
61+
namespace, 'envoyRatelimitRpcLatencyMs')
5762

5863
@abc.abstractmethod
5964
def throttle(self, **kwargs) -> bool:
@@ -69,7 +74,6 @@ def throttle(self, **kwargs) -> bool:
6974
Exception: If an underlying infrastructure error occurs (e.g. RPC failure).
7075
"""
7176
pass
72-
7377

7478

7579
class EnvoyRateLimiter(RateLimiter):
@@ -99,7 +103,7 @@ def __init__(
99103
throttling is occurring.
100104
"""
101105
super().__init__(namespace=namespace)
102-
106+
103107
self.service_address = service_address
104108
self.domain = domain
105109
self.descriptors = descriptors
@@ -154,9 +158,10 @@ def throttle(self, hits_added: int = 1) -> bool:
154158
entries.append(RateLimitDescriptorEntry(key=k, value=v))
155159
proto_descriptors.append(RateLimitDescriptor(entries=entries))
156160

157-
158161
request = RateLimitRequest(
159-
domain=self.domain, descriptors=proto_descriptors, hits_addend=hits_added)
162+
domain=self.domain,
163+
descriptors=proto_descriptors,
164+
hits_addend=hits_added)
160165

161166
self.requests_counter.inc()
162167
attempt = 0
@@ -199,7 +204,7 @@ def throttle(self, hits_added: int = 1) -> bool:
199204
val = dur.total_seconds()
200205
if val > sleep_s:
201206
sleep_s = val
202-
207+
203208
_LOGGER.warning("Throttled for %s seconds", sleep_s)
204209
# signal throttled time to backend
205210
self.throttling_signaler.signal_throttled(int(sleep_s))

0 commit comments

Comments
 (0)