Skip to content

Commit b5acdd4

Browse files
authored
fix: add bookkeeping for number of active subscriptions (#448)
Found that the java sdk does not keep track of the number of concurrent subscriptions while auditing the implementations of separate grpc channels for publish and subscribe. Also added a new retry test file to verify that the TopicClient does not allow >100 streams on a single grpc channel and that it is capable of handling bursts of subscribe and unsubscribe requests.
1 parent 0a5e7fd commit b5acdd4

File tree

10 files changed

+645
-39
lines changed

10 files changed

+645
-39
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,4 @@ examples/lib/.classpath
4646
examples/lib/.project
4747
examples/lib/bin
4848
.vscode/
49+
momento-sdk/bin

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ test-leaderboard-service:
4242
test-topics-service:
4343
@CONSISTENT_READS=1 ./gradlew test-topics-service
4444

45+
## Run the topics subscription initialization tests
46+
test-topics-subscription-initialization:
47+
@CONSISTENT_READS=1 ./gradlew test-topics-subscription-initialization
48+
4549
## Run the http service tests
4650
test-http-service:
4751
@echo "No tests for http service."

momento-sdk/build.gradle.kts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,8 @@ registerIntegrationTestTask(
109109
"test-retries",
110110
listOf("momento.sdk.retry.*")
111111
)
112+
113+
registerIntegrationTestTask(
114+
"test-topics-subscription-initialization",
115+
listOf("momento.sdk.retry.TopicsSubscriptionInitializationTest")
116+
)

momento-sdk/src/intTest/java/momento/sdk/retry/BaseMomentoLocalTestClass.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import momento.sdk.config.Configurations;
1515
import momento.sdk.config.TopicConfiguration;
1616
import momento.sdk.config.TopicConfigurations;
17+
import momento.sdk.config.transport.GrpcConfiguration;
18+
import momento.sdk.config.transport.StaticTransportStrategy;
1719
import momento.sdk.responses.cache.control.CacheCreateResponse;
1820
import momento.sdk.retry.utils.MomentoLocalMiddleware;
1921
import momento.sdk.retry.utils.MomentoLocalMiddlewareArgs;
@@ -126,6 +128,39 @@ public static void withCacheAndTopicClient(
126128
}
127129
}
128130

131+
public static void withCacheAndTopicClientWithNumStreamChannels(
132+
int numStreamChannels,
133+
MomentoLocalMiddlewareArgs testMetricsMiddlewareArgs,
134+
TopicTestCallback testCallback)
135+
throws Exception {
136+
137+
final String cacheName = testCacheName();
138+
final String hostname =
139+
Optional.ofNullable(System.getenv("MOMENTO_HOSTNAME")).orElse("127.0.0.1");
140+
final int port =
141+
Optional.ofNullable(System.getenv("MOMENTO_PORT")).map(Integer::parseInt).orElse(8080);
142+
final CredentialProvider credentialProvider = new MomentoLocalProvider(hostname, port);
143+
144+
final GrpcConfiguration grpcConfig =
145+
new GrpcConfiguration(Duration.ofMillis(15000))
146+
.withNumStreamGrpcChannels(numStreamChannels);
147+
final TopicConfiguration topicConfiguration =
148+
new TopicConfiguration(new StaticTransportStrategy(grpcConfig))
149+
.withMiddleware(new MomentoLocalMiddleware(testMetricsMiddlewareArgs));
150+
151+
try (final CacheClient cacheClient =
152+
CacheClient.builder(
153+
credentialProvider, Configurations.Laptop.latest(), DEFAULT_TTL_SECONDS)
154+
.build();
155+
final TopicClient topicClient =
156+
TopicClient.builder(credentialProvider, topicConfiguration).build()) {
157+
if (cacheClient.createCache(cacheName).join() instanceof CacheCreateResponse.Error) {
158+
throw new RuntimeException("Failed to create cache: " + cacheName);
159+
}
160+
testCallback.run(topicClient, cacheName);
161+
}
162+
}
163+
129164
@FunctionalInterface
130165
public interface CacheTestCallback {
131166
void run(CacheClient cc, String cacheName) throws Exception;

0 commit comments

Comments
 (0)