2727import  org .junit .jupiter .api .Test ;
2828import  org .junit .jupiter .api .extension .RegisterExtension ;
2929import  org .testcontainers .containers .PostgreSQLContainer ;
30+ import  reactor .core .publisher .Flux ;
3031import  reactor .core .publisher .Mono ;
32+ import  reactor .netty .DisposableChannel ;
33+ import  reactor .netty .DisposableServer ;
34+ import  reactor .netty .tcp .TcpServer ;
3135import  reactor .test .StepVerifier ;
3236
3337import  static  org .assertj .core .api .Assertions .assertThat ;
@@ -119,6 +123,53 @@ void testTargetPreferSecondaryConnectedToStandby() {
119123            .verifyComplete ();
120124    }
121125
126+     @ Test 
127+     void  testTargetPreferSecondaryConnectedToMasterOnStandbyFailure () {
128+         DisposableServer  failingServer  = newServer ();
129+         try  {
130+             isConnectedToPrimary (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (), failingServer )
131+                 .as (StepVerifier ::create )
132+                 .expectNext (true )
133+                 .verifyComplete ();
134+         } finally  {
135+             failingServer .dispose ();
136+         }
137+     }
138+ 
139+     @ Test 
140+     void  testMultipleCallsWithTargetPreferSecondaryConnectedToStandby () {
141+         PostgresqlConnectionFactory  connectionFactory  = this .multiHostConnectionFactory (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (), SERVERS .getStandby ());
142+ 
143+         Mono <Boolean > allocator  = Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
144+         Flux <Boolean > connectionPool  = Flux .merge (allocator , allocator );
145+ 
146+         connectionPool 
147+             .as (StepVerifier ::create )
148+             .expectNext (false )
149+             .expectNext (false )
150+             .verifyComplete ();
151+     }
152+ 
153+     @ Test 
154+     void  testMultipleCallsWithTargetPreferSecondaryConnectedToMasterOnStandbyFailure () {
155+         DisposableServer  failingServer  = newServer ();
156+         try  {
157+             PostgresqlConnectionFactory  connectionFactory  = this .multiHostConnectionFactoryWithFailingServer (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (),
158+                 failingServer );
159+ 
160+             Mono <Boolean > allocator  = Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
161+             Flux <Boolean > connectionPool  = Flux .merge (allocator , allocator );
162+ 
163+             connectionPool 
164+                 .as (StepVerifier ::create )
165+                 .expectNext (true )
166+                 .expectNext (true )
167+                 .verifyComplete ();
168+         } finally  {
169+             failingServer .dispose ();
170+         }
171+     }
172+ 
122173    @ Test 
123174    void  testTargetPrimaryChoosePrimary () {
124175        isConnectedToPrimary (MultiHostConnectionStrategy .TargetServerType .PRIMARY , SERVERS .getPrimary (), SERVERS .getStandby ())
@@ -181,6 +232,12 @@ private Mono<Boolean> isConnectedToPrimary(MultiHostConnectionStrategy.TargetSer
181232        return  Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
182233    }
183234
235+     private  Mono <Boolean > isConnectedToPrimary (MultiHostConnectionStrategy .TargetServerType  targetServerType , PostgreSQLContainer <?> primaryServer , DisposableServer  failingServer ) {
236+         PostgresqlConnectionFactory  connectionFactory  = this .multiHostConnectionFactoryWithFailingServer (targetServerType , primaryServer , failingServer );
237+ 
238+         return  Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
239+     }
240+ 
184241    private  Mono <Boolean > isPrimary (PostgresqlConnection  connection ) {
185242        return  connection .createStatement ("SHOW TRANSACTION_READ_ONLY" )
186243            .execute ()
@@ -203,4 +260,25 @@ private PostgresqlConnectionFactory multiHostConnectionFactory(MultiHostConnecti
203260        return  new  PostgresqlConnectionFactory (configuration );
204261    }
205262
263+     private  PostgresqlConnectionFactory  multiHostConnectionFactoryWithFailingServer (MultiHostConnectionStrategy .TargetServerType  targetServerType , PostgreSQLContainer <?> primaryServer ,
264+                                                                                     DisposableServer  failingServer ) {
265+         PostgresqlConnectionConfiguration .Builder  builder  = PostgresqlConnectionConfiguration .builder ();
266+         builder .addHost (primaryServer .getHost (), primaryServer .getMappedPort (5432 ));
267+         builder .addHost (failingServer .host (), failingServer .port ());
268+ 
269+         PostgresqlConnectionConfiguration  configuration  = builder 
270+             .targetServerType (targetServerType )
271+             .username (primaryServer .getUsername ())
272+             .password (primaryServer .getPassword ())
273+             .build ();
274+         return  new  PostgresqlConnectionFactory (configuration );
275+     }
276+ 
277+     // Simulate server downtime, where connections are accepted and then closed immediately 
278+     static  DisposableServer  newServer () {
279+         return  TcpServer .create ()
280+             .doOnConnection (DisposableChannel ::dispose )
281+             .bindNow ();
282+     }
283+ 
206284}
0 commit comments