2020import io .r2dbc .postgresql .PostgresqlConnectionConfiguration ;
2121import io .r2dbc .postgresql .PostgresqlConnectionFactory ;
2222import io .r2dbc .postgresql .api .PostgresqlConnection ;
23+ import io .r2dbc .postgresql .util .Disposable ;
2324import io .r2dbc .postgresql .util .PostgresqlHighAvailabilityClusterExtension ;
2425import io .r2dbc .spi .Connection ;
2526import io .r2dbc .spi .R2dbcException ;
2930import org .testcontainers .containers .PostgreSQLContainer ;
3031import reactor .core .publisher .Flux ;
3132import reactor .core .publisher .Mono ;
32- import reactor .netty .DisposableChannel ;
33- import reactor .netty .DisposableServer ;
34- import reactor .netty .tcp .TcpServer ;
3533import reactor .test .StepVerifier ;
3634
35+ import java .net .InetSocketAddress ;
36+ import java .util .function .Consumer ;
37+
3738import static org .assertj .core .api .Assertions .assertThat ;
3839
3940/**
@@ -52,7 +53,7 @@ void testPrimaryAndStandbyStartup() {
5253
5354 @ Test
5455 void testMultipleCallsOnSameFactory () {
55- PostgresqlConnectionFactory connectionFactory = this .multiHostConnectionFactory (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (), SERVERS .getStandby ());
56+ PostgresqlConnectionFactory connectionFactory = this .configure (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (), SERVERS .getStandby ());
5657
5758 Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close )
5859 .as (StepVerifier ::create )
@@ -124,21 +125,16 @@ void testTargetPreferSecondaryConnectedToStandby() {
124125 }
125126
126127 @ Test
127- void testTargetPreferSecondaryConnectedToMasterOnStandbyFailure () {
128- DisposableServer failingServer = newServer ();
129- try {
130- isConnectedToPrimary (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (), failingServer )
131- .as (StepVerifier ::create )
132- .expectNext (true )
133- .verifyComplete ();
134- } finally {
135- failingServer .dispose ();
136- }
128+ void testTargetPreferSecondaryConnectedToMasterOnStandbyFailure (@ Disposable InetSocketAddress faulty ) {
129+ isConnectedToPrimary (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (), faulty )
130+ .as (StepVerifier ::create )
131+ .expectNext (true )
132+ .verifyComplete ();
137133 }
138134
139135 @ Test
140136 void testMultipleCallsWithTargetPreferSecondaryConnectedToStandby () {
141- PostgresqlConnectionFactory connectionFactory = this .multiHostConnectionFactory (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (), SERVERS .getStandby ());
137+ PostgresqlConnectionFactory connectionFactory = this .configure (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (), SERVERS .getStandby ());
142138
143139 Mono <Boolean > allocator = Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
144140 Flux <Boolean > connectionPool = Flux .merge (allocator , allocator );
@@ -151,23 +147,28 @@ void testMultipleCallsWithTargetPreferSecondaryConnectedToStandby() {
151147 }
152148
153149 @ Test
154- void testMultipleCallsWithTargetPreferSecondaryConnectedToMasterOnStandbyFailure () {
155- DisposableServer failingServer = newServer ();
156- try {
157- PostgresqlConnectionFactory connectionFactory = this .multiHostConnectionFactoryWithFailingServer (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (),
158- failingServer );
159-
160- Mono <Boolean > allocator = Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
161- Flux <Boolean > connectionPool = Flux .merge (allocator , allocator );
162-
163- connectionPool
164- .as (StepVerifier ::create )
165- .expectNext (true )
166- .expectNext (true )
167- .verifyComplete ();
168- } finally {
169- failingServer .dispose ();
170- }
150+ void testAllFaulty (@ Disposable InetSocketAddress faulty1 , @ Disposable InetSocketAddress faulty2 ) {
151+ PostgresqlConnectionFactory connectionFactory = this .configure (MultiHostConnectionStrategy .TargetServerType .SECONDARY , SERVERS .getPrimary (),
152+ faulty1 , faulty2 );
153+
154+ connectionFactory .create ()
155+ .as (StepVerifier ::create )
156+ .expectError (R2dbcNonTransientResourceException .class );
157+ }
158+
159+ @ Test
160+ void testMultipleCallsWithTargetPreferSecondaryConnectedToMasterOnStandbyFailure (@ Disposable InetSocketAddress faulty ) {
161+ PostgresqlConnectionFactory connectionFactory = this .configure (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (),
162+ faulty );
163+
164+ Mono <Boolean > allocator = Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
165+ Flux <Boolean > connectionPool = Flux .merge (allocator , allocator );
166+
167+ connectionPool
168+ .as (StepVerifier ::create )
169+ .expectNext (true )
170+ .expectNext (true )
171+ .verifyComplete ();
171172 }
172173
173174 @ Test
@@ -227,13 +228,13 @@ void testTargetSecondaryFailedOnPrimary() {
227228 }
228229
229230 private Mono <Boolean > isConnectedToPrimary (MultiHostConnectionStrategy .TargetServerType targetServerType , PostgreSQLContainer <?>... servers ) {
230- PostgresqlConnectionFactory connectionFactory = this .multiHostConnectionFactory (targetServerType , servers );
231+ PostgresqlConnectionFactory connectionFactory = this .configure (targetServerType , servers );
231232
232233 return Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
233234 }
234235
235- private Mono <Boolean > isConnectedToPrimary (MultiHostConnectionStrategy .TargetServerType targetServerType , PostgreSQLContainer <?> primaryServer , DisposableServer failingServer ) {
236- PostgresqlConnectionFactory connectionFactory = this .multiHostConnectionFactoryWithFailingServer (targetServerType , primaryServer , failingServer );
236+ private Mono <Boolean > isConnectedToPrimary (MultiHostConnectionStrategy .TargetServerType targetServerType , PostgreSQLContainer <?> primaryServer , InetSocketAddress failingServer ) {
237+ PostgresqlConnectionFactory connectionFactory = this .configure (targetServerType , primaryServer , failingServer );
237238
238239 return Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
239240 }
@@ -246,25 +247,36 @@ private Mono<Boolean> isPrimary(PostgresqlConnection connection) {
246247 .next ();
247248 }
248249
249- private PostgresqlConnectionFactory multiHostConnectionFactory (MultiHostConnectionStrategy .TargetServerType targetServerType , PostgreSQLContainer <?>... servers ) {
250- PostgreSQLContainer <?> firstServer = servers [0 ];
251- PostgresqlConnectionConfiguration .Builder builder = PostgresqlConnectionConfiguration .builder ();
252- for (PostgreSQLContainer <?> server : servers ) {
253- builder .addHost (server .getHost (), server .getMappedPort (5432 ));
254- }
255- PostgresqlConnectionConfiguration configuration = builder
256- .targetServerType (targetServerType )
257- .username (firstServer .getUsername ())
258- .password (firstServer .getPassword ())
259- .build ();
260- return new PostgresqlConnectionFactory (configuration );
250+ private PostgresqlConnectionFactory configure (MultiHostConnectionStrategy .TargetServerType targetServerType , PostgreSQLContainer <?>... servers ) {
251+ return configure (targetServerType , servers [0 ], builder -> {
252+
253+
254+ for (PostgreSQLContainer <?> server : servers ) {
255+
256+ if (server == servers [0 ]) {
257+ continue ;
258+ }
259+ builder .addHost (server .getHost (), server .getMappedPort (5432 ));
260+ }
261+ });
262+ }
263+
264+ private PostgresqlConnectionFactory configure (MultiHostConnectionStrategy .TargetServerType targetServerType , PostgreSQLContainer <?> primaryServer ,
265+ InetSocketAddress ... addresses ) {
266+
267+ return configure (targetServerType , primaryServer , builder -> {
268+
269+ for (InetSocketAddress address : addresses ) {
270+ builder .addHost (address .getHostName (), address .getPort ());
271+ }
272+ });
261273 }
262274
263- private PostgresqlConnectionFactory multiHostConnectionFactoryWithFailingServer (MultiHostConnectionStrategy .TargetServerType targetServerType , PostgreSQLContainer <?> primaryServer ,
264- DisposableServer failingServer ) {
275+ private PostgresqlConnectionFactory configure (MultiHostConnectionStrategy .TargetServerType targetServerType , PostgreSQLContainer <?> primaryServer ,
276+ Consumer < PostgresqlConnectionConfiguration . Builder > builderCustomizer ) {
265277 PostgresqlConnectionConfiguration .Builder builder = PostgresqlConnectionConfiguration .builder ();
266278 builder .addHost (primaryServer .getHost (), primaryServer .getMappedPort (5432 ));
267- builder . addHost ( failingServer . host (), failingServer . port () );
279+ builderCustomizer . accept ( builder );
268280
269281 PostgresqlConnectionConfiguration configuration = builder
270282 .targetServerType (targetServerType )
@@ -274,11 +286,4 @@ private PostgresqlConnectionFactory multiHostConnectionFactoryWithFailingServer(
274286 return new PostgresqlConnectionFactory (configuration );
275287 }
276288
277- // Simulate server downtime, where connections are accepted and then closed immediately
278- static DisposableServer newServer () {
279- return TcpServer .create ()
280- .doOnConnection (DisposableChannel ::dispose )
281- .bindNow ();
282- }
283-
284289}
0 commit comments