Skip to content

Commit bad81a5

Browse files
authored
Implements batching of flow control requests (#92)
* Implemented batching of flow control requests * Minor renames of variables * Tests and review comments * Expose constant for flow request flush interval * Review comments: revert name requestForRestart
1 parent 74d61fd commit bad81a5

File tree

5 files changed

+282
-27
lines changed

5 files changed

+282
-27
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.internal.wire;
18+
19+
import com.google.cloud.pubsublite.SequencedMessage;
20+
import com.google.cloud.pubsublite.proto.FlowControlRequest;
21+
import io.grpc.StatusException;
22+
import java.util.Collection;
23+
import java.util.Optional;
24+
25+
// A FlowControlBatcher manages batching of FlowControlRequests.
26+
class FlowControlBatcher {
27+
// If the pending flow request exceeds this ratio of the current outstanding tokens, the pending
28+
// flow request should be flushed to the stream ASAP.
29+
private static double EXPEDITE_BATCH_REQUEST_RATIO = 0.5;
30+
31+
// The current amount of outstanding byte and message flow control tokens.
32+
private final TokenCounter clientTokens = new TokenCounter();
33+
34+
// The pending aggregate flow control request that needs to be sent to the stream.
35+
private final TokenCounter pendingTokens = new TokenCounter();
36+
37+
void onClientFlowRequest(FlowControlRequest request) throws StatusException {
38+
clientTokens.add(request.getAllowedBytes(), request.getAllowedMessages());
39+
pendingTokens.add(request.getAllowedBytes(), request.getAllowedMessages());
40+
}
41+
42+
void onMessages(Collection<SequencedMessage> received) throws StatusException {
43+
long byteSize = received.stream().mapToLong(SequencedMessage::byteSize).sum();
44+
clientTokens.sub(byteSize, received.size());
45+
}
46+
47+
void onClientSeek() {
48+
clientTokens.reset();
49+
pendingTokens.reset();
50+
}
51+
52+
// The caller must send the FlowControlRequest to the stream, as pending tokens are reset.
53+
Optional<FlowControlRequest> requestForRestart() {
54+
pendingTokens.reset();
55+
return clientTokens.toFlowControlRequest();
56+
}
57+
58+
// The caller must send the FlowControlRequest to the stream, as pending tokens are reset.
59+
Optional<FlowControlRequest> releasePendingRequest() {
60+
Optional<FlowControlRequest> request = pendingTokens.toFlowControlRequest();
61+
pendingTokens.reset();
62+
return request;
63+
}
64+
65+
boolean shouldExpediteBatchRequest() {
66+
if (exceedsExpediteRatio(pendingTokens.bytes(), clientTokens.bytes())) return true;
67+
if (exceedsExpediteRatio(pendingTokens.messages(), clientTokens.messages())) return true;
68+
return false;
69+
}
70+
71+
private boolean exceedsExpediteRatio(long pending, long client) {
72+
return client > 0 && ((double) pending / client) >= EXPEDITE_BATCH_REQUEST_RATIO;
73+
}
74+
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,23 +38,32 @@
3838
import io.grpc.Status;
3939
import io.grpc.StatusException;
4040
import java.util.Optional;
41+
import java.util.concurrent.Executors;
42+
import java.util.concurrent.Future;
43+
import java.util.concurrent.ScheduledExecutorService;
44+
import java.util.concurrent.TimeUnit;
4145
import java.util.function.Consumer;
4246
import javax.annotation.concurrent.GuardedBy;
4347

4448
public class SubscriberImpl extends ProxyService
4549
implements Subscriber, RetryingConnectionObserver<Response> {
50+
@VisibleForTesting static final long FLOW_REQUESTS_FLUSH_INTERVAL_MS = 100;
51+
4652
private final Consumer<ImmutableList<SequencedMessage>> messageConsumer;
4753

4854
private final CloseableMonitor monitor = new CloseableMonitor();
4955

56+
private final ScheduledExecutorService executorService;
57+
private Future<?> alarmFuture;
58+
5059
@GuardedBy("monitor.monitor")
5160
private final RetryingConnection<ConnectedSubscriber> connection;
5261

5362
@GuardedBy("monitor.monitor")
5463
private final NextOffsetTracker nextOffsetTracker = new NextOffsetTracker();
5564

5665
@GuardedBy("monitor.monitor")
57-
private final TokenCounter tokenCounter = new TokenCounter();
66+
private final FlowControlBatcher flowControlBatcher = new FlowControlBatcher();
5867

5968
@GuardedBy("monitor.monitor")
6069
private Optional<InFlightSeek> inFlightSeek = Optional.empty();
@@ -89,6 +98,7 @@ private static class InFlightSeek {
8998
factory,
9099
SubscribeRequest.newBuilder().setInitial(initialRequest).build(),
91100
this);
101+
this.executorService = Executors.newSingleThreadScheduledExecutor();
92102
addServices(this.connection);
93103
}
94104

@@ -112,10 +122,21 @@ protected void handlePermanentError(StatusException error) {
112122
}
113123

114124
@Override
115-
protected void start() {}
125+
protected void start() {
126+
try (CloseableMonitor.Hold h = monitor.enter()) {
127+
alarmFuture =
128+
executorService.scheduleWithFixedDelay(
129+
this::processBatchFlowRequest,
130+
FLOW_REQUESTS_FLUSH_INTERVAL_MS,
131+
FLOW_REQUESTS_FLUSH_INTERVAL_MS,
132+
TimeUnit.MILLISECONDS);
133+
}
134+
}
116135

117136
@Override
118137
protected void stop() {
138+
alarmFuture.cancel(false /* mayInterruptIfRunning */);
139+
executorService.shutdown();
119140
try (CloseableMonitor.Hold h = monitor.enter()) {
120141
shutdown = true;
121142
inFlightSeek.ifPresent(
@@ -143,7 +164,7 @@ public boolean isSatisfied() {
143164
checkState(!inFlightSeek.isPresent(), "Seeked while seek is already in flight.");
144165
SettableApiFuture<Offset> future = SettableApiFuture.create();
145166
inFlightSeek = Optional.of(new InFlightSeek(request, future));
146-
tokenCounter.onClientSeek();
167+
flowControlBatcher.onClientSeek();
147168
connection.modifyConnection(
148169
connectedSubscriber ->
149170
connectedSubscriber.ifPresent(subscriber -> subscriber.seek(request)));
@@ -161,15 +182,16 @@ public boolean seekInFlight() {
161182
}
162183
}
163184

164-
// TODO: Consider batching these requests before sending to the stream.
165185
@Override
166-
public void allowFlow(FlowControlRequest request) {
186+
public void allowFlow(FlowControlRequest clientRequest) {
167187
try (CloseableMonitor.Hold h = monitor.enter()) {
168188
if (shutdown) return;
169-
tokenCounter.onClientFlowRequest(request);
170-
connection.modifyConnection(
171-
connectedSubscriber ->
172-
connectedSubscriber.ifPresent(subscriber -> subscriber.allowFlow(request)));
189+
flowControlBatcher.onClientFlowRequest(clientRequest);
190+
if (flowControlBatcher.shouldExpediteBatchRequest()) {
191+
connection.modifyConnection(
192+
connectedSubscriber ->
193+
connectedSubscriber.ifPresent(subscriber -> flushBatchFlowRequest(subscriber)));
194+
}
173195
} catch (StatusException e) {
174196
onPermanentError(e);
175197
throw e.getStatus().asRuntimeException();
@@ -197,7 +219,7 @@ public void triggerReinitialize() {
197219
connectedSubscriber.get().seek(request);
198220
});
199221
}
200-
tokenCounter
222+
flowControlBatcher
201223
.requestForRestart()
202224
.ifPresent(request -> connectedSubscriber.get().allowFlow(request));
203225
});
@@ -223,7 +245,7 @@ private Status onMessageResponse(ImmutableList<SequencedMessage> messages) {
223245
return Status.OK;
224246
}
225247
nextOffsetTracker.onMessages(messages);
226-
tokenCounter.onMessages(messages);
248+
flowControlBatcher.onMessages(messages);
227249
} catch (StatusException e) {
228250

229251
onPermanentError(e);
@@ -252,4 +274,24 @@ private Status onSeekResponse(Offset seekOffset) {
252274
return e.getStatus();
253275
}
254276
}
277+
278+
@VisibleForTesting
279+
void processBatchFlowRequest() {
280+
try (CloseableMonitor.Hold h = monitor.enter()) {
281+
if (shutdown) return;
282+
connection.modifyConnection(
283+
connectedSubscriber ->
284+
connectedSubscriber.ifPresent(subscriber -> flushBatchFlowRequest(subscriber)));
285+
} catch (StatusException e) {
286+
onPermanentError(e);
287+
}
288+
}
289+
290+
private void flushBatchFlowRequest(ConnectedSubscriber subscriber) {
291+
try (CloseableMonitor.Hold h = monitor.enter()) {
292+
flowControlBatcher
293+
.releasePendingRequest()
294+
.ifPresent(request -> subscriber.allowFlow(request));
295+
}
296+
}
255297
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/TokenCounter.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,43 +19,50 @@
1919
import static com.google.cloud.pubsublite.internal.Preconditions.checkArgument;
2020
import static com.google.cloud.pubsublite.internal.Preconditions.checkState;
2121

22-
import com.google.cloud.pubsublite.SequencedMessage;
2322
import com.google.cloud.pubsublite.proto.FlowControlRequest;
2423
import com.google.common.math.LongMath;
2524
import io.grpc.StatusException;
26-
import java.util.Collection;
2725
import java.util.Optional;
2826

29-
// A TokenCounter counts the amount of outstanding byte and message flow control tokens that the
27+
// A TokenCounter stores the amount of outstanding byte and message flow control tokens that the
3028
// client believes exists for the stream.
3129
class TokenCounter {
3230
private long bytes = 0;
3331
private long messages = 0;
3432

35-
void onClientFlowRequest(FlowControlRequest request) throws StatusException {
36-
checkArgument(request.getAllowedMessages() >= 0);
37-
checkArgument(request.getAllowedBytes() >= 0);
33+
void add(long deltaBytes, long deltaMessages) throws StatusException {
34+
checkArgument(deltaBytes >= 0);
35+
checkArgument(deltaMessages >= 0);
3836

39-
bytes = LongMath.saturatedAdd(bytes, request.getAllowedBytes());
40-
messages = LongMath.saturatedAdd(messages, request.getAllowedMessages());
37+
bytes = LongMath.saturatedAdd(bytes, deltaBytes);
38+
messages = LongMath.saturatedAdd(messages, deltaMessages);
4139
}
4240

43-
void onMessages(Collection<SequencedMessage> received) throws StatusException {
44-
long byteSize = received.stream().mapToLong(SequencedMessage::byteSize).sum();
41+
void sub(long deltaBytes, long deltaMessages) throws StatusException {
42+
checkArgument(deltaBytes >= 0);
43+
checkArgument(deltaMessages >= 0);
4544
checkState(
46-
byteSize <= bytes, "Received messages that account for more bytes than were requested.");
47-
checkState(received.size() <= messages, "Received more messages than were requested.");
45+
deltaBytes <= bytes, "Received messages that account for more bytes than were requested.");
46+
checkState(deltaMessages <= messages, "Received more messages than were requested.");
4847

49-
bytes -= byteSize;
50-
messages -= received.size();
48+
bytes -= deltaBytes;
49+
messages -= deltaMessages;
5150
}
5251

53-
void onClientSeek() {
52+
void reset() {
5453
bytes = 0;
5554
messages = 0;
5655
}
5756

58-
Optional<FlowControlRequest> requestForRestart() {
57+
long bytes() {
58+
return bytes;
59+
}
60+
61+
long messages() {
62+
return messages;
63+
}
64+
65+
Optional<FlowControlRequest> toFlowControlRequest() {
5966
if (bytes == 0 && messages == 0) return Optional.empty();
6067
return Optional.of(
6168
FlowControlRequest.newBuilder()
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.internal.wire;
18+
19+
import static com.google.common.truth.Truth.assertThat;
20+
21+
import com.google.cloud.pubsublite.Message;
22+
import com.google.cloud.pubsublite.Offset;
23+
import com.google.cloud.pubsublite.SequencedMessage;
24+
import com.google.cloud.pubsublite.proto.FlowControlRequest;
25+
import com.google.common.collect.ImmutableList;
26+
import com.google.protobuf.util.Timestamps;
27+
import io.grpc.StatusException;
28+
import org.junit.Test;
29+
import org.junit.runner.RunWith;
30+
import org.junit.runners.JUnit4;
31+
32+
@RunWith(JUnit4.class)
33+
public class FlowControlBatcherTest {
34+
35+
private final FlowControlBatcher batcher = new FlowControlBatcher();
36+
37+
@Test
38+
public void onClientFlowRequestIncrementsTokens() throws StatusException {
39+
FlowControlRequest clientFlowRequest =
40+
FlowControlRequest.newBuilder().setAllowedBytes(500).setAllowedMessages(10).build();
41+
batcher.onClientFlowRequest(clientFlowRequest);
42+
43+
assertThat(batcher.releasePendingRequest().get()).isEqualTo(clientFlowRequest);
44+
assertThat(batcher.releasePendingRequest().isPresent()).isFalse();
45+
assertThat(batcher.requestForRestart().get()).isEqualTo(clientFlowRequest);
46+
assertThat(batcher.requestForRestart().get()).isEqualTo(clientFlowRequest);
47+
}
48+
49+
@Test
50+
public void onMessagesDecrementsClientTokens() throws StatusException {
51+
FlowControlRequest clientFlowRequest =
52+
FlowControlRequest.newBuilder().setAllowedBytes(500).setAllowedMessages(10).build();
53+
batcher.onClientFlowRequest(clientFlowRequest);
54+
ImmutableList<SequencedMessage> messages =
55+
ImmutableList.of(
56+
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(0), 100),
57+
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(1), 150));
58+
batcher.onMessages(messages);
59+
60+
assertThat(batcher.releasePendingRequest().get()).isEqualTo(clientFlowRequest);
61+
FlowControlRequest expectedRequestForRestart =
62+
FlowControlRequest.newBuilder().setAllowedBytes(250).setAllowedMessages(8).build();
63+
assertThat(batcher.requestForRestart().get()).isEqualTo(expectedRequestForRestart);
64+
}
65+
66+
@Test
67+
public void shouldExpediteBatchRequestChecksByteRatio() throws StatusException {
68+
batcher.onClientFlowRequest(
69+
FlowControlRequest.newBuilder().setAllowedBytes(100).setAllowedMessages(100).build());
70+
batcher.releasePendingRequest();
71+
72+
batcher.onClientFlowRequest(FlowControlRequest.newBuilder().setAllowedBytes(10).build());
73+
assertThat(batcher.shouldExpediteBatchRequest()).isFalse();
74+
75+
batcher.onClientFlowRequest(FlowControlRequest.newBuilder().setAllowedBytes(90).build());
76+
assertThat(batcher.shouldExpediteBatchRequest()).isTrue();
77+
}
78+
79+
@Test
80+
public void shouldExpediteBatchRequestChecksMessageRatio() throws StatusException {
81+
batcher.onClientFlowRequest(
82+
FlowControlRequest.newBuilder().setAllowedBytes(100).setAllowedMessages(100).build());
83+
batcher.releasePendingRequest();
84+
85+
batcher.onClientFlowRequest(FlowControlRequest.newBuilder().setAllowedMessages(80).build());
86+
assertThat(batcher.shouldExpediteBatchRequest()).isFalse();
87+
88+
batcher.onClientFlowRequest(FlowControlRequest.newBuilder().setAllowedMessages(20).build());
89+
assertThat(batcher.shouldExpediteBatchRequest()).isTrue();
90+
}
91+
92+
@Test
93+
public void shouldExpediteBatchRequestHandlesDivByZero() {
94+
assertThat(batcher.shouldExpediteBatchRequest()).isFalse();
95+
}
96+
}

0 commit comments

Comments
 (0)