diff --git a/src/main/java/io/axoniq/axonserver/connector/event/impl/BufferedPersistentStreamSegment.java b/src/main/java/io/axoniq/axonserver/connector/event/impl/BufferedPersistentStreamSegment.java index b5c6649b..20312287 100644 --- a/src/main/java/io/axoniq/axonserver/connector/event/impl/BufferedPersistentStreamSegment.java +++ b/src/main/java/io/axoniq/axonserver/connector/event/impl/BufferedPersistentStreamSegment.java @@ -113,7 +113,6 @@ public int segment() { @Override public void close() { - super.close(); if (closed.compareAndSet(false, true)) { logger.info("{}: Close segment {}", streamId, segment); localOnAvailableCallback.run(); diff --git a/src/main/java/io/axoniq/axonserver/connector/event/impl/EventChannelImpl.java b/src/main/java/io/axoniq/axonserver/connector/event/impl/EventChannelImpl.java index 6a174de2..1f24f2d9 100644 --- a/src/main/java/io/axoniq/axonserver/connector/event/impl/EventChannelImpl.java +++ b/src/main/java/io/axoniq/axonserver/connector/event/impl/EventChannelImpl.java @@ -129,18 +129,20 @@ public synchronized void connect() { @Override public void reconnect() { closeEventStreams(); + persistedEventStreams.forEach(PersistentStreamImpl::triggerReconnect); + persistedEventStreams.clear(); } @Override public void disconnect() { closeEventStreams(); + persistedEventStreams.forEach(PersistentStreamImpl::close); + persistedEventStreams.clear(); } private void closeEventStreams() { buffers.forEach(BufferedEventStream::close); buffers.clear(); - persistedEventStreams.forEach(PersistentStreamImpl::close); - persistedEventStreams.clear(); } @Override diff --git a/src/main/java/io/axoniq/axonserver/connector/event/impl/PersistentStreamImpl.java b/src/main/java/io/axoniq/axonserver/connector/event/impl/PersistentStreamImpl.java index 7e69f39a..f080085e 100644 --- a/src/main/java/io/axoniq/axonserver/connector/event/impl/PersistentStreamImpl.java +++ b/src/main/java/io/axoniq/axonserver/connector/event/impl/PersistentStreamImpl.java @@ -15,6 +15,8 @@ */ package io.axoniq.axonserver.connector.event.impl; +import io.axoniq.axonserver.connector.AxonServerException; +import io.axoniq.axonserver.connector.ErrorCategory; import io.axoniq.axonserver.connector.event.PersistentStream; import io.axoniq.axonserver.connector.event.PersistentStreamCallbacks; import io.axoniq.axonserver.connector.event.PersistentStreamSegment; @@ -27,6 +29,7 @@ import io.axoniq.axonserver.grpc.streams.StreamSignal; import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; +import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +44,8 @@ import java.util.function.Consumer; import javax.annotation.Nullable; +import static io.axoniq.axonserver.connector.impl.ObjectUtils.doIfNotNull; + /** * Implementation of the {@link PersistentStream}. */ @@ -120,6 +125,21 @@ public void openConnection(InitializationProperties initializationProperties) { outboundStreamHolder.get().onNext(StreamRequest.newBuilder().setOpen(openRequest.build()).build()); } + /** + * Close this stream and signal downstream consumer that the stream is closed "erroneously" in order to trigger + * reconnect mechanisms. This is to ensure that consumers of this stream can distinguish between regularly closed + * streams, and those closed with the intent to re-establish a connection. + */ + public void triggerReconnect() { + // first, close gracefully + close(); + AxonServerException reconnectRequested = new AxonServerException(ErrorCategory.OTHER, + "Client initiated reconnect", + "client"); + // notify clients that the connection "failed" and should be reconnected + onClosedCallback.get().accept(reconnectRequested); + } + @Override public void close() { if (closed.compareAndSet(false, true)) { @@ -175,8 +195,8 @@ private BufferedPersistentStreamSegment getPersistentStreamSegment(int segmentNr segmentNr, bufferSize, refillBatch, - progress -> acknowledge(s,progress), - error -> sendError(s,error)); + progress -> acknowledge(s, progress), + error -> sendError(s, error)); stream.beforeStart(outboundStreamHolder.get()); stream.enableFlowControl(); return stream; @@ -191,14 +211,17 @@ private BufferedPersistentStreamSegment getPersistentStreamSegment(int segmentNr } private void acknowledge(int segment, long progress) { - outboundStreamHolder.get().onNext(StreamRequest.newBuilder() - .setAcknowledgeProgress(ProgressAcknowledgement.newBuilder() - .setSegment( - segment) - .setPosition( - progress) - .build()) - .build()); + try { + doIfNotNull(outboundStreamHolder.get(), call -> call.onNext( + StreamRequest.newBuilder() + .setAcknowledgeProgress(ProgressAcknowledgement.newBuilder() + .setSegment(segment) + .setPosition(progress) + .build()) + .build())); + } catch (Exception e) { + logger.debug("Failed to send acknowledgement.", e); + } if (progress == PersistentStreamSegment.PENDING_WORK_DONE_MARKER) { logger.info("{}: Close confirmed for segment {}", streamId, segment); closeConfirmationsSent.add(segment); @@ -206,12 +229,13 @@ private void acknowledge(int segment, long progress) { } private void sendError(int segment, String error) { - outboundStreamHolder.get().onNext(StreamRequest.newBuilder() - .setError(SegmentError.newBuilder() - .setSegment(segment) - .setError(error) - .build()) - .build()); + doIfNotNull(outboundStreamHolder.get(), + osh -> osh.onNext(StreamRequest.newBuilder() + .setError(SegmentError.newBuilder() + .setSegment(segment) + .setError(error) + .build()) + .build())); } @Override @@ -228,7 +252,7 @@ public void onCompleted() { private void sendCompleted() { try { - outboundStreamHolder.get().onCompleted(); + doIfNotNull(outboundStreamHolder.getAndSet(null), StreamObserver::onCompleted); } catch (Exception ex) { // Ignore exception }