Skip to content

Commit ff0c0ff

Browse files
authored
switch to xds-proto
switch to xds-proto
2 parents 6868614 + 4a498ce commit ff0c0ff

File tree

3 files changed

+27
-45
lines changed

3 files changed

+27
-45
lines changed

sdks/python/apache_beam/io/components/rate_limiter.py

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,9 @@
2828
from typing import List
2929

3030
import grpc
31-
from envoy_data_plane.envoy.extensions.common.ratelimit.v3 import RateLimitDescriptor
32-
from envoy_data_plane.envoy.extensions.common.ratelimit.v3 import RateLimitDescriptorEntry
33-
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitRequest
34-
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitResponse
35-
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitResponseCode
31+
from envoy.extensions.common.ratelimit.v3 import ratelimit_pb2
32+
from envoy.service.ratelimit.v3 import rls_pb2
33+
from envoy.service.ratelimit.v3 import rls_pb2_grpc
3634

3735
from apache_beam.io.components import adaptive_throttler
3836
from apache_beam.metrics import Metrics
@@ -114,30 +112,14 @@ def __init__(
114112
self._stub = None
115113
self._lock = threading.Lock()
116114

117-
class RateLimitServiceStub(object):
118-
"""
119-
Wrapper for gRPC stub to be compatible with envoy_data_plane messages.
120-
121-
The envoy-data-plane package uses 'betterproto' which generates async stubs
122-
for 'grpclib'. As Beam uses standard synchronous 'grpcio', RateLimitServiceStub
123-
is a bridge class to use the betterproto Message types (RateLimitRequest)
124-
with a standard grpcio Channel.
125-
"""
126-
def __init__(self, channel):
127-
self.ShouldRateLimit = channel.unary_unary(
128-
'/envoy.service.ratelimit.v3.RateLimitService/ShouldRateLimit',
129-
request_serializer=RateLimitRequest.SerializeToString,
130-
response_deserializer=RateLimitResponse.FromString,
131-
)
132-
133115
def init_connection(self):
134116
if self._stub is None:
135117
# Acquire lock to safegaurd againest multiple DoFn threads sharing the same
136118
# RateLimiter instance, which is the case when using Shared().
137119
with self._lock:
138120
if self._stub is None:
139121
channel = grpc.insecure_channel(self.service_address)
140-
self._stub = EnvoyRateLimiter.RateLimitServiceStub(channel)
122+
self._stub = rls_pb2_grpc.RateLimitServiceStub(channel)
141123

142124
def throttle(self, hits_added: int = 1) -> bool:
143125
"""Calls the Envoy RLS to check for rate limits.
@@ -156,10 +138,10 @@ def throttle(self, hits_added: int = 1) -> bool:
156138
for d in self.descriptors:
157139
entries = []
158140
for k, v in d.items():
159-
entries.append(RateLimitDescriptorEntry(key=k, value=v))
160-
proto_descriptors.append(RateLimitDescriptor(entries=entries))
141+
entries.append(ratelimit_pb2.RateLimitDescriptor.Entry(key=k, value=v))
142+
proto_descriptors.append(ratelimit_pb2.RateLimitDescriptor(entries=entries))
161143

162-
request = RateLimitRequest(
144+
request = rls_pb2.RateLimitRequest(
163145
domain=self.domain,
164146
descriptors=proto_descriptors,
165147
hits_addend=hits_added)
@@ -188,22 +170,21 @@ def throttle(self, hits_added: int = 1) -> bool:
188170
"[EnvoyRateLimiter] Connection Failed, retrying: %s", e)
189171
time.sleep(_RETRY_DELAY_SECONDS)
190172

191-
if response.overall_code == RateLimitResponseCode.OK:
173+
if response.overall_code == rls_pb2.RateLimitResponse.OK:
192174
self.requests_allowed.inc()
193175
throttled = True
194176
break
195-
elif response.overall_code == RateLimitResponseCode.OVER_LIMIT:
177+
elif response.overall_code == rls_pb2.RateLimitResponse.OVER_LIMIT:
196178
self.requests_throttled.inc()
197179
# Ratelimit exceeded, sleep for duration until reset and retry
198180
# multiple rules can be set in the RLS config, so we need to find the max duration
199181
sleep_s = 0.0
200182
if response.statuses:
201183
for status in response.statuses:
202-
if status.code == RateLimitResponseCode.OVER_LIMIT:
184+
if status.code == rls_pb2.RateLimitResponse.OVER_LIMIT:
203185
dur = status.duration_until_reset
204-
# duration_until_reset is converted to timedelta by betterproto
205-
# duration_until_reset has microsecond precision
206-
val = dur.total_seconds()
186+
# duration_until_reset is google.protobuf.Duration
187+
val = dur.seconds + dur.nanos / 1e9
207188
if val > sleep_s:
208189
sleep_s = val
209190

sdks/python/apache_beam/io/components/rate_limiter_test.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@
2121
from datetime import timedelta
2222

2323
from apache_beam.io.components import rate_limiter
24-
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitResponse
25-
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitResponseCode
26-
from envoy_data_plane.envoy.service.ratelimit.v3 import RateLimitResponseDescriptorStatus
24+
from envoy.service.ratelimit.v3 import rls_pb2
25+
from google.protobuf.duration_pb2 import Duration
2726

2827

2928
class EnvoyRateLimiterTest(unittest.TestCase):
@@ -44,7 +43,8 @@ def setUp(self):
4443
def test_throttle_allowed(self, mock_channel):
4544
# Mock successful OK response
4645
mock_stub = mock.Mock()
47-
mock_response = RateLimitResponse(overall_code=RateLimitResponseCode.OK)
46+
mock_response = rls_pb2.RateLimitResponse(
47+
overall_code=rls_pb2.RateLimitResponse.OK)
4848
mock_stub.ShouldRateLimit.return_value = mock_response
4949

5050
# Inject mock stub
@@ -59,8 +59,8 @@ def test_throttle_allowed(self, mock_channel):
5959
def test_throttle_over_limit_retries_exceeded(self, mock_channel):
6060
# Mock OVER_LIMIT response
6161
mock_stub = mock.Mock()
62-
mock_response = RateLimitResponse(
63-
overall_code=RateLimitResponseCode.OVER_LIMIT)
62+
mock_response = rls_pb2.RateLimitResponse(
63+
overall_code=rls_pb2.RateLimitResponse.OVER_LIMIT)
6464
mock_stub.ShouldRateLimit.return_value = mock_response
6565

6666
self.limiter._stub = mock_stub
@@ -84,7 +84,8 @@ def test_throttle_over_limit_retries_exceeded(self, mock_channel):
8484
def test_throttle_rpc_error_retry(self, mock_channel):
8585
# Mock RpcError then Success
8686
mock_stub = mock.Mock()
87-
mock_response = RateLimitResponse(overall_code=RateLimitResponseCode.OK)
87+
mock_response = rls_pb2.RateLimitResponse(
88+
overall_code=rls_pb2.RateLimitResponse.OK)
8889

8990
# Side effect: Error, Error, Success
9091
error = grpc.RpcError()
@@ -121,11 +122,12 @@ def test_extract_duration_from_response(self, mock_random, mock_channel):
121122
mock_stub = mock.Mock()
122123

123124
# Valid until 5 seconds
124-
status = RateLimitResponseDescriptorStatus(
125-
code=RateLimitResponseCode.OVER_LIMIT,
126-
duration_until_reset=timedelta(seconds=5))
127-
mock_response = RateLimitResponse(
128-
overall_code=RateLimitResponseCode.OVER_LIMIT, statuses=[status])
125+
# Valid until 5 seconds
126+
status = rls_pb2.RateLimitResponse.DescriptorStatus(
127+
code=rls_pb2.RateLimitResponse.OVER_LIMIT,
128+
duration_until_reset=Duration(seconds=5))
129+
mock_response = rls_pb2.RateLimitResponse(
130+
overall_code=rls_pb2.RateLimitResponse.OVER_LIMIT, statuses=[status])
129131

130132
mock_stub.ShouldRateLimit.return_value = mock_response
131133
self.limiter._stub = mock_stub

sdks/python/setup.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,8 +376,7 @@ def get_portability_package_data():
376376
'cryptography>=39.0.0,<48.0.0',
377377
'fastavro>=0.23.6,<2',
378378
'fasteners>=0.3,<1.0',
379-
'envoy-data-plane>=1.0.3,<2; python_version >= "3.11"',
380-
'envoy-data-plane==0.8.1; python_version < "3.11"',
379+
'xds-protos<=1.75.0',
381380
# TODO(https://github.com/grpc/grpc/issues/37710): Unpin grpc
382381
'grpcio>=1.33.1,<2,!=1.48.0,!=1.59.*,!=1.60.*,!=1.61.*,!=1.62.0,!=1.62.1,<1.66.0; python_version <= "3.12"', # pylint: disable=line-too-long
383382
'grpcio>=1.67.0; python_version >= "3.13"',

0 commit comments

Comments
 (0)