@@ -210,14 +210,22 @@ public Flux<BackendMessage> exchange(Predicate<BackendMessage> takeUntil, Publis
210210 Assert .requireNonNull (takeUntil , "takeUntil must not be null" );
211211 Assert .requireNonNull (requests , "requests must not be null" );
212212
213- return this .messageSubscriber .addConversation (takeUntil , requests , it -> this .requestSink .emitNext (it , Sinks .EmitFailureHandler .FAIL_FAST ), this ::isConnected );
213+ if (!isConnected ()) {
214+ return Flux .error (this .messageSubscriber .createClientClosedException ());
215+ }
216+
217+ return this .messageSubscriber .addConversation (takeUntil , requests , this ::doSendRequest , this ::isConnected );
214218 }
215219
216220 @ Override
217221 public void send (FrontendMessage message ) {
218222 Assert .requireNonNull (message , "requests must not be null" );
219223
220- this .requestSink .emitNext (Mono .just (message ), Sinks .EmitFailureHandler .FAIL_FAST );
224+ doSendRequest (Mono .just (message ));
225+ }
226+
227+ private void doSendRequest (Publisher <FrontendMessage > it ) {
228+ this .requestSink .emitNext (it , Sinks .EmitFailureHandler .FAIL_FAST );
221229 }
222230
223231 private Mono <Void > resumeError (Throwable throwable ) {
@@ -676,7 +684,7 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
676684 sink .onRequest (value -> onRequest (conversation , value ));
677685
678686 if (!isConnected .get ()) {
679- sink .error (new PostgresConnectionClosedException ( "Cannot exchange messages because the connection is closed" ));
687+ sink .error (createClientClosedException ( ));
680688 return ;
681689 }
682690
@@ -689,12 +697,15 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
689697 sender .accept (requestMessages );
690698 } else {
691699 sink .error (new RequestQueueException ("Cannot exchange messages because the request queue limit is exceeded" ));
692-
693700 }
694701 }
695702 });
696703 }
697704
705+ PostgresConnectionClosedException createClientClosedException () {
706+ return new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" );
707+ }
708+
698709 /**
699710 * {@link Subscription#request(long)} callback. Request more for a {@link Conversation}. Potentially, demands also more upstream elements.
700711 *
0 commit comments