1515 */
1616package io .axoniq .axonserver .connector .event .impl ;
1717
18+ import io .axoniq .axonserver .connector .AxonServerException ;
19+ import io .axoniq .axonserver .connector .ErrorCategory ;
1820import io .axoniq .axonserver .connector .event .PersistentStream ;
1921import io .axoniq .axonserver .connector .event .PersistentStreamCallbacks ;
2022import io .axoniq .axonserver .connector .event .PersistentStreamSegment ;
2729import io .axoniq .axonserver .grpc .streams .StreamSignal ;
2830import io .grpc .stub .ClientCallStreamObserver ;
2931import io .grpc .stub .ClientResponseObserver ;
32+ import io .grpc .stub .StreamObserver ;
3033import org .slf4j .Logger ;
3134import org .slf4j .LoggerFactory ;
3235
4144import java .util .function .Consumer ;
4245import javax .annotation .Nullable ;
4346
47+ import static io .axoniq .axonserver .connector .impl .ObjectUtils .doIfNotNull ;
48+
4449/**
4550 * Implementation of the {@link PersistentStream}.
4651 */
@@ -120,6 +125,21 @@ public void openConnection(InitializationProperties initializationProperties) {
120125 outboundStreamHolder .get ().onNext (StreamRequest .newBuilder ().setOpen (openRequest .build ()).build ());
121126 }
122127
128+ /**
129+ * Close this stream and signal downstream consumer that the stream is closed "erroneously" in order to trigger
130+ * reconnect mechanisms. This is to ensure that consumers of this stream can distinguish between regularly closed
131+ * streams, and those closed with the intent to re-establish a connection.
132+ */
133+ public void triggerReconnect () {
134+ // first, close gracefully
135+ close ();
136+ AxonServerException reconnectRequested = new AxonServerException (ErrorCategory .OTHER ,
137+ "Client initiated reconnect" ,
138+ "client" );
139+ // notify clients that the connection "failed" and should be reconnected
140+ onClosedCallback .get ().accept (reconnectRequested );
141+ }
142+
123143 @ Override
124144 public void close () {
125145 if (closed .compareAndSet (false , true )) {
@@ -175,8 +195,8 @@ private BufferedPersistentStreamSegment getPersistentStreamSegment(int segmentNr
175195 segmentNr ,
176196 bufferSize ,
177197 refillBatch ,
178- progress -> acknowledge (s ,progress ),
179- error -> sendError (s ,error ));
198+ progress -> acknowledge (s , progress ),
199+ error -> sendError (s , error ));
180200 stream .beforeStart (outboundStreamHolder .get ());
181201 stream .enableFlowControl ();
182202 return stream ;
@@ -191,27 +211,31 @@ private BufferedPersistentStreamSegment getPersistentStreamSegment(int segmentNr
191211 }
192212
193213 private void acknowledge (int segment , long progress ) {
194- outboundStreamHolder .get ().onNext (StreamRequest .newBuilder ()
195- .setAcknowledgeProgress (ProgressAcknowledgement .newBuilder ()
196- .setSegment (
197- segment )
198- .setPosition (
199- progress )
200- .build ())
201- .build ());
214+ try {
215+ doIfNotNull (outboundStreamHolder .get (), call -> call .onNext (
216+ StreamRequest .newBuilder ()
217+ .setAcknowledgeProgress (ProgressAcknowledgement .newBuilder ()
218+ .setSegment (segment )
219+ .setPosition (progress )
220+ .build ())
221+ .build ()));
222+ } catch (Exception e ) {
223+ logger .debug ("Failed to send acknowledgement." , e );
224+ }
202225 if (progress == PersistentStreamSegment .PENDING_WORK_DONE_MARKER ) {
203226 logger .info ("{}: Close confirmed for segment {}" , streamId , segment );
204227 closeConfirmationsSent .add (segment );
205228 }
206229 }
207230
208231 private void sendError (int segment , String error ) {
209- outboundStreamHolder .get ().onNext (StreamRequest .newBuilder ()
210- .setError (SegmentError .newBuilder ()
211- .setSegment (segment )
212- .setError (error )
213- .build ())
214- .build ());
232+ doIfNotNull (outboundStreamHolder .get (),
233+ osh -> osh .onNext (StreamRequest .newBuilder ()
234+ .setError (SegmentError .newBuilder ()
235+ .setSegment (segment )
236+ .setError (error )
237+ .build ())
238+ .build ()));
215239 }
216240
217241 @ Override
@@ -228,7 +252,7 @@ public void onCompleted() {
228252
229253 private void sendCompleted () {
230254 try {
231- outboundStreamHolder .get (). onCompleted ( );
255+ doIfNotNull ( outboundStreamHolder .getAndSet ( null ), StreamObserver :: onCompleted );
232256 } catch (Exception ex ) {
233257 // Ignore exception
234258 }
0 commit comments