|
10 | 10 | import reactor.netty.Connection; |
11 | 11 | import reactor.netty.channel.BootstrapHandlers; |
12 | 12 | import reactor.netty.resources.ConnectionProvider; |
13 | | -import reactor.netty.tcp.SslProvider; |
14 | 13 | import reactor.netty.tcp.TcpClient; |
15 | 14 |
|
16 | 15 | final class TcpSender implements Sender { |
@@ -42,18 +41,19 @@ public Mono<Void> send(Message message) { |
42 | 41 | } |
43 | 42 |
|
44 | 43 | private TcpClient newTcpClient(SenderContext context, Address address) { |
45 | | - return TcpClient.create(ConnectionProvider.newConnection()) |
46 | | - .runOn(context.loopResources()) |
47 | | - .host(address.host()) |
48 | | - .port(address.port()) |
49 | | - .option(ChannelOption.TCP_NODELAY, true) |
50 | | - .option(ChannelOption.SO_KEEPALIVE, true) |
51 | | - .option(ChannelOption.SO_REUSEADDR, true) |
52 | | - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectTimeout()) |
53 | | - .secure(config.isSecured() ? SslProvider.defaultClientProvider() : null) |
54 | | - .bootstrap( |
55 | | - b -> |
56 | | - BootstrapHandlers.updateConfiguration( |
57 | | - b, "outbound", new TcpChannelInitializer(config.maxFrameLength()))); |
| 44 | + TcpClient tcpClient = |
| 45 | + TcpClient.create(ConnectionProvider.newConnection()) |
| 46 | + .runOn(context.loopResources()) |
| 47 | + .host(address.host()) |
| 48 | + .port(address.port()) |
| 49 | + .option(ChannelOption.TCP_NODELAY, true) |
| 50 | + .option(ChannelOption.SO_KEEPALIVE, true) |
| 51 | + .option(ChannelOption.SO_REUSEADDR, true) |
| 52 | + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectTimeout()) |
| 53 | + .bootstrap( |
| 54 | + b -> |
| 55 | + BootstrapHandlers.updateConfiguration( |
| 56 | + b, "outbound", new TcpChannelInitializer(config.maxFrameLength()))); |
| 57 | + return config.isSecured() ? tcpClient.secure() : tcpClient; |
58 | 58 | } |
59 | 59 | } |
0 commit comments