Skip to content

Commit fda13d5

Browse files
authored
feat: add WithMaxSubscriptions config backed by stream vs unary grpc connection pools (#450)
Opted for minimal config changes to maintain backwards compatibility. - Added `WithMaxSubscriptions` and an `IsNumStreamChannelsDynamic` flag to the `TopicConfiguration` - Topics configs will default to using a static pool of stream grpc channels for subscriptions (defaults to 4 for both unary and stream) - Using WithMaxSubscriptions will tell the topic client to use a dynamic pool instead (starts from 1 channel, grows to support the specified maxSubscriptions) Also added more tests to the subscription initialization tests (part of the retry tests) to exercise the dynamic channel creation. The main difference between the static and dynamic pools are: - `StaticStreamGrpcConnectionPool` uses a regular `java.util.List` that is initialized once in the constructor and is never modified. - `DynamicStreamGrpcConnectionPool` uses `java.util.concurrent.CopyOnWriteArrayList` and `AtomicInteger` to be able to dynamically add channels as needed up until the maximum number of channels has been reached.
1 parent b5acdd4 commit fda13d5

10 files changed

+858
-151
lines changed

momento-sdk/src/intTest/java/momento/sdk/retry/TopicsSubscriptionInitializationTest.java

Lines changed: 422 additions & 12 deletions
Large diffs are not rendered by default.
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package momento.sdk;
2+
3+
import grpc.cache_client.pubsub.PubsubGrpc;
4+
import io.grpc.ManagedChannel;
5+
import java.io.Closeable;
6+
import java.util.UUID;
7+
import java.util.concurrent.CopyOnWriteArrayList;
8+
import java.util.concurrent.atomic.AtomicInteger;
9+
import java.util.stream.Collectors;
10+
import java.util.stream.IntStream;
11+
import momento.sdk.auth.CredentialProvider;
12+
import momento.sdk.config.TopicConfiguration;
13+
import momento.sdk.exceptions.ClientSdkException;
14+
import momento.sdk.exceptions.MomentoErrorCode;
15+
import momento.sdk.internal.GrpcChannelOptions;
16+
17+
public class DynamicStreamGrpcConnectionPool implements StreamTopicGrpcConnectionPool, Closeable {
18+
private final CredentialProvider credentialProvider;
19+
private final TopicConfiguration configuration;
20+
private final UUID connectionIdKey;
21+
22+
private final AtomicInteger index = new AtomicInteger(0);
23+
private final AtomicInteger currentNumStreamGrpcChannels = new AtomicInteger(1);
24+
private final int maxStreamGrpcChannels;
25+
26+
private final int currentMaxConcurrentStreams;
27+
private final AtomicInteger currentNumActiveStreams = new AtomicInteger(0);
28+
29+
private final CopyOnWriteArrayList<ManagedChannel> streamChannels;
30+
private final CopyOnWriteArrayList<StreamStubWithCount> streamStubs;
31+
32+
public DynamicStreamGrpcConnectionPool(
33+
CredentialProvider credentialProvider,
34+
TopicConfiguration configuration,
35+
UUID connectionIdKey) {
36+
this.currentMaxConcurrentStreams = GrpcChannelOptions.NUM_CONCURRENT_STREAMS_PER_GRPC_CHANNEL;
37+
this.maxStreamGrpcChannels =
38+
configuration.getTransportStrategy().getGrpcConfiguration().getNumStreamGrpcChannels();
39+
40+
this.credentialProvider = credentialProvider;
41+
this.configuration = configuration;
42+
this.connectionIdKey = connectionIdKey;
43+
44+
this.streamChannels =
45+
IntStream.range(0, this.currentNumStreamGrpcChannels.get())
46+
.mapToObj(
47+
i ->
48+
TopicGrpcConnectionPoolUtils.setupConnection(
49+
credentialProvider, configuration, connectionIdKey))
50+
.collect(Collectors.toCollection(CopyOnWriteArrayList::new));
51+
this.streamStubs =
52+
streamChannels.stream()
53+
.map(PubsubGrpc::newStub)
54+
.map(StreamStubWithCount::new)
55+
.collect(Collectors.toCollection(CopyOnWriteArrayList::new));
56+
}
57+
58+
// Multiple threads could get to the point of seeing currentNumActiveStreams ==
59+
// currentMaxConcurrentStreams,
60+
// but we need to ensure only one thread will add a new channel at a time so that we don't exceed
61+
// the max number of channels.
62+
private void addNewChannel() {
63+
final int updatedCount = this.currentNumStreamGrpcChannels.incrementAndGet();
64+
65+
if (updatedCount > this.maxStreamGrpcChannels) {
66+
this.currentNumStreamGrpcChannels.decrementAndGet();
67+
return;
68+
}
69+
70+
this.streamChannels.add(
71+
TopicGrpcConnectionPoolUtils.setupConnection(
72+
credentialProvider, configuration, connectionIdKey));
73+
this.streamStubs.add(
74+
new StreamStubWithCount(
75+
PubsubGrpc.newStub(
76+
TopicGrpcConnectionPoolUtils.setupConnection(
77+
credentialProvider, configuration, connectionIdKey))));
78+
}
79+
80+
@Override
81+
public StreamStubWithCount getNextStreamStub() {
82+
// Check if we've reached the current max number of active streams.
83+
if (this.currentNumActiveStreams.get() == this.currentMaxConcurrentStreams) {
84+
// If we have not yet reached the maximum number of channels, add a new channel.
85+
if (this.currentNumStreamGrpcChannels.get() < this.maxStreamGrpcChannels) {
86+
this.addNewChannel();
87+
} else {
88+
// Otherwise return an error because all channels and streams are occupied.
89+
throw new ClientSdkException(
90+
MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED,
91+
"Maximum number of active subscriptions reached");
92+
}
93+
}
94+
95+
// Try to get a client with capacity for another subscription
96+
// by round-robining through the stubs.
97+
// Allow up to maximumActiveSubscriptions attempts to account for large bursts of requests.
98+
final int maximumActiveSubscriptions =
99+
this.currentNumStreamGrpcChannels.get()
100+
* GrpcChannelOptions.NUM_CONCURRENT_STREAMS_PER_GRPC_CHANNEL;
101+
for (int i = 0; i < maximumActiveSubscriptions; i++) {
102+
final StreamStubWithCount stubWithCount =
103+
streamStubs.get(index.getAndIncrement() % this.currentNumStreamGrpcChannels.get());
104+
try {
105+
stubWithCount.acquireStubOrThrow();
106+
this.currentNumActiveStreams.incrementAndGet();
107+
return stubWithCount;
108+
} catch (ClientSdkException e) {
109+
// If the stub is at capacity, continue to the next one.
110+
continue;
111+
}
112+
}
113+
114+
// Otherwise return an error if no stubs have capacity.
115+
throw new ClientSdkException(
116+
MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED,
117+
"Maximum number of active subscriptions reached");
118+
}
119+
120+
@Override
121+
public void close() {
122+
streamChannels.forEach(ManagedChannel::shutdown);
123+
}
124+
}
Lines changed: 19 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,11 @@
11
package momento.sdk;
22

33
import grpc.cache_client.pubsub.PubsubGrpc;
4-
import io.grpc.ClientInterceptor;
5-
import io.grpc.ManagedChannel;
6-
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
74
import java.io.Closeable;
8-
import java.time.Duration;
9-
import java.util.ArrayList;
10-
import java.util.Collections;
11-
import java.util.List;
125
import java.util.UUID;
13-
import java.util.concurrent.TimeUnit;
14-
import java.util.concurrent.atomic.AtomicInteger;
15-
import java.util.stream.Collectors;
16-
import java.util.stream.IntStream;
176
import javax.annotation.Nonnull;
187
import momento.sdk.auth.CredentialProvider;
198
import momento.sdk.config.TopicConfiguration;
20-
import momento.sdk.config.middleware.Middleware;
21-
import momento.sdk.config.middleware.MiddlewareRequestHandlerContext;
22-
import momento.sdk.exceptions.ClientSdkException;
23-
import momento.sdk.exceptions.MomentoErrorCode;
24-
import momento.sdk.internal.GrpcChannelOptions;
25-
26-
// Helper class for bookkeeping the number of active concurrent subscriptions.
27-
final class StreamStubWithCount {
28-
private final PubsubGrpc.PubsubStub stub;
29-
private final AtomicInteger count = new AtomicInteger(0);
30-
31-
StreamStubWithCount(PubsubGrpc.PubsubStub stub) {
32-
this.stub = stub;
33-
}
34-
35-
PubsubGrpc.PubsubStub getStub() {
36-
return stub;
37-
}
38-
39-
int getCount() {
40-
return count.get();
41-
}
42-
43-
int incrementCount() {
44-
return count.incrementAndGet();
45-
}
46-
47-
int decrementCount() {
48-
return count.decrementAndGet();
49-
}
50-
51-
void acquireStubOrThrow() throws ClientSdkException {
52-
if (count.incrementAndGet() <= 100) {
53-
return;
54-
} else {
55-
count.decrementAndGet();
56-
throw new ClientSdkException(
57-
MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED,
58-
"Maximum number of active subscriptions reached");
59-
}
60-
}
61-
}
629

6310
/**
6411
* Manager responsible for GRPC channels and stubs for the Topics.
@@ -68,110 +15,43 @@ void acquireStubOrThrow() throws ClientSdkException {
6815
* impacting the API business logic.
6916
*/
7017
final class ScsTopicGrpcStubsManager implements Closeable {
71-
72-
private final List<ManagedChannel> unaryChannels;
73-
private final List<PubsubGrpc.PubsubStub> unaryStubs;
74-
private final AtomicInteger unaryIndex = new AtomicInteger(0);
75-
76-
private final List<ManagedChannel> streamChannels;
77-
private final List<StreamStubWithCount> streamStubs;
78-
private final AtomicInteger streamIndex = new AtomicInteger(0);
79-
8018
public static final UUID CONNECTION_ID_KEY = UUID.randomUUID();
8119

82-
private final int numUnaryGrpcChannels;
83-
private final int numStreamGrpcChannels;
20+
private final UnaryTopicGrpcConnectionPool unaryConnectionPool;
21+
private final StreamTopicGrpcConnectionPool streamConnectionPool;
22+
8423
private final TopicConfiguration configuration;
85-
private final Duration deadline;
8624

8725
ScsTopicGrpcStubsManager(
8826
@Nonnull CredentialProvider credentialProvider, @Nonnull TopicConfiguration configuration) {
8927
this.configuration = configuration;
90-
this.deadline = configuration.getTransportStrategy().getGrpcConfiguration().getDeadline();
91-
this.numUnaryGrpcChannels =
92-
configuration.getTransportStrategy().getGrpcConfiguration().getNumUnaryGrpcChannels();
93-
this.numStreamGrpcChannels =
94-
configuration.getTransportStrategy().getGrpcConfiguration().getNumStreamGrpcChannels();
95-
96-
this.unaryChannels =
97-
IntStream.range(0, this.numUnaryGrpcChannels)
98-
.mapToObj(i -> setupConnection(credentialProvider, configuration))
99-
.collect(Collectors.toList());
100-
this.unaryStubs = unaryChannels.stream().map(PubsubGrpc::newStub).collect(Collectors.toList());
101-
102-
this.streamChannels =
103-
IntStream.range(0, this.numStreamGrpcChannels)
104-
.mapToObj(i -> setupConnection(credentialProvider, configuration))
105-
.collect(Collectors.toList());
106-
this.streamStubs =
107-
streamChannels.stream()
108-
.map(PubsubGrpc::newStub)
109-
.map(StreamStubWithCount::new)
110-
.collect(Collectors.toList());
111-
}
112-
113-
private static ManagedChannel setupConnection(
114-
CredentialProvider credentialProvider, TopicConfiguration configuration) {
115-
final NettyChannelBuilder channelBuilder =
116-
NettyChannelBuilder.forAddress(
117-
credentialProvider.getCacheEndpoint(), credentialProvider.getPort());
28+
this.unaryConnectionPool =
29+
new StaticUnaryGrpcConnectionPool(credentialProvider, configuration, CONNECTION_ID_KEY);
11830

119-
// set additional channel options (message size, keepalive, auth, etc)
120-
GrpcChannelOptions.applyGrpcConfigurationToChannelBuilder(
121-
configuration.getTransportStrategy().getGrpcConfiguration(),
122-
channelBuilder,
123-
credentialProvider.isEndpointSecure());
124-
125-
final List<ClientInterceptor> clientInterceptors = new ArrayList<>();
126-
127-
final List<Middleware> middlewares = configuration.getMiddlewares();
128-
final MiddlewareRequestHandlerContext context =
129-
() -> Collections.singletonMap(CONNECTION_ID_KEY.toString(), UUID.randomUUID().toString());
130-
clientInterceptors.add(new GrpcMiddlewareInterceptor(middlewares, context));
131-
132-
clientInterceptors.add(new UserHeaderInterceptor(credentialProvider.getAuthToken(), "topic"));
133-
channelBuilder.intercept(clientInterceptors);
134-
return channelBuilder.build();
31+
if (configuration.getIsNumStreamChannelsDynamic()) {
32+
this.streamConnectionPool =
33+
new DynamicStreamGrpcConnectionPool(credentialProvider, configuration, CONNECTION_ID_KEY);
34+
} else {
35+
this.streamConnectionPool =
36+
new StaticStreamGrpcConnectionPool(credentialProvider, configuration, CONNECTION_ID_KEY);
37+
}
13538
}
13639

137-
/** Round-robin publish stub. */
138-
PubsubGrpc.PubsubStub getNextUnaryStub() {
139-
return unaryStubs
140-
.get(unaryIndex.getAndIncrement() % this.numUnaryGrpcChannels)
141-
.withDeadlineAfter(deadline.toMillis(), TimeUnit.MILLISECONDS);
40+
TopicConfiguration getConfiguration() {
41+
return configuration;
14242
}
14343

144-
/** Round-robin subscribe stub. */
14544
StreamStubWithCount getNextStreamStub() {
146-
// Try to get a client with capacity for another subscription
147-
// by round-robining through the stubs.
148-
// Allow up to maximumActiveSubscriptions attempts to account for large bursts of requests.
149-
final int maximumActiveSubscriptions = this.numStreamGrpcChannels * 100;
150-
for (int i = 0; i < maximumActiveSubscriptions; i++) {
151-
final StreamStubWithCount stubWithCount =
152-
streamStubs.get(streamIndex.getAndIncrement() % this.numStreamGrpcChannels);
153-
try {
154-
stubWithCount.acquireStubOrThrow();
155-
return stubWithCount;
156-
} catch (ClientSdkException e) {
157-
// If the stub is at capacity, continue to the next one.
158-
continue;
159-
}
160-
}
161-
162-
// Otherwise return an error if no stubs have capacity.
163-
throw new ClientSdkException(
164-
MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED,
165-
"Maximum number of active subscriptions reached");
45+
return streamConnectionPool.getNextStreamStub();
16646
}
16747

168-
TopicConfiguration getConfiguration() {
169-
return configuration;
48+
PubsubGrpc.PubsubStub getNextUnaryStub() {
49+
return unaryConnectionPool.getNextUnaryStub();
17050
}
17151

17252
@Override
17353
public void close() {
174-
unaryChannels.forEach(ManagedChannel::shutdown);
175-
streamChannels.forEach(ManagedChannel::shutdown);
54+
unaryConnectionPool.close();
55+
streamConnectionPool.close();
17656
}
17757
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package momento.sdk;
2+
3+
import grpc.cache_client.pubsub.PubsubGrpc;
4+
import io.grpc.ManagedChannel;
5+
import java.io.Closeable;
6+
import java.util.List;
7+
import java.util.UUID;
8+
import java.util.concurrent.atomic.AtomicInteger;
9+
import java.util.stream.Collectors;
10+
import java.util.stream.IntStream;
11+
import momento.sdk.auth.CredentialProvider;
12+
import momento.sdk.config.TopicConfiguration;
13+
import momento.sdk.exceptions.ClientSdkException;
14+
import momento.sdk.exceptions.MomentoErrorCode;
15+
import momento.sdk.internal.GrpcChannelOptions;
16+
17+
class StaticStreamGrpcConnectionPool implements StreamTopicGrpcConnectionPool, Closeable {
18+
private final AtomicInteger index = new AtomicInteger(0);
19+
private final int numStreamGrpcChannels;
20+
private final List<ManagedChannel> streamChannels;
21+
private final List<StreamStubWithCount> streamStubs;
22+
23+
public StaticStreamGrpcConnectionPool(
24+
CredentialProvider credentialProvider,
25+
TopicConfiguration configuration,
26+
UUID connectionIdKey) {
27+
this.numStreamGrpcChannels =
28+
configuration.getTransportStrategy().getGrpcConfiguration().getNumStreamGrpcChannels();
29+
this.streamChannels =
30+
IntStream.range(0, this.numStreamGrpcChannels)
31+
.mapToObj(
32+
i ->
33+
TopicGrpcConnectionPoolUtils.setupConnection(
34+
credentialProvider, configuration, connectionIdKey))
35+
.collect(Collectors.toList());
36+
this.streamStubs =
37+
streamChannels.stream()
38+
.map(PubsubGrpc::newStub)
39+
.map(StreamStubWithCount::new)
40+
.collect(Collectors.toList());
41+
}
42+
43+
@Override
44+
public StreamStubWithCount getNextStreamStub() {
45+
// Try to get a client with capacity for another subscription
46+
// by round-robining through the stubs.
47+
// Allow up to maximumActiveSubscriptions attempts to account for large bursts of requests.
48+
final int maximumActiveSubscriptions =
49+
this.numStreamGrpcChannels * GrpcChannelOptions.NUM_CONCURRENT_STREAMS_PER_GRPC_CHANNEL;
50+
for (int i = 0; i < maximumActiveSubscriptions; i++) {
51+
final StreamStubWithCount stubWithCount =
52+
streamStubs.get(index.getAndIncrement() % this.numStreamGrpcChannels);
53+
try {
54+
stubWithCount.acquireStubOrThrow();
55+
return stubWithCount;
56+
} catch (ClientSdkException e) {
57+
// If the stub is at capacity, continue to the next one.
58+
continue;
59+
}
60+
}
61+
62+
// Otherwise return an error if no stubs have capacity.
63+
throw new ClientSdkException(
64+
MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED,
65+
"Maximum number of active subscriptions reached");
66+
}
67+
68+
@Override
69+
public void close() {
70+
streamChannels.forEach(ManagedChannel::shutdown);
71+
}
72+
}

0 commit comments

Comments
 (0)