5454import reactor .core .publisher .Sinks ;
5555import reactor .core .scheduler .Schedulers ;
5656import reactor .netty .Connection ;
57+ import reactor .netty .channel .AbortedException ;
5758import reactor .netty .tcp .TcpClient ;
5859import reactor .util .Logger ;
5960import reactor .util .Loggers ;
@@ -496,6 +497,11 @@ private void handleClose() {
496497 }
497498
498499 private void handleConnectionError (Throwable error ) {
500+
501+ if (AbortedException .isConnectionReset (error ) && !isConnected ()) {
502+ drainError (() -> this .messageSubscriber .createClientClosedException (error ));
503+ }
504+
499505 drainError (() -> new PostgresConnectionException (error ));
500506 }
501507
@@ -535,6 +541,10 @@ public PostgresConnectionClosedException(String reason) {
535541 super (reason );
536542 }
537543
544+ public PostgresConnectionClosedException (String reason , @ Nullable Throwable cause ) {
545+ super (reason , cause );
546+ }
547+
538548 }
539549
540550 static class PostgresConnectionException extends R2dbcNonTransientResourceException {
@@ -670,7 +680,7 @@ private class BackendMessageSubscriber implements CoreSubscriber<BackendMessage>
670680
671681 private Subscription upstream ;
672682
673- public Flux <BackendMessage > addConversation (Predicate <BackendMessage > takeUntil , Publisher <FrontendMessage > requests , Consumer <Flux <FrontendMessage >> sender ,
683+ public Flux <BackendMessage > addConversation (Predicate <BackendMessage > takeUntil , Publisher <FrontendMessage > requests , Consumer <Publisher <FrontendMessage >> sender ,
674684 Supplier <Boolean > isConnected ) {
675685
676686 return Flux .create (sink -> {
@@ -688,13 +698,7 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
688698 return ;
689699 }
690700
691- Flux <FrontendMessage > requestMessages = Flux .from (requests ).doOnNext (m -> {
692- if (!isConnected .get ()) {
693- sink .error (new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" ));
694- }
695- });
696-
697- sender .accept (requestMessages );
701+ sender .accept (requests );
698702 } else {
699703 sink .error (new RequestQueueException ("Cannot exchange messages because the request queue limit is exceeded" ));
700704 }
@@ -703,7 +707,11 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
703707 }
704708
705709 PostgresConnectionClosedException createClientClosedException () {
706- return new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" );
710+ return createClientClosedException (null );
711+ }
712+
713+ PostgresConnectionClosedException createClientClosedException (@ Nullable Throwable cause ) {
714+ return new PostgresConnectionClosedException ("Cannot exchange messages because the connection is closed" , cause );
707715 }
708716
709717 /**
0 commit comments