Skip to content

Commit bd74f95

Browse files
committed
PersistentStreams must report error on reconnect request
By reporting an error to downstream consumers, they are notified of the fact that the disconnect didn't happen for normal reasons, and the stream should be reopened if the application wishes to continue processing events.
1 parent b7f5f80 commit bd74f95

File tree

3 files changed

+40
-20
lines changed

3 files changed

+40
-20
lines changed

src/main/java/io/axoniq/axonserver/connector/event/impl/BufferedPersistentStreamSegment.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ public int segment() {
113113

114114
@Override
115115
public void close() {
116-
super.close();
117116
if (closed.compareAndSet(false, true)) {
118117
logger.info("{}: Close segment {}", streamId, segment);
119118
localOnAvailableCallback.run();

src/main/java/io/axoniq/axonserver/connector/event/impl/EventChannelImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,18 +129,20 @@ public synchronized void connect() {
129129
@Override
130130
public void reconnect() {
131131
closeEventStreams();
132+
persistedEventStreams.forEach(PersistentStreamImpl::triggerReconnect);
133+
persistedEventStreams.clear();
132134
}
133135

134136
@Override
135137
public void disconnect() {
136138
closeEventStreams();
139+
persistedEventStreams.forEach(PersistentStreamImpl::close);
140+
persistedEventStreams.clear();
137141
}
138142

139143
private void closeEventStreams() {
140144
buffers.forEach(BufferedEventStream::close);
141145
buffers.clear();
142-
persistedEventStreams.forEach(PersistentStreamImpl::close);
143-
persistedEventStreams.clear();
144146
}
145147

146148
@Override

src/main/java/io/axoniq/axonserver/connector/event/impl/PersistentStreamImpl.java

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.axoniq.axonserver.connector.event.impl;
1717

18+
import io.axoniq.axonserver.connector.AxonServerException;
19+
import io.axoniq.axonserver.connector.ErrorCategory;
1820
import io.axoniq.axonserver.connector.event.PersistentStream;
1921
import io.axoniq.axonserver.connector.event.PersistentStreamCallbacks;
2022
import io.axoniq.axonserver.connector.event.PersistentStreamSegment;
@@ -27,6 +29,7 @@
2729
import io.axoniq.axonserver.grpc.streams.StreamSignal;
2830
import io.grpc.stub.ClientCallStreamObserver;
2931
import io.grpc.stub.ClientResponseObserver;
32+
import io.grpc.stub.StreamObserver;
3033
import org.slf4j.Logger;
3134
import org.slf4j.LoggerFactory;
3235

@@ -41,6 +44,8 @@
4144
import java.util.function.Consumer;
4245
import javax.annotation.Nullable;
4346

47+
import static io.axoniq.axonserver.connector.impl.ObjectUtils.doIfNotNull;
48+
4449
/**
4550
* Implementation of the {@link PersistentStream}.
4651
*/
@@ -120,6 +125,16 @@ public void openConnection(InitializationProperties initializationProperties) {
120125
outboundStreamHolder.get().onNext(StreamRequest.newBuilder().setOpen(openRequest.build()).build());
121126
}
122127

128+
public void triggerReconnect() {
129+
// first, close gracefully
130+
close();
131+
AxonServerException reconnectRequested = new AxonServerException(ErrorCategory.OTHER,
132+
"Client initiated reconnect",
133+
"client");
134+
// notify clients that the connection "failed" and should be reconnected
135+
onClosedCallback.get().accept(reconnectRequested);
136+
}
137+
123138
@Override
124139
public void close() {
125140
if (closed.compareAndSet(false, true)) {
@@ -175,8 +190,8 @@ private BufferedPersistentStreamSegment getPersistentStreamSegment(int segmentNr
175190
segmentNr,
176191
bufferSize,
177192
refillBatch,
178-
progress -> acknowledge(s,progress),
179-
error -> sendError(s,error));
193+
progress -> acknowledge(s, progress),
194+
error -> sendError(s, error));
180195
stream.beforeStart(outboundStreamHolder.get());
181196
stream.enableFlowControl();
182197
return stream;
@@ -191,27 +206,31 @@ private BufferedPersistentStreamSegment getPersistentStreamSegment(int segmentNr
191206
}
192207

193208
private void acknowledge(int segment, long progress) {
194-
outboundStreamHolder.get().onNext(StreamRequest.newBuilder()
195-
.setAcknowledgeProgress(ProgressAcknowledgement.newBuilder()
196-
.setSegment(
197-
segment)
198-
.setPosition(
199-
progress)
200-
.build())
201-
.build());
209+
try {
210+
doIfNotNull(outboundStreamHolder.get(), call -> call.onNext(
211+
StreamRequest.newBuilder()
212+
.setAcknowledgeProgress(ProgressAcknowledgement.newBuilder()
213+
.setSegment(segment)
214+
.setPosition(progress)
215+
.build())
216+
.build()));
217+
} catch (Exception e) {
218+
logger.debug("Failed to send acknowledgement.", e);
219+
}
202220
if (progress == PersistentStreamSegment.PENDING_WORK_DONE_MARKER) {
203221
logger.info("{}: Close confirmed for segment {}", streamId, segment);
204222
closeConfirmationsSent.add(segment);
205223
}
206224
}
207225

208226
private void sendError(int segment, String error) {
209-
outboundStreamHolder.get().onNext(StreamRequest.newBuilder()
210-
.setError(SegmentError.newBuilder()
211-
.setSegment(segment)
212-
.setError(error)
213-
.build())
214-
.build());
227+
doIfNotNull(outboundStreamHolder.get(),
228+
osh -> osh.onNext(StreamRequest.newBuilder()
229+
.setError(SegmentError.newBuilder()
230+
.setSegment(segment)
231+
.setError(error)
232+
.build())
233+
.build()));
215234
}
216235

217236
@Override
@@ -228,7 +247,7 @@ public void onCompleted() {
228247

229248
private void sendCompleted() {
230249
try {
231-
outboundStreamHolder.get().onCompleted();
250+
doIfNotNull(outboundStreamHolder.getAndSet(null), StreamObserver::onCompleted);
232251
} catch (Exception ex) {
233252
// Ignore exception
234253
}

0 commit comments

Comments
 (0)