Skip to content

Commit a680c98

Browse files
authored
rls: adaptive throttler (#6749)
1 parent 6e0748d commit a680c98

File tree

4 files changed

+521
-1
lines changed

4 files changed

+521
-1
lines changed

rls/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ dependencies {
1313
project(':grpc-protobuf'),
1414
project(':grpc-stub')
1515
compileOnly libraries.javax_annotation
16-
testCompile libraries.truth
16+
testCompile libraries.truth,
17+
project(':grpc-core').sourceSets.test.output // for FakeClock
1718
}
1819

1920
configureProtoCompilation()
Lines changed: 344 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,344 @@
1+
/*
2+
* Copyright 2020 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.rls.internal;
18+
19+
import static com.google.common.base.Preconditions.checkArgument;
20+
import static com.google.common.base.Preconditions.checkNotNull;
21+
22+
import com.google.common.annotations.VisibleForTesting;
23+
import com.google.common.base.MoreObjects;
24+
import io.grpc.internal.TimeProvider;
25+
import java.util.concurrent.ThreadLocalRandom;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
28+
import java.util.concurrent.atomic.AtomicReferenceArray;
29+
30+
/**
31+
* Implementation of {@link Throttler} that keeps track of recent history (the duration of which is
32+
* specified to the constructor) and throttles requests at the client side based on the number of
33+
* requests that the
34+
* backend has accepted and the total number of requests generated. A given request will be
35+
* throttled with a probability
36+
* <pre>
37+
* throttleProbability = (requests - ratio_for_accepts * accepts) / (requests + requests_padding)
38+
* </pre>
39+
* where requests is the total number of requests, accepts is the total number of requests that the
40+
* backend has accepted and ratio_for_accepts is just a constant multiplier passed to the
41+
* constructor (see the description of ratio_for_accepts for more information).
42+
*/
43+
public final class AdaptiveThrottler implements Throttler {
44+
45+
private static final int DEFAULT_HISTORY_SECONDS = 30;
46+
private static final int DEFAULT_REQUEST_PADDING = 8;
47+
private static final float DEFAULT_RATIO_FOR_ACCEPT = 1.2f;
48+
49+
/**
50+
* The duration of history of calls used by Adaptive Throttler.
51+
*/
52+
private final int historySeconds;
53+
/**
54+
* A magic number to tune the aggressiveness of the throttling. High numbers throttle less. The
55+
* default is 8.
56+
*/
57+
private final int requestsPadding;
58+
/**
59+
* The ratio by which the Adaptive Throttler will attempt to send requests above what the server
60+
* is currently accepting.
61+
*/
62+
private final float ratioForAccepts;
63+
private final TimeProvider timeProvider;
64+
/**
65+
* The number of requests attempted by the client during the Adaptive Throttler instance's
66+
* history of calls. This includes requests throttled at the client. The history period defaults
67+
* to 30 seconds.
68+
*/
69+
@VisibleForTesting
70+
final TimeBasedAccumulator requestStat;
71+
/**
72+
* Counter for the total number of requests that were throttled by either the client (this class)
73+
* or the backend in recent history.
74+
*/
75+
@VisibleForTesting
76+
final TimeBasedAccumulator throttledStat;
77+
78+
private AdaptiveThrottler(Builder builder) {
79+
this.historySeconds = builder.historySeconds;
80+
this.requestsPadding = builder.requestsPadding;
81+
this.ratioForAccepts = builder.ratioForAccepts;
82+
this.timeProvider = builder.timeProvider;
83+
long internalNanos = TimeUnit.SECONDS.toNanos(historySeconds);
84+
this.requestStat = new TimeBasedAccumulator(internalNanos, timeProvider);
85+
this.throttledStat = new TimeBasedAccumulator(internalNanos, timeProvider);
86+
}
87+
88+
@Override
89+
public boolean shouldThrottle() {
90+
return shouldThrottle(randomFloat());
91+
}
92+
93+
@VisibleForTesting
94+
boolean shouldThrottle(float random) {
95+
long nowNanos = timeProvider.currentTimeNanos();
96+
if (getThrottleProbability(nowNanos) <= random) {
97+
return false;
98+
}
99+
requestStat.increment(nowNanos);
100+
throttledStat.increment(nowNanos);
101+
return true;
102+
}
103+
104+
/**
105+
* Calculates throttleProbability.
106+
* <pre>
107+
* throttleProbability = (requests - ratio_for_accepts * accepts) / (requests + requests_padding)
108+
* </pre>
109+
*/
110+
@VisibleForTesting
111+
float getThrottleProbability(long nowNanos) {
112+
long requests = this.requestStat.get(nowNanos);
113+
long accepts = requests - throttledStat.get(nowNanos);
114+
// It's possible that this probability will be negative, which means that no throttling should
115+
// take place.
116+
return (requests - ratioForAccepts * accepts) / (requests + requestsPadding);
117+
}
118+
119+
@Override
120+
public void registerBackendResponse(boolean throttled) {
121+
long now = timeProvider.currentTimeNanos();
122+
requestStat.increment(now);
123+
if (throttled) {
124+
throttledStat.increment(now);
125+
}
126+
}
127+
128+
private static float randomFloat() {
129+
return ThreadLocalRandom.current().nextFloat();
130+
}
131+
132+
@Override
133+
public String toString() {
134+
return MoreObjects.toStringHelper(this)
135+
.add("historySeconds", historySeconds)
136+
.add("requestsPadding", requestsPadding)
137+
.add("ratioForAccepts", ratioForAccepts)
138+
.add("requestStat", requestStat)
139+
.add("throttledStat", throttledStat)
140+
.toString();
141+
}
142+
143+
public static Builder builder() {
144+
return new Builder();
145+
}
146+
147+
/** Builder for {@link AdaptiveThrottler}. */
148+
public static final class Builder {
149+
150+
private float ratioForAccepts = DEFAULT_RATIO_FOR_ACCEPT;
151+
private int historySeconds = DEFAULT_HISTORY_SECONDS;
152+
private int requestsPadding = DEFAULT_REQUEST_PADDING;
153+
private TimeProvider timeProvider = TimeProvider.SYSTEM_TIME_PROVIDER;
154+
155+
public Builder setRatioForAccepts(float ratioForAccepts) {
156+
this.ratioForAccepts = ratioForAccepts;
157+
return this;
158+
}
159+
160+
public Builder setHistorySeconds(int historySeconds) {
161+
this.historySeconds = historySeconds;
162+
return this;
163+
}
164+
165+
public Builder setRequestsPadding(int requestsPadding) {
166+
this.requestsPadding = requestsPadding;
167+
return this;
168+
}
169+
170+
public Builder setTimeProvider(TimeProvider timeProvider) {
171+
this.timeProvider = checkNotNull(timeProvider, "timeProvider");
172+
return this;
173+
}
174+
175+
public AdaptiveThrottler build() {
176+
return new AdaptiveThrottler(this);
177+
}
178+
}
179+
180+
static final class TimeBasedAccumulator {
181+
/**
182+
* The number of slots. This value determines the accuracy of the get() method to interval /
183+
* NUM_SLOTS.
184+
*/
185+
private static final int NUM_SLOTS = 50;
186+
187+
/** Holds the data for each slot (amount and end timestamp). */
188+
private static final class Slot {
189+
static final AtomicLongFieldUpdater<Slot> ATOMIC_COUNT =
190+
AtomicLongFieldUpdater.newUpdater(Slot.class, "count");
191+
192+
// The count of statistics for the time range represented by this slot.
193+
volatile long count;
194+
// The nearest 0 modulo slot boundary in nanoseconds. The slot boundary
195+
// is exclusive. [previous_slot.end, end)
196+
final long endNanos;
197+
198+
Slot(long endNanos) {
199+
this.endNanos = endNanos;
200+
this.count = 0;
201+
}
202+
203+
void increment() {
204+
ATOMIC_COUNT.incrementAndGet(this);
205+
}
206+
}
207+
208+
// Represents a slot which is not initialized and is unusable.
209+
private static final Slot NULL_SLOT = new Slot(-1);
210+
211+
/** The array of slots. */
212+
private final AtomicReferenceArray<Slot> slots = new AtomicReferenceArray<>(NUM_SLOTS);
213+
214+
/** The time interval this statistic is concerned with. */
215+
private final long interval;
216+
217+
/** The number of nanoseconds in each slot. */
218+
private final long slotNanos;
219+
220+
/**
221+
* The current index into the slot array. {@code currentIndex} may be safely read without
222+
* synchronization, but all writes must be performed inside of a {@code synchronized(this){}}
223+
* block.
224+
*/
225+
private volatile int currentIndex;
226+
227+
private final TimeProvider timeProvider;
228+
229+
/**
230+
* Interval constructor.
231+
*
232+
* @param internalNanos is the stat interval in nanoseconds
233+
* @throws IllegalArgumentException if the supplied interval is too small to be effective
234+
*/
235+
TimeBasedAccumulator(long internalNanos, TimeProvider timeProvider) {
236+
checkArgument(
237+
internalNanos >= NUM_SLOTS,
238+
"Interval must be greater than %s",
239+
NUM_SLOTS);
240+
this.interval = internalNanos;
241+
this.slotNanos = internalNanos / NUM_SLOTS;
242+
this.currentIndex = 0;
243+
for (int i = 0; i < NUM_SLOTS; i++) {
244+
slots.set(i, NULL_SLOT);
245+
}
246+
this.timeProvider = checkNotNull(timeProvider, "ticker");
247+
}
248+
249+
/** Gets the current slot. */
250+
private Slot getSlot(long now) {
251+
Slot currentSlot = slots.get(currentIndex);
252+
if (now < currentSlot.endNanos) {
253+
return currentSlot;
254+
} else {
255+
long slotBoundary = getSlotEndTime(now);
256+
synchronized (this) {
257+
int index = currentIndex;
258+
currentSlot = slots.get(index);
259+
if (now < currentSlot.endNanos) {
260+
return currentSlot;
261+
}
262+
int newIndex = (index == NUM_SLOTS - 1) ? 0 : index + 1;
263+
Slot nextSlot = new Slot(slotBoundary);
264+
slots.set(newIndex, nextSlot);
265+
// Set currentIndex only after assigning the new slot to slots, otherwise
266+
// racing readers will see NULL_SLOT or an old slot.
267+
currentIndex = newIndex;
268+
return nextSlot;
269+
}
270+
}
271+
}
272+
273+
/**
274+
* Computes the end boundary since the last bucket can be partial size.
275+
*
276+
* @param time the time for which to find the nearest slot boundary
277+
* @return the nearest slot boundary in nanos
278+
*/
279+
private long getSlotEndTime(long time) {
280+
return (time / slotNanos + 1) * slotNanos;
281+
}
282+
283+
/**
284+
* Returns the interval used by this statistic.
285+
*
286+
* @return the interval
287+
*/
288+
public long getInterval() {
289+
return this.interval;
290+
}
291+
292+
/**
293+
* Increments the count of the statistic by the specified amount for the specified time.
294+
*
295+
* @param now is the time used to increment the count
296+
*/
297+
final void increment(long now) {
298+
getSlot(now).increment();
299+
}
300+
301+
/**
302+
* Returns the count of the statistic using the specified time value as the current time.
303+
*
304+
* @param now the current time
305+
* @return the statistic count
306+
*/
307+
public final long get(long now) {
308+
long intervalEnd = getSlotEndTime(now);
309+
long intervalStart = intervalEnd - interval;
310+
// This is the point at which increments to new slots will be ignored.
311+
int index = currentIndex;
312+
313+
long accumulated = 0L;
314+
long prevSlotEnd = Long.MAX_VALUE;
315+
for (int i = 0; i < NUM_SLOTS; i++) {
316+
if (index < 0) {
317+
index = NUM_SLOTS - 1;
318+
}
319+
Slot currentSlot = slots.get(index);
320+
index--;
321+
long currentSlotEnd = currentSlot.endNanos;
322+
323+
if (currentSlotEnd <= intervalStart || currentSlotEnd > prevSlotEnd) {
324+
break;
325+
}
326+
prevSlotEnd = currentSlotEnd;
327+
328+
if (currentSlotEnd > intervalEnd) {
329+
continue;
330+
}
331+
accumulated = accumulated + currentSlot.count;
332+
}
333+
return accumulated;
334+
}
335+
336+
@Override
337+
public String toString() {
338+
return MoreObjects.toStringHelper(this)
339+
.add("interval", interval)
340+
.add("current_count", get(timeProvider.currentTimeNanos()))
341+
.toString();
342+
}
343+
}
344+
}

0 commit comments

Comments
 (0)