3131import com .google .common .base .Stopwatch ;
3232import com .google .common .util .concurrent .RateLimiter ;
3333import java .util .concurrent .TimeUnit ;
34+ import java .util .concurrent .atomic .AtomicBoolean ;
3435import java .util .concurrent .atomic .AtomicReference ;
35- import java .util .logging .Level ;
3636import java .util .logging .Logger ;
3737import javax .annotation .Nonnull ;
3838import org .threeten .bp .Duration ;
3939import org .threeten .bp .Instant ;
4040
4141class RateLimitingServerStreamingCallable
4242 extends ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > {
43+
4344 private static final Logger logger =
4445 Logger .getLogger (RateLimitingServerStreamingCallable .class .getName ());
4546
@@ -64,16 +65,14 @@ class RateLimitingServerStreamingCallable
6465 // as the server side cap
6566 private static final double MAX_FACTOR = 1.3 ;
6667
67- private final RateLimiter limiter ;
68+ private final ConditionalRateLimiter limiter ;
6869
69- private final AtomicReference <Instant > lastQpsChangeTime = new AtomicReference <>(Instant .now ());
7070 private final ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > innerCallable ;
7171
7272 RateLimitingServerStreamingCallable (
7373 @ Nonnull ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > innerCallable ) {
74- this .limiter = RateLimiter . create (DEFAULT_QPS );
74+ this .limiter = new ConditionalRateLimiter (DEFAULT_QPS );
7575 this .innerCallable = Preconditions .checkNotNull (innerCallable , "Inner callable must be set" );
76- logger .info ("Rate limiting is enabled with initial QPS of " + limiter .getRate ());
7776 }
7877
7978 @ Override
@@ -88,44 +87,158 @@ public void call(
8887 ((BigtableTracer ) context .getTracer ())
8988 .batchRequestThrottled (stopwatch .elapsed (TimeUnit .NANOSECONDS ));
9089 }
91- RateLimitingResponseObserver innerObserver =
92- new RateLimitingResponseObserver (limiter , lastQpsChangeTime , responseObserver );
90+ RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver (responseObserver );
9391 innerCallable .call (request , innerObserver , context );
9492 }
9593
94+ /** A rate limiter wrapper class that can be disabled. */
95+ static class ConditionalRateLimiter {
96+
97+ private final AtomicBoolean enabled = new AtomicBoolean (false );
98+
99+ private final RateLimiter limiter ;
100+
101+ // This is the next time allowed to change QPS or disable rate limiting.
102+ private final AtomicReference <Instant > nextRateUpdateTime =
103+ new AtomicReference <>(Instant .now ());
104+
105+ public ConditionalRateLimiter (long defaultQps ) {
106+ limiter = RateLimiter .create (defaultQps );
107+ logger .info ("Rate limiting is initiated (but disabled) with rate of " + defaultQps + " QPS." );
108+ }
109+
110+ /**
111+ * Works the same way with {@link RateLimiter#acquire()} except that when the rate limiter is
112+ * disabled, {@link ConditionalRateLimiter#acquire()} always returns immediately.
113+ */
114+ public void acquire () {
115+ if (enabled .get ()) {
116+ limiter .acquire ();
117+ }
118+ }
119+
120+ /**
121+ * Disables the rate limier if the current time exceeded the next rate update time. When
122+ * disabled, the rate is retained and will be re-used if re-enabled later.
123+ */
124+ public void tryDisable () {
125+ // Only disable after the rate update time.
126+ Instant nextTime = nextRateUpdateTime .get ();
127+ Instant now = Instant .now ();
128+ if (now .isAfter (nextTime )) {
129+ boolean wasEnabled = this .enabled .getAndSet (false );
130+ if (wasEnabled ) {
131+ logger .info ("Rate limiter is disabled." );
132+ }
133+ // No need to update nextRateUpdateTime, any new RateLimitInfo can enable rate limiting and
134+ // update the rate again.
135+ }
136+ }
137+
138+ /** Enables the rate limiter immediately. */
139+ public void enable () {
140+ boolean wasEnabled = this .enabled .getAndSet (true );
141+ if (!wasEnabled ) {
142+ logger .info ("Rate limiter is enabled." );
143+ }
144+ }
145+
146+ public boolean isEnabled () {
147+ return this .enabled .get ();
148+ }
149+
150+ public double getRate () {
151+ return limiter .getRate ();
152+ }
153+
154+ /**
155+ * Sets the rate and the next rate update time based on period, if the current time exceeds the
156+ * next rate update time. Otherwise, no-op.
157+ *
158+ * @param rate The new rate of the rate limiter.
159+ * @param period The period during which rate should not be updated again and the rate limiter
160+ * should not be disabled.
161+ */
162+ public void trySetRate (double rate , Duration period ) {
163+ Instant nextTime = nextRateUpdateTime .get ();
164+ Instant now = Instant .now ();
165+
166+ if (now .isBefore (nextTime )) {
167+ return ;
168+ }
169+
170+ Instant newNextTime = now .plusSeconds (period .getSeconds ());
171+
172+ if (!nextRateUpdateTime .compareAndSet (nextTime , newNextTime )) {
173+ // Someone else updated it already.
174+ return ;
175+ }
176+ final double oldRate = limiter .getRate ();
177+ limiter .setRate (rate );
178+ logger .info (
179+ "Updated max rate from "
180+ + oldRate
181+ + " to "
182+ + rate
183+ + " with period "
184+ + period .getSeconds ()
185+ + " seconds." );
186+ }
187+
188+ @ VisibleForTesting
189+ void setEnabled (boolean enabled ) {
190+ this .enabled .set (enabled );
191+ }
192+
193+ @ VisibleForTesting
194+ void setRate (double rate ) {
195+ limiter .setRate (rate );
196+ }
197+ }
198+
96199 class RateLimitingResponseObserver extends SafeResponseObserver <MutateRowsResponse > {
97- private final ResponseObserver <MutateRowsResponse > outerObserver ;
98- private final RateLimiter rateLimiter ;
99200
100- private final AtomicReference < Instant > lastQpsChangeTime ;
201+ private final ResponseObserver < MutateRowsResponse > outerObserver ;
101202
102- RateLimitingResponseObserver (
103- RateLimiter rateLimiter ,
104- AtomicReference <Instant > lastQpsChangeTime ,
105- ResponseObserver <MutateRowsResponse > observer ) {
203+ RateLimitingResponseObserver (ResponseObserver <MutateRowsResponse > observer ) {
106204 super (observer );
107205 this .outerObserver = observer ;
108- this .rateLimiter = rateLimiter ;
109- this .lastQpsChangeTime = lastQpsChangeTime ;
110206 }
111207
112208 @ Override
113209 protected void onStartImpl (StreamController controller ) {
114210 outerObserver .onStart (controller );
115211 }
116212
213+ private boolean hasValidRateLimitInfo (MutateRowsResponse response ) {
214+ // RateLimitInfo is an optional field. However, proto3 sub-message field always
215+ // have presence even thought it's marked as "optional". Check the factor and
216+ // period to make sure they're not 0.
217+ if (!response .hasRateLimitInfo ()) {
218+ logger .finest ("Response carries no RateLimitInfo" );
219+ return false ;
220+ }
221+
222+ if (response .getRateLimitInfo ().getFactor () <= 0
223+ || response .getRateLimitInfo ().getPeriod ().getSeconds () <= 0 ) {
224+ logger .finest ("Response carries invalid RateLimitInfo=" + response .getRateLimitInfo ());
225+ return false ;
226+ }
227+
228+ logger .finest ("Response carries valid RateLimitInfo=" + response .getRateLimitInfo ());
229+ return true ;
230+ }
231+
117232 @ Override
118233 protected void onResponseImpl (MutateRowsResponse response ) {
119- if (response .hasRateLimitInfo ()) {
234+ if (hasValidRateLimitInfo (response )) {
235+ limiter .enable ();
120236 RateLimitInfo info = response .getRateLimitInfo ();
121- // RateLimitInfo is an optional field. However, proto3 sub-message field always
122- // have presence even thought it's marked as "optional". Check the factor and
123- // period to make sure they're not 0.
124- if (info .getFactor () != 0 && info .getPeriod ().getSeconds () != 0 ) {
125- updateQps (
126- info .getFactor (),
127- Duration .ofSeconds (com .google .protobuf .util .Durations .toSeconds (info .getPeriod ())));
128- }
237+ updateQps (
238+ info .getFactor (),
239+ Duration .ofSeconds (com .google .protobuf .util .Durations .toSeconds (info .getPeriod ())));
240+ } else {
241+ limiter .tryDisable ();
129242 }
130243 outerObserver .onResponse (response );
131244 }
@@ -148,28 +261,35 @@ protected void onCompleteImpl() {
148261 }
149262
150263 private void updateQps (double factor , Duration period ) {
151- Instant lastTime = lastQpsChangeTime .get ();
152- Instant now = Instant .now ();
153-
154- if (now .minus (period ).isAfter (lastTime ) && lastQpsChangeTime .compareAndSet (lastTime , now )) {
155- double cappedFactor = Math .min (Math .max (factor , MIN_FACTOR ), MAX_FACTOR );
156- double currentRate = limiter .getRate ();
157- limiter .setRate (Math .min (Math .max (currentRate * cappedFactor , MIN_QPS ), MAX_QPS ));
158- logger .log (
159- Level .FINE ,
160- "Updated QPS from {0} to {1}, server returned factor is {2}, capped factor is {3}" ,
161- new Object [] {currentRate , limiter .getRate (), factor , cappedFactor });
162- }
264+ double cappedFactor = Math .min (Math .max (factor , MIN_FACTOR ), MAX_FACTOR );
265+ double currentRate = limiter .getRate ();
266+ double cappedRate = Math .min (Math .max (currentRate * cappedFactor , MIN_QPS ), MAX_QPS );
267+ limiter .trySetRate (cappedRate , period );
163268 }
164269 }
165270
166271 @ VisibleForTesting
167- AtomicReference <Instant > getLastQpsChangeTime () {
168- return lastQpsChangeTime ;
272+ AtomicReference <Instant > getNextRateUpdateTime () {
273+ return limiter . nextRateUpdateTime ;
169274 }
170275
171276 @ VisibleForTesting
172277 double getCurrentRate () {
173278 return limiter .getRate ();
174279 }
280+
281+ @ VisibleForTesting
282+ void setRate (double rate ) {
283+ limiter .setRate (rate );
284+ }
285+
286+ @ VisibleForTesting
287+ boolean getLimiterEnabled () {
288+ return limiter .isEnabled ();
289+ }
290+
291+ @ VisibleForTesting
292+ void setLimiterEnabled (boolean enabled ) {
293+ limiter .setEnabled (enabled );
294+ }
175295}
0 commit comments