Skip to content

Commit 0a5e7fd

Browse files
nand4011anitarua
andauthored
chore: improve subscription reconnection logic (#435)
Move the topic subscription reconnection logic into a retry strategy and eligibility strategy that can be configured by the client. Add the subscription retry strategy to TopicConfiguration. Add new constructors that take the retry strategy, and new ones that don't take a logger, which is only used inside the TopicConfiguration itself. Prevent potential race conditions that could occur when unsubscribing from a connection while it is reconnecting by checking if the user called unsubscribe before reconnecting, and by making the variables used in reconnection atomic. Call close in SubscriptionWrapper when a stream ends to stop a subscription from leaking threads from the retry executor service. We may want to share a thread pool between the subscriptions instead of making a new one per subscription, so that we don't need to make a new one each time. We could also use it to execute the callbacks, which are currently executed in the gRPC thread pool and could cause backpressure if they are long-running. Add empty default implementations for the test methods in IScsTopicConnection to clean up its use in the topic client. Remove SendSubscribeOptions, since we already have a class that contains the callbacks, and because it had an unused subscription object. Get rid of CancelableClientCallStreamObserver's ClientCallStreamObserver extension, because that class is meant for observing requests, not responses, and we only need a simple cancel method. Make TopicClientLocalTest's unrecoverable error 'not found', because the previous error is now recoverable. Unsubscribe should end with onCompleted instead of onError --------- Co-authored-by: anitarua <anita@momentohq.com>
1 parent 372aa0f commit 0a5e7fd

12 files changed

+447
-394
lines changed

momento-sdk/src/intTest/java/momento/sdk/retry/TopicClientLocalTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ void testSubscribe_shouldNotRetryWithUnrecoverableError() throws Exception {
119119
final int streamErrorMessageLimit = 3;
120120
final MomentoLocalMiddlewareArgs momentoLocalMiddlewareArgs =
121121
new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString())
122-
.streamError(MomentoErrorCode.INTERNAL_SERVER_ERROR)
122+
.streamError(MomentoErrorCode.NOT_FOUND_ERROR)
123123
.streamErrorRpcList(Collections.singletonList(MomentoRpcMethod.TOPIC_SUBSCRIBE))
124124
.streamErrorMessageLimit(streamErrorMessageLimit)
125125
.build();

momento-sdk/src/main/java/momento/sdk/CancelableClientCallStreamObserver.java

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,18 @@
55
import javax.annotation.Nullable;
66

77
public abstract class CancelableClientCallStreamObserver<TResp>
8-
extends ClientCallStreamObserver<TResp> implements ClientResponseObserver<Object, TResp> {
8+
implements ClientResponseObserver<Object, TResp> {
99

10-
private ClientCallStreamObserver requestStream;
10+
private ClientCallStreamObserver<Object> requestStream;
1111

12-
@Override
13-
public boolean isReady() {
14-
return false;
15-
}
16-
17-
@Override
18-
public void setOnReadyHandler(Runnable onReadyHandler) {}
19-
20-
@Override
21-
public void request(int count) {}
22-
23-
@Override
24-
public void setMessageCompression(boolean enable) {}
25-
26-
@Override
27-
public void disableAutoInboundFlowControl() {}
28-
29-
@Override
3012
public void cancel(@Nullable String message, @Nullable Throwable cause) {
3113
if (requestStream != null) {
3214
requestStream.cancel(message, cause);
3315
}
3416
}
3517

3618
@Override
37-
public void beforeStart(ClientCallStreamObserver requestStream) {
19+
public void beforeStart(ClientCallStreamObserver<Object> requestStream) {
3820
this.requestStream = requestStream;
3921
}
4022
}

momento-sdk/src/main/java/momento/sdk/IScsTopicConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ interface IScsTopicConnection {
1212
* <p>Note: This method is intended for testing purposes and should never be called from outside
1313
* of tests.
1414
*/
15-
void close();
15+
default void close() {}
1616

1717
/**
1818
* Opens the connection.
1919
*
2020
* <p>Note: This method is intended for testing purposes and should never be called from outside
2121
* of tests.
2222
*/
23-
void open();
23+
default void open() {}
2424

2525
/**
2626
* Subscribes to a specific topic using the provided subscription request and observer.

momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java

Lines changed: 24 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22

33
import com.google.protobuf.ByteString;
44
import grpc.cache_client.pubsub._PublishRequest;
5-
import grpc.cache_client.pubsub._SubscriptionItem;
6-
import grpc.cache_client.pubsub._SubscriptionRequest;
75
import grpc.cache_client.pubsub._TopicValue;
6+
import grpc.common._Empty;
87
import io.grpc.stub.StreamObserver;
98
import java.util.concurrent.CompletableFuture;
109
import javax.annotation.Nonnull;
@@ -14,19 +13,19 @@
1413
import momento.sdk.internal.SubscriptionState;
1514
import momento.sdk.responses.topic.TopicPublishResponse;
1615
import momento.sdk.responses.topic.TopicSubscribeResponse;
17-
import org.slf4j.Logger;
18-
import org.slf4j.LoggerFactory;
16+
import momento.sdk.retry.SubscriptionRetryStrategy;
1917

2018
public class ScsTopicClient extends ScsClientBase {
2119

22-
private final Logger logger = LoggerFactory.getLogger(ScsTopicClient.class);
2320
private final ScsTopicGrpcStubsManager topicGrpcStubsManager;
2421
private final long DEFAULT_REQUEST_TIMEOUT_SECONDS = 5;
22+
private final SubscriptionRetryStrategy subscriptionRetryStrategy;
2523

2624
public ScsTopicClient(
2725
@Nonnull CredentialProvider credentialProvider, @Nonnull TopicConfiguration configuration) {
2826
super(null);
2927
this.topicGrpcStubsManager = new ScsTopicGrpcStubsManager(credentialProvider, configuration);
28+
this.subscriptionRetryStrategy = configuration.getSubscriptionRetryStrategy();
3029
}
3130

3231
public CompletableFuture<TopicPublishResponse> publish(
@@ -60,7 +59,7 @@ public CompletableFuture<TopicPublishResponse> publish(
6059
}
6160

6261
public CompletableFuture<TopicSubscribeResponse> subscribe(
63-
String cacheName, String topicName, ISubscriptionCallbacks options) {
62+
String cacheName, String topicName, ISubscriptionCallbacks callbacks) {
6463
try {
6564
ValidationUtils.checkCacheNameValid(cacheName);
6665
ValidationUtils.checkTopicNameValid(topicName);
@@ -69,24 +68,7 @@ public CompletableFuture<TopicSubscribeResponse> subscribe(
6968
new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(e)));
7069
}
7170

72-
SubscriptionState subscriptionState = new SubscriptionState();
73-
TopicSubscribeResponse.Subscription subscription =
74-
new TopicSubscribeResponse.Subscription(subscriptionState);
75-
SendSubscribeOptions sendSubscribeOptions =
76-
new SendSubscribeOptions(
77-
cacheName,
78-
topicName,
79-
options::onItem,
80-
options::onCompleted,
81-
options::onError,
82-
options::onDiscontinuity,
83-
options::onHeartbeat,
84-
options::onConnectionLost,
85-
options::onConnectionRestored,
86-
subscriptionState,
87-
subscription);
88-
89-
return sendSubscribe(sendSubscribeOptions);
71+
return sendSubscribe(cacheName, topicName, callbacks);
9072
}
9173

9274
private CompletableFuture<TopicPublishResponse> sendPublish(
@@ -105,10 +87,10 @@ private CompletableFuture<TopicPublishResponse> sendPublish(
10587
.getNextUnaryStub()
10688
.publish(
10789
request,
108-
new StreamObserver() {
90+
new StreamObserver<_Empty>() {
10991

11092
@Override
111-
public void onNext(Object value) {
93+
public void onNext(_Empty value) {
11294
// Do nothing
11395
}
11496

@@ -133,28 +115,12 @@ public void onCompleted() {
133115
}
134116

135117
private CompletableFuture<TopicSubscribeResponse> sendSubscribe(
136-
SendSubscribeOptions sendSubscribeOptions) {
137-
SubscriptionWrapper subscriptionWrapper;
138-
139-
IScsTopicConnection connection =
140-
new IScsTopicConnection() {
141-
@Override
142-
public void close() {
143-
logger.warn("Closing the connection (for testing purposes only)");
144-
}
118+
String cacheName, String topicName, ISubscriptionCallbacks callbacks) {
119+
final SubscriptionState subscriptionState = new SubscriptionState();
145120

146-
@Override
147-
public void open() {
148-
logger.warn("Opening the connection (for testing purposes only)");
149-
}
150-
151-
@Override
152-
public void subscribe(
153-
_SubscriptionRequest subscriptionRequest,
154-
CancelableClientCallStreamObserver<_SubscriptionItem> subscription) {
155-
topicGrpcStubsManager.getNextStreamStub().subscribe(subscriptionRequest, subscription);
156-
}
157-
};
121+
final IScsTopicConnection connection =
122+
(request, subscription) ->
123+
topicGrpcStubsManager.getNextStreamStub().subscribe(request, subscription);
158124

159125
long configuredTimeoutSeconds =
160126
topicGrpcStubsManager
@@ -166,18 +132,24 @@ public void subscribe(
166132
long firstMessageSubscribeTimeoutSeconds =
167133
configuredTimeoutSeconds > 0 ? configuredTimeoutSeconds : DEFAULT_REQUEST_TIMEOUT_SECONDS;
168134

169-
subscriptionWrapper =
135+
@SuppressWarnings("resource") // the wrapper closes itself when a subscription ends.
136+
final SubscriptionWrapper subscriptionWrapper =
170137
new SubscriptionWrapper(
171-
connection, sendSubscribeOptions, firstMessageSubscribeTimeoutSeconds);
138+
cacheName,
139+
topicName,
140+
connection,
141+
callbacks,
142+
subscriptionState,
143+
firstMessageSubscribeTimeoutSeconds,
144+
subscriptionRetryStrategy);
172145
final CompletableFuture<Void> subscribeFuture = subscriptionWrapper.subscribeWithRetry();
173146
return subscribeFuture.handle(
174147
(v, ex) -> {
175148
if (ex != null) {
176149
return new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(ex));
177150
} else {
178-
sendSubscribeOptions.subscriptionState.setUnsubscribeFn(
179-
subscriptionWrapper::unsubscribe);
180-
return new TopicSubscribeResponse.Subscription(sendSubscribeOptions.subscriptionState);
151+
subscriptionState.setUnsubscribeFn(subscriptionWrapper::unsubscribe);
152+
return new TopicSubscribeResponse.Subscription(subscriptionState);
181153
}
182154
});
183155
}

momento-sdk/src/main/java/momento/sdk/SendSubscribeOptions.java

Lines changed: 0 additions & 151 deletions
This file was deleted.

0 commit comments

Comments
 (0)