Skip to content

Commit 0861dbc

Browse files
committed
add mid-stream error test
1 parent 69250c6 commit 0861dbc

File tree

1 file changed

+66
-3
lines changed

1 file changed

+66
-3
lines changed

momento-sdk/src/intTest/java/momento/sdk/retry/TopicsSubscriptionInitializationTest.java

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import static org.slf4j.LoggerFactory.getLogger;
77

88
import java.util.ArrayList;
9+
import java.util.Collections;
910
import java.util.List;
1011
import java.util.UUID;
1112
import java.util.concurrent.CompletableFuture;
@@ -88,7 +89,7 @@ public void oneStreamChannel_doesNotSilentlyQueueSubscribeRequestOnFullChannel()
8889
subscriptions.get(0).unsubscribe();
8990
// Wait for the subscription to end
9091
try {
91-
Thread.sleep(200);
92+
Thread.sleep(500);
9293
} catch (InterruptedException e) {
9394
Thread.currentThread().interrupt();
9495
throw new RuntimeException("Test interrupted while waiting for subscriptions", e);
@@ -153,7 +154,7 @@ public void multipleStreamChannels_handlesBurstOfSubscribeAndUnsubscribeRequests
153154
}
154155
// Wait a bit for the subscription to end
155156
try {
156-
Thread.sleep(200);
157+
Thread.sleep(500);
157158
} catch (InterruptedException e) {
158159
Thread.currentThread().interrupt();
159160
throw new RuntimeException("Test interrupted while waiting for subscriptions", e);
@@ -292,7 +293,7 @@ public void multipleStreamChannels_handlesBurstOfSubscribeRequestsAtOverMaxCapac
292293
}
293294

294295
@ParameterizedTest
295-
@ValueSource(ints = {2, 10})
296+
@ValueSource(ints = {2, 10, 20})
296297
@Timeout(30)
297298
public void multipleStreamChannels_handlesBurstOfSubscribeRequestsAtHalfOfMaxCapacity(
298299
int numGrpcChannels) throws Exception {
@@ -395,4 +396,66 @@ public void onError(Throwable t) {
395396
}
396397
});
397398
}
399+
400+
@Test
401+
@Timeout(30)
402+
public void oneStreamChannel_properlyDecrementsWhenErrorOccursMidStream() throws Exception {
403+
unsubscribeCounter = 0;
404+
final AtomicInteger unsubscribeOnErrorCounter = new AtomicInteger(0);
405+
final ISubscriptionCallbacks callbacks =
406+
new ISubscriptionCallbacks() {
407+
@Override
408+
public void onItem(TopicMessage message) {}
409+
410+
@Override
411+
public void onCompleted() {
412+
System.out.println("onCompleted");
413+
unsubscribeCounter++;
414+
}
415+
416+
@Override
417+
public void onError(Throwable t) {
418+
System.out.println("onError");
419+
unsubscribeOnErrorCounter.incrementAndGet();
420+
}
421+
};
422+
423+
final MomentoLocalMiddlewareArgs middlewareArgs =
424+
new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString())
425+
.streamError(MomentoErrorCode.NOT_FOUND_ERROR)
426+
.streamErrorRpcList(Collections.singletonList(MomentoRpcMethod.TOPIC_SUBSCRIBE))
427+
.streamErrorMessageLimit(3)
428+
.build();
429+
430+
withCacheAndTopicClientWithNumStreamChannels(
431+
1,
432+
middlewareArgs,
433+
(topicClient, cacheName) -> {
434+
List<TopicSubscribeResponse.Subscription> subscriptions = new ArrayList<>();
435+
436+
// Subscribe but expecting an error after a couple of heartbeats
437+
final TopicSubscribeResponse response =
438+
topicClient.subscribe(cacheName, "topic", callbacks).join();
439+
assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class);
440+
subscriptions.add((TopicSubscribeResponse.Subscription) response);
441+
442+
// Wait for the subscription that ran into the error to be closed
443+
try {
444+
Thread.sleep(3000);
445+
} catch (InterruptedException e) {
446+
Thread.currentThread().interrupt();
447+
throw new RuntimeException("Test interrupted while waiting for subscriptions", e);
448+
}
449+
450+
// Cleanup
451+
for (TopicSubscribeResponse.Subscription sub : subscriptions) {
452+
if (sub != null) {
453+
sub.unsubscribe();
454+
}
455+
}
456+
457+
assertEquals(0, unsubscribeCounter);
458+
assertEquals(1, unsubscribeOnErrorCounter.get());
459+
});
460+
}
398461
}

0 commit comments

Comments
 (0)