Skip to content

Commit 934f042

Browse files
harry-haomp911de
authored andcommitted
Add Option<LoopResources> tcpLoopResources
Add that to configure the `LoopResources` for tcp sockets. Unix domain socket still use `SocketLoopResources`. [resolves #319]
1 parent f545764 commit 934f042

File tree

6 files changed

+102
-13
lines changed

6 files changed

+102
-13
lines changed

README.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,13 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
6767

6868
| Option | Description
6969
| ----------------- | -----------
70-
| `ssl` | Enables SSL usage (`SSLMode.VERIFY_FULL`)
70+
| `ssl` | Enables SSL usage (`SSLMode.VERIFY_FULL`).
7171
| `driver` | Must be `postgresql`.
72-
| `host` | Server hostname to connect to
72+
| `host` | Server hostname to connect to.
7373
| `port` | Server port to connect to. Defaults to `5432`. _(Optional)_
7474
| `socket` | Unix Domain Socket path to connect to as alternative to TCP. _(Optional)_
75-
| `username` | Login username
76-
| `password` | Login password _(Optional when using TLS Certificate authentication)_
75+
| `username` | Login username.
76+
| `password` | Login password. _(Optional when using TLS Certificate authentication)_
7777
| `database` | Database to select. _(Optional)_
7878
| `applicationName` | The name of the application connecting to the database. Defaults to `r2dbc-postgresql`. _(Optional)_
7979
| `autodetectExtensions` | Whether to auto-detect and register `Extension`s from the class path. Defaults to `true`. _(Optional)_
@@ -89,7 +89,8 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
8989
| `sslPassword` | Key password to decrypt SSL key. _(Optional)_
9090
| `sslHostnameVerifier` | `javax.net.ssl.HostnameVerifier` implementation. _(Optional)_
9191
| `tcpNoDelay` | Enabled/disable TCP NoDelay. Disabled by default. _(Optional)_
92-
| `tcpKeepAlive` | Enabled/disable TCP KeepAlive. Disabled by default _(Optional)_
92+
| `tcpKeepAlive` | Enabled/disable TCP KeepAlive. Disabled by default. _(Optional)_
93+
| `tcpLoopResources`| TCP LoopResources. _(Optional)_
9394

9495
**Programmatic Configuration**
9596

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionConfiguration.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.r2dbc.postgresql.extension.CodecRegistrar;
2626
import io.r2dbc.postgresql.extension.Extension;
2727
import io.r2dbc.postgresql.util.Assert;
28+
import reactor.netty.resources.LoopResources;
2829
import reactor.netty.tcp.SslProvider;
2930
import reactor.util.annotation.Nullable;
3031

@@ -89,10 +90,12 @@ public final class PostgresqlConnectionConfiguration {
8990

9091
private final boolean tcpNoDelay;
9192

93+
private final LoopResources tcpLoopResources;
94+
9295
private PostgresqlConnectionConfiguration(String applicationName, boolean autodetectExtensions, @Nullable Duration connectTimeout, @Nullable String database, List<Extension> extensions,
9396
ToIntFunction<String> fetchSize, boolean forceBinary, @Nullable String host, @Nullable Map<String, String> options, @Nullable CharSequence password,
9497
int port, int preparedStatementCacheQueries, @Nullable String schema, @Nullable String socket, boolean tcpKeepAlive, boolean tcpNoDelay,
95-
String username, SSLConfig sslConfig) {
98+
String username, SSLConfig sslConfig, LoopResources tcpLoopResources) {
9699
this.applicationName = Assert.requireNonNull(applicationName, "applicationName must not be null");
97100
this.autodetectExtensions = autodetectExtensions;
98101
this.connectTimeout = connectTimeout;
@@ -115,6 +118,7 @@ private PostgresqlConnectionConfiguration(String applicationName, boolean autode
115118
this.sslConfig = sslConfig;
116119
this.tcpKeepAlive = tcpKeepAlive;
117120
this.tcpNoDelay = tcpNoDelay;
121+
this.tcpLoopResources = tcpLoopResources;
118122
}
119123

120124
/**
@@ -244,10 +248,15 @@ boolean isTcpNoDelay() {
244248
boolean isUseSocket() {
245249
return getSocket() != null;
246250
}
251+
247252
SSLConfig getSslConfig() {
248253
return this.sslConfig;
249254
}
250255

256+
LoopResources getTcpLoopResources() {
257+
return this.tcpLoopResources;
258+
}
259+
251260
private static String obfuscate(int length) {
252261

253262
StringBuilder builder = new StringBuilder();
@@ -318,9 +327,12 @@ public static final class Builder {
318327

319328
private Function<SslContextBuilder, SslContextBuilder> sslContextBuilderCustomizer = Function.identity();
320329

321-
private boolean tcpKeepAlive;
330+
private boolean tcpKeepAlive = false;
322331

323-
private boolean tcpNoDelay;
332+
private boolean tcpNoDelay = false;
333+
334+
@Nullable
335+
private LoopResources tcpLoopResources = null;
324336

325337
@Nullable
326338
private String username;
@@ -373,7 +385,7 @@ public PostgresqlConnectionConfiguration build() {
373385
return new PostgresqlConnectionConfiguration(this.applicationName, this.autodetectExtensions, this.connectTimeout, this.database, this.extensions, this.fetchSize, this.forceBinary,
374386
this.host, this.options, this.password, this.port, this.preparedStatementCacheQueries, this.schema, this.socket, this.tcpKeepAlive, this.tcpNoDelay, this.username,
375387
this.createSslConfig()
376-
);
388+
, this.tcpLoopResources);
377389
}
378390

379391
/**
@@ -680,6 +692,18 @@ public Builder username(String username) {
680692
return this;
681693
}
682694

695+
/**
696+
* Configure TCP {@link LoopResources}.
697+
*
698+
* @param loopResources the {@link LoopResources}
699+
* @return this {@link Builder}
700+
* @since 1.0.0
701+
*/
702+
public Builder tcpLoopResources(LoopResources loopResources) {
703+
this.tcpLoopResources = Assert.requireNonNull(loopResources, "tcpLoopResources must not be null");
704+
return this;
705+
}
706+
683707
@Override
684708
public String toString() {
685709
return "Builder{" +

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactoryProvider.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.r2dbc.spi.ConnectionFactoryOptions;
2424
import io.r2dbc.spi.ConnectionFactoryProvider;
2525
import io.r2dbc.spi.Option;
26+
import reactor.netty.resources.LoopResources;
2627

2728
import javax.net.ssl.HostnameVerifier;
2829
import java.util.LinkedHashMap;
@@ -139,6 +140,13 @@ public final class PostgresqlConnectionFactoryProvider implements ConnectionFact
139140
*/
140141
public static final Option<Boolean> TCP_NODELAY = Option.valueOf("tcpNoDelay");
141142

143+
/**
144+
* TCP {@link LoopResources}.
145+
*
146+
* @since 1.0.0
147+
*/
148+
public static final Option<LoopResources> TCP_LOOP_RESOURCES = Option.valueOf("tcpLoopResources");
149+
142150
/**
143151
* Determine the number of queries that are cached in each connection.
144152
* The default is {@code -1}, meaning there's no limit. The value of {@code 0} disables the cache. Any other value specifies the cache size.
@@ -211,6 +219,7 @@ private static PostgresqlConnectionConfiguration.Builder fromConnectionFactoryOp
211219
});
212220
mapper.from(TCP_KEEPALIVE).map(OptionMapper::toBoolean).to(builder::tcpKeepAlive);
213221
mapper.from(TCP_NODELAY).map(OptionMapper::toBoolean).to(builder::tcpNoDelay);
222+
mapper.from(TCP_LOOP_RESOURCES).to(builder::tcpLoopResources);
214223
builder.username(options.getRequiredValue(USER));
215224

216225
return builder;

src/main/java/io/r2dbc/postgresql/client/ConnectionSettings.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.r2dbc.postgresql.client;
1818

19+
import reactor.netty.resources.LoopResources;
1920
import reactor.util.annotation.Nullable;
2021

2122
import java.time.Duration;
@@ -34,10 +35,14 @@ public final class ConnectionSettings {
3435

3536
private final boolean tcpNoDelay;
3637

37-
public ConnectionSettings(@Nullable Duration connectTimeout, boolean tcpKeepAlive, boolean tcpNoDelay) {
38+
@Nullable
39+
private final LoopResources tcpLoopResources;
40+
41+
public ConnectionSettings(@Nullable Duration connectTimeout, boolean tcpKeepAlive, boolean tcpNoDelay, @Nullable LoopResources tcpLoopResources) {
3842
this.tcpKeepAlive = tcpKeepAlive;
3943
this.tcpNoDelay = tcpNoDelay;
4044
this.connectTimeout = connectTimeout;
45+
this.tcpLoopResources = tcpLoopResources;
4146
}
4247

4348
@Nullable
@@ -53,4 +58,12 @@ boolean isTcpNoDelay() {
5358
return this.tcpNoDelay;
5459
}
5560

61+
public boolean hasTcpLoopResources() {
62+
return this.tcpLoopResources != null;
63+
}
64+
65+
LoopResources getTcpLoopResources() {
66+
return this.tcpLoopResources;
67+
}
68+
5669
}

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,9 +334,30 @@ public static Mono<ReactorNettyClient> connect(String host, int port) {
334334
* @param connectTimeout connect timeout
335335
* @param sslConfig SSL configuration
336336
* @throws IllegalArgumentException if {@code host} is {@code null}
337+
* @throws IllegalArgumentException if {@code sslConfig} is {@code null}
337338
*/
338339
public static Mono<ReactorNettyClient> connect(String host, int port, @Nullable Duration connectTimeout, SSLConfig sslConfig) {
339-
return connect(ConnectionProvider.newConnection(), InetSocketAddress.createUnresolved(host, port), new ConnectionSettings(connectTimeout, false, false), sslConfig);
340+
Assert.requireNonNull(host, "host must not be null");
341+
Assert.requireNonNull(sslConfig, "sslConfig must not be null");
342+
343+
return connect(host, port, connectTimeout, sslConfig, null);
344+
}
345+
346+
/**
347+
* Create a new frame processor connected to a given host.
348+
*
349+
* @param host the host to connect to
350+
* @param port the port to connect to
351+
* @param connectTimeout connect timeout
352+
* @param sslConfig SSL configuration
353+
* @param tcpLoopResources tcp loop resources
354+
* @throws IllegalArgumentException if {@code host} is {@code null}
355+
*/
356+
public static Mono<ReactorNettyClient> connect(String host, int port, @Nullable Duration connectTimeout, @Nullable SSLConfig sslConfig, @Nullable LoopResources tcpLoopResources) {
357+
Assert.requireNonNull(host, "host must not be null");
358+
359+
ConnectionSettings settings = new ConnectionSettings(connectTimeout, false, false, tcpLoopResources);
360+
return connect(ConnectionProvider.newConnection(), InetSocketAddress.createUnresolved(host, port), settings, sslConfig);
340361
}
341362

342363
/**
@@ -358,6 +379,9 @@ public static Mono<ReactorNettyClient> connect(ConnectionProvider connectionProv
358379
if (!(socketAddress instanceof InetSocketAddress)) {
359380
tcpClient = tcpClient.runOn(new SocketLoopResources(), true);
360381
} else {
382+
if (connectionSettings.hasTcpLoopResources()) {
383+
tcpClient = tcpClient.runOn(connectionSettings.getTcpLoopResources());
384+
}
361385
tcpClient = tcpClient.option(ChannelOption.SO_KEEPALIVE, connectionSettings.isTcpKeepAlive());
362386
tcpClient = tcpClient.option(ChannelOption.TCP_NODELAY, connectionSettings.isTcpNoDelay());
363387
}

src/test/java/io/r2dbc/postgresql/PostgresqlConnectionConfigurationUnitTests.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818

1919
import io.r2dbc.postgresql.client.SSLMode;
2020
import org.junit.jupiter.api.Test;
21+
import reactor.netty.resources.LoopResources;
2122

2223
import java.time.Duration;
2324
import java.util.HashMap;
2425
import java.util.Map;
2526

2627
import static org.assertj.core.api.Assertions.assertThat;
2728
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
29+
import static org.mockito.Mockito.mock;
2830

2931
/**
3032
* Unit tests for {@link PostgresqlConnectionConfiguration}.
@@ -61,11 +63,18 @@ void builderNegativeFetchSize() {
6163
.withMessage("fetch size must be greater or equal zero");
6264
}
6365

66+
@Test
67+
void builderNoTcpLoopResources() {
68+
assertThatIllegalArgumentException().isThrownBy(() -> PostgresqlConnectionConfiguration.builder().tcpLoopResources(null))
69+
.withMessage("tcpLoopResources must not be null");
70+
}
71+
6472
@Test
6573
void configuration() {
6674
Map<String, String> options = new HashMap<>();
6775
options.put("lock_timeout", "10s");
6876
options.put("statement_timeout", "60000"); // [ms]
77+
LoopResources loopResources = mock(LoopResources.class);
6978

7079
PostgresqlConnectionConfiguration configuration = PostgresqlConnectionConfiguration.builder()
7180
.applicationName("test-application-name")
@@ -77,6 +86,9 @@ void configuration() {
7786
.schema("test-schema")
7887
.username("test-username")
7988
.sslMode(SSLMode.ALLOW)
89+
.tcpKeepAlive(true)
90+
.tcpNoDelay(true)
91+
.tcpLoopResources(loopResources)
8092
.build();
8193

8294
assertThat(configuration)
@@ -88,7 +100,10 @@ void configuration() {
88100
.hasFieldOrPropertyWithValue("password", null)
89101
.hasFieldOrPropertyWithValue("port", 100)
90102
.hasFieldOrPropertyWithValue("username", "test-username")
91-
.hasFieldOrProperty("sslConfig");
103+
.hasFieldOrProperty("sslConfig")
104+
.hasFieldOrPropertyWithValue("tcpKeepAlive", true)
105+
.hasFieldOrPropertyWithValue("tcpNoDelay", true)
106+
.hasFieldOrPropertyWithValue("tcpLoopResources", loopResources);
92107

93108
assertThat(configuration.getOptions())
94109
.containsEntry("lock_timeout", "10s")
@@ -114,7 +129,10 @@ void configurationDefaults() {
114129
.hasFieldOrPropertyWithValue("port", 5432)
115130
.hasFieldOrProperty("options")
116131
.hasFieldOrPropertyWithValue("username", "test-username")
117-
.hasFieldOrProperty("sslConfig");
132+
.hasFieldOrProperty("sslConfig")
133+
.hasFieldOrPropertyWithValue("tcpKeepAlive", false)
134+
.hasFieldOrPropertyWithValue("tcpNoDelay", false)
135+
.hasFieldOrPropertyWithValue("tcpLoopResources", null);
118136

119137
assertThat(configuration.getOptions())
120138
.containsEntry("search_path", "test-schema");

0 commit comments

Comments
 (0)