11package io .netifi .proteus ;
22
3- import io .micrometer .core .instrument .MeterRegistry ;
43import io .netifi .proteus .rsocket .ProteusSocket ;
54import io .rsocket .rpc .rsocket .RequestHandlingRSocket ;
65import io .rsocket .rpc .RSocketRpcService ;
76import io .netty .buffer .ByteBuf ;
87import io .netty .buffer .Unpooled ;
8+ import io .netty .handler .ssl .OpenSsl ;
9+ import io .netty .handler .ssl .SslContext ;
10+ import io .netty .handler .ssl .SslContextBuilder ;
11+ import io .netty .handler .ssl .SslProvider ;
12+ import io .netty .handler .ssl .util .InsecureTrustManagerFactory ;
913import io .opentracing .Tracer ;
1014import io .rsocket .Closeable ;
1115import io .rsocket .transport .ClientTransport ;
1216import io .rsocket .transport .netty .client .TcpClientTransport ;
1317import org .slf4j .Logger ;
1418import org .slf4j .LoggerFactory ;
19+ import reactor .core .Exceptions ;
1520import reactor .core .publisher .Mono ;
1621import reactor .core .publisher .MonoProcessor ;
22+ import reactor .ipc .netty .tcp .TcpClient ;
1723
1824import java .net .InetSocketAddress ;
1925import java .net .SocketAddress ;
@@ -135,10 +141,7 @@ public static class Builder {
135141 private int missedAcks = DefaultBuilderConfig .getMissedAcks ();
136142 private DestinationNameFactory destinationNameFactory ;
137143
138- private MeterRegistry registry = null ;
139- private int batchSize = DefaultBuilderConfig .getBatchSize ();
140- private Function <SocketAddress , ClientTransport > clientTransportFactory =
141- address -> TcpClientTransport .create ((InetSocketAddress ) address );
144+ private Function <SocketAddress , ClientTransport > clientTransportFactory = null ;
142145 private int poolSize = Runtime .getRuntime ().availableProcessors ();
143146 private Supplier <Tracer > tracerSupplier = () -> null ;
144147
@@ -153,13 +156,8 @@ public Builder poolSize(int poolSize) {
153156 return this ;
154157 }
155158
156- public Builder metricBatchSize (int batchSize ) {
157- this .batchSize = batchSize ;
158- return this ;
159- }
160-
161159 public Builder keepalive (boolean useKeepAlive ) {
162- this .keepalive = keepalive ;
160+ this .keepalive = useKeepAlive ;
163161 return this ;
164162 }
165163
@@ -197,9 +195,7 @@ public Builder seedAddresses(Collection<SocketAddress> addresses) {
197195 if (addresses instanceof List ) {
198196 this .seedAddresses = (List <SocketAddress >) addresses ;
199197 } else {
200- List <SocketAddress > list = new ArrayList <>();
201- list .addAll (addresses );
202- this .seedAddresses = list ;
198+ this .seedAddresses = new ArrayList <>(addresses );
203199 }
204200
205201 return this ;
@@ -246,6 +242,35 @@ public Proteus build() {
246242 Objects .requireNonNull (accessToken , "account token is required" );
247243 Objects .requireNonNull (group , "group is required" );
248244
245+ if (clientTransportFactory == null ) {
246+ logger .info ("Client transport factory not provided; using TCP transport." );
247+ try {
248+ final SslProvider sslProvider ;
249+ if (OpenSsl .isAvailable ()) {
250+ logger .info ("Native SSL provider is available; will use native provider." );
251+ sslProvider = SslProvider .OPENSSL_REFCNT ;
252+ } else {
253+ logger .info ("Native SSL provider not available; will use JDK SSL provider." );
254+ sslProvider = SslProvider .JDK ;
255+ }
256+ final SslContext sslContext =
257+ SslContextBuilder .forClient ()
258+ .trustManager (InsecureTrustManagerFactory .INSTANCE )
259+ .sslProvider (sslProvider )
260+ .build ();
261+ clientTransportFactory = address -> {
262+ TcpClient client =
263+ TcpClient .create (
264+ opts ->
265+ opts .connectAddress (() -> address )
266+ .sslContext (sslContext ));
267+ return TcpClientTransport .create (client );
268+ };
269+ } catch (Exception sslException ) {
270+ throw Exceptions .bubble (sslException );
271+ }
272+ }
273+
249274 this .accessTokenBytes = Base64 .getDecoder ().decode (accessToken );
250275
251276 if (destinationNameFactory == null ) {
@@ -260,7 +285,7 @@ public Proteus build() {
260285 if (seedAddresses == null ) {
261286 Objects .requireNonNull (host , "host is required" );
262287 Objects .requireNonNull (port , "port is required" );
263- socketAddresses = Arrays . asList (InetSocketAddress .createUnresolved (host , port ));
288+ socketAddresses = Collections . singletonList (InetSocketAddress .createUnresolved (host , port ));
264289 } else {
265290 socketAddresses = seedAddresses ;
266291 }
0 commit comments