|
21 | 21 | import io.netty.util.ReferenceCountUtil; |
22 | 22 | import io.r2dbc.postgresql.PostgresqlConnectionConfiguration; |
23 | 23 | import io.r2dbc.postgresql.PostgresqlConnectionFactory; |
| 24 | +import io.r2dbc.postgresql.api.ErrorDetails; |
24 | 25 | import io.r2dbc.postgresql.api.PostgresqlConnection; |
| 26 | +import io.r2dbc.postgresql.api.PostgresqlException; |
25 | 27 | import io.r2dbc.postgresql.authentication.PasswordAuthenticationHandler; |
26 | 28 | import io.r2dbc.postgresql.message.Format; |
27 | 29 | import io.r2dbc.postgresql.message.backend.BackendMessage; |
|
37 | 39 | import io.r2dbc.postgresql.message.frontend.FrontendMessage; |
38 | 40 | import io.r2dbc.postgresql.message.frontend.Query; |
39 | 41 | import io.r2dbc.postgresql.message.frontend.Sync; |
| 42 | +import io.r2dbc.postgresql.util.PgBouncer; |
40 | 43 | import io.r2dbc.postgresql.util.PostgresqlServerExtension; |
| 44 | +import io.r2dbc.spi.R2dbcBadGrammarException; |
41 | 45 | import io.r2dbc.spi.R2dbcNonTransientResourceException; |
42 | 46 | import io.r2dbc.spi.R2dbcPermissionDeniedException; |
43 | 47 | import org.junit.jupiter.api.AfterEach; |
@@ -394,29 +398,77 @@ public boolean verify(String s, SSLSession sslSession) { |
394 | 398 |
|
395 | 399 | @Nested |
396 | 400 | @TestInstance(TestInstance.Lifecycle.PER_CLASS) |
397 | | - final class StatementCacheSizeTests { |
| 401 | + final class PgBouncerTests { |
398 | 402 |
|
399 | 403 | @ParameterizedTest |
400 | | - @ValueSource(ints = {0, 2, -1}) |
401 | | - void multiplePreparedStatementsTest(int statementCacheSize) { |
402 | | - PostgresqlConnectionFactory connectionFactory = this.createConnectionFactory(statementCacheSize); |
| 404 | + @ValueSource(strings = {"transaction", "statement"}) |
| 405 | + void disabledCacheWorksWithTransactionAndStatementModes(String poolMode) { |
| 406 | + try (PgBouncer pgBouncer = new PgBouncer(SERVER, poolMode)) { |
| 407 | + PostgresqlConnectionFactory connectionFactory = this.createConnectionFactory(pgBouncer, 0); |
| 408 | + |
| 409 | + connectionFactory.create().flatMapMany(connection -> { |
| 410 | + Flux<Integer> q1 = connection.createStatement("SELECT 1 WHERE $1 = 1").bind(0, 1).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 411 | + Flux<Integer> q2 = connection.createStatement("SELECT 2 WHERE $1 = 2").bind(0, 2).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 412 | + Flux<Integer> q3 = connection.createStatement("SELECT 3 WHERE $1 = 3").bind(0, 3).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 413 | + |
| 414 | + return Flux.concat(q1, q1, q2, q2, q3, q3, connection.close()); |
| 415 | + }) |
| 416 | + .as(StepVerifier::create) |
| 417 | + .expectNext(1, 1, 2, 2, 3, 3) |
| 418 | + .verifyComplete(); |
| 419 | + } |
| 420 | + } |
403 | 421 |
|
404 | | - connectionFactory.create().flatMapMany(connection -> { |
405 | | - Flux<Integer> firstQuery = connection.createStatement("SELECT 1 WHERE $1 = 1").bind(0, 1).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
406 | | - Flux<Integer> secondQuery = connection.createStatement("SELECT 2 WHERE $1 = 2").bind(0, 2).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
407 | | - Flux<Integer> thirdQuery = connection.createStatement("SELECT 3 WHERE $1 = 3").bind(0, 3).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 422 | + @ParameterizedTest |
| 423 | + @ValueSource(ints = {-1, 0, 2}) |
| 424 | + void sessionModeWorksWithAllCaches(int statementCacheSize) { |
| 425 | + try (PgBouncer pgBouncer = new PgBouncer(SERVER, "session")) { |
| 426 | + PostgresqlConnectionFactory connectionFactory = this.createConnectionFactory(pgBouncer, statementCacheSize); |
| 427 | + |
| 428 | + connectionFactory.create().flatMapMany(connection -> { |
| 429 | + Flux<Integer> q1 = connection.createStatement("SELECT 1 WHERE $1 = 1").bind(0, 1).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 430 | + Flux<Integer> q2 = connection.createStatement("SELECT 2 WHERE $1 = 2").bind(0, 2).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 431 | + Flux<Integer> q3 = connection.createStatement("SELECT 3 WHERE $1 = 3").bind(0, 3).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 432 | + |
| 433 | + return Flux.concat(q1, q1, q2, q2, q3, q3, connection.close()); |
| 434 | + }) |
| 435 | + .as(StepVerifier::create) |
| 436 | + .expectNext(1, 1, 2, 2, 3, 3) |
| 437 | + .verifyComplete(); |
| 438 | + } |
| 439 | + } |
408 | 440 |
|
409 | | - return Flux.concat(firstQuery, secondQuery, thirdQuery, connection.close()); |
410 | | - }) |
411 | | - .as(StepVerifier::create) |
412 | | - .expectNext(1, 2, 3) |
413 | | - .verifyComplete(); |
| 441 | + @ParameterizedTest |
| 442 | + @ValueSource(strings = {"transaction", "statement"}) |
| 443 | + void statementCacheDoesntWorkWithTransactionAndStatementModes(String poolMode) { |
| 444 | + try (PgBouncer pgBouncer = new PgBouncer(SERVER, poolMode)) { |
| 445 | + PostgresqlConnectionFactory connectionFactory = this.createConnectionFactory(pgBouncer, -1); |
| 446 | + |
| 447 | + connectionFactory.create().flatMapMany(connection -> { |
| 448 | + Flux<Integer> q1 = connection.createStatement("SELECT 1 WHERE $1 = 1").bind(0, 1).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 449 | + |
| 450 | + return Flux.concat(q1, q1, connection.close()); |
| 451 | + }) |
| 452 | + .as(StepVerifier::create) |
| 453 | + .expectNext(1) |
| 454 | + .verifyErrorMatches(e -> { |
| 455 | + if (!(e instanceof R2dbcBadGrammarException)) { |
| 456 | + return false; |
| 457 | + } |
| 458 | + if (!(e instanceof PostgresqlException)) { |
| 459 | + return false; |
| 460 | + } |
| 461 | + PostgresqlException pgException = (PostgresqlException) e; |
| 462 | + ErrorDetails errorDetails = pgException.getErrorDetails(); |
| 463 | + return errorDetails.getCode().equals("26000") && errorDetails.getMessage().equals("prepared statement \"S_0\" does not exist"); |
| 464 | + }); |
| 465 | + } |
414 | 466 | } |
415 | 467 |
|
416 | | - private PostgresqlConnectionFactory createConnectionFactory(int statementCacheSize) { |
| 468 | + private PostgresqlConnectionFactory createConnectionFactory(PgBouncer pgBouncer, int statementCacheSize) { |
417 | 469 | return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder() |
418 | | - .host(SERVER.getHost()) |
419 | | - .port(SERVER.getPort()) |
| 470 | + .host(pgBouncer.getHost()) |
| 471 | + .port(pgBouncer.getPort()) |
420 | 472 | .username(SERVER.getUsername()) |
421 | 473 | .password(SERVER.getPassword()) |
422 | 474 | .database(SERVER.getDatabase()) |
|
0 commit comments