@@ -214,14 +214,22 @@ public Flux<BackendMessage> exchange(Predicate<BackendMessage> takeUntil, Publis
214214 Assert .requireNonNull (takeUntil , "takeUntil must not be null" );
215215 Assert .requireNonNull (requests , "requests must not be null" );
216216
217- return this .messageSubscriber .addConversation (takeUntil , requests , this .requests ::next , this ::isConnected );
217+ if (!isConnected ()) {
218+ return Flux .error (this .messageSubscriber .createClientClosedException ());
219+ }
220+
221+ return this .messageSubscriber .addConversation (takeUntil , requests , this ::doSendRequest , this ::isConnected );
218222 }
219223
220224 @ Override
221225 public void send (FrontendMessage message ) {
222226 Assert .requireNonNull (message , "requests must not be null" );
223227
224- this .requests .next (Mono .just (message ));
228+ doSendRequest (Mono .just (message ));
229+ }
230+
231+ private void doSendRequest (Publisher <FrontendMessage > it ) {
232+ this .requests .next (it );
225233 }
226234
227235 private Mono <Void > resumeError (Throwable throwable ) {
@@ -695,7 +703,7 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
695703 sink .onRequest (value -> onRequest (conversation , value ));
696704
697705 if (!isConnected .get ()) {
698- sink .error (new PostgresConnectionClosedException ( "Cannot exchange messages because the connection is closed" ));
706+ sink .error (createClientClosedException ( ));
699707 return ;
700708 }
701709
@@ -708,12 +716,15 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
708716 sender .accept (requestMessages );
709717 } else {
710718 sink .error (new RequestQueueException ("Cannot exchange messages because the request queue limit is exceeded" ));
711-
712719 }
713720 }
714721 });
715722 }
716723
724+ PostgresConnectionClosedException createClientClosedException () {
725+ return new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" );
726+ }
727+
717728 /**
718729 * {@link Subscription#request(long)} callback. Request more for a {@link Conversation}. Potentially, demands also more upstream elements.
719730 *
0 commit comments