Skip to content

Commit 2ca23e4

Browse files
committed
Added RSocketClientTransportFactory and RSocketClientTransportFactory
1 parent 4c4e658 commit 2ca23e4

File tree

6 files changed

+109
-71
lines changed

6 files changed

+109
-71
lines changed

services-examples-parent/services-examples-runner/src/main/java/io/scalecube/services/examples/ExamplesRunner.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.scalecube.services.examples;
22

3+
import io.rsocket.transport.netty.client.TcpClientTransport;
4+
import io.rsocket.transport.netty.server.TcpServerTransport;
35
import io.scalecube.config.ConfigRegistry;
46
import io.scalecube.config.ConfigRegistrySettings;
57
import io.scalecube.config.audit.Slf4JConfigEventListener;
@@ -58,20 +60,26 @@ public static void main(String[] args) {
5860
.transport(
5961
() ->
6062
new RSocketServiceTransport()
61-
.tcpClient(
63+
.clientTransportFactory(
6264
loopResources ->
63-
TcpClient.newConnection()
64-
.runOn(loopResources)
65-
.wiretap(false)
66-
.noProxy()
67-
.noSSL())
68-
.tcpServer(
65+
address ->
66+
TcpClientTransport.create(
67+
TcpClient.newConnection()
68+
.host(address.host())
69+
.port(address.port())
70+
.runOn(loopResources)
71+
.wiretap(false)
72+
.noProxy()
73+
.noSSL()))
74+
.serverTransportFactory(
6975
loopResources ->
70-
TcpServer.create()
71-
.wiretap(false)
72-
.port(config.servicePort())
73-
.runOn(loopResources)
74-
.noSSL()))
76+
() ->
77+
TcpServerTransport.create(
78+
TcpServer.create()
79+
.wiretap(false)
80+
.port(config.servicePort())
81+
.runOn(loopResources)
82+
.noSSL())))
7583
.services(new BenchmarkServiceImpl(), new GreetingServiceImpl())
7684
.startAwait();
7785

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransport.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import io.rsocket.RSocket;
44
import io.rsocket.core.RSocketConnector;
55
import io.rsocket.frame.decoder.PayloadDecoder;
6-
import io.rsocket.transport.netty.client.TcpClientTransport;
76
import io.scalecube.net.Address;
87
import io.scalecube.services.transport.api.ClientChannel;
98
import io.scalecube.services.transport.api.ClientTransport;
@@ -14,7 +13,6 @@
1413
import org.slf4j.Logger;
1514
import org.slf4j.LoggerFactory;
1615
import reactor.core.publisher.Mono;
17-
import reactor.netty.tcp.TcpClient;
1816

1917
public class RSocketClientTransport implements ClientTransport {
2018

@@ -24,17 +22,18 @@ public class RSocketClientTransport implements ClientTransport {
2422
ThreadLocal.withInitial(ConcurrentHashMap::new);
2523

2624
private final ServiceMessageCodec messageCodec;
27-
private final TcpClient tcpClient;
25+
private final RSocketClientTransportFactory clientTransportFactory;
2826

2927
/**
3028
* Constructor for this transport.
3129
*
32-
* @param messageCodec message codec
33-
* @param tcpClient tcp client
30+
* @param messageCodec messageCodec
31+
* @param clientTransportFactory clientTransportFactory
3432
*/
35-
public RSocketClientTransport(ServiceMessageCodec messageCodec, TcpClient tcpClient) {
33+
public RSocketClientTransport(
34+
ServiceMessageCodec messageCodec, RSocketClientTransportFactory clientTransportFactory) {
3635
this.messageCodec = messageCodec;
37-
this.tcpClient = tcpClient;
36+
this.clientTransportFactory = clientTransportFactory;
3837
}
3938

4039
@Override
@@ -46,10 +45,9 @@ public ClientChannel create(Address address) {
4645
}
4746

4847
private Mono<RSocket> connect(Address address, Map<Address, Mono<RSocket>> monoMap) {
49-
TcpClient tcpClient = this.tcpClient.host(address.host()).port(address.port());
5048
return RSocketConnector.create()
5149
.payloadDecoder(PayloadDecoder.DEFAULT)
52-
.connect(() -> TcpClientTransport.create(tcpClient))
50+
.connect(() -> clientTransportFactory.clientTransport(address))
5351
.doOnSuccess(
5452
rsocket -> {
5553
LOGGER.debug("[rsocket][client] Connected successfully on {}", address);
@@ -80,7 +78,6 @@ private Mono<RSocket> connect(Address address, Map<Address, Mono<RSocket>> monoM
8078
public String toString() {
8179
return new StringJoiner(", ", RSocketClientTransport.class.getSimpleName() + "[", "]")
8280
.add("messageCodec=" + messageCodec)
83-
.add("tcpClient=" + tcpClient)
8481
.toString();
8582
}
8683
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.scalecube.services.transport.rsocket;
2+
3+
import io.rsocket.transport.ClientTransport;
4+
import io.scalecube.net.Address;
5+
6+
public interface RSocketClientTransportFactory {
7+
8+
ClientTransport clientTransport(Address address);
9+
}

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServerTransport.java

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import io.rsocket.core.RSocketServer;
44
import io.rsocket.frame.decoder.PayloadDecoder;
55
import io.rsocket.transport.netty.server.CloseableChannel;
6-
import io.rsocket.transport.netty.server.TcpServerTransport;
76
import io.scalecube.net.Address;
87
import io.scalecube.services.methods.ServiceMethodRegistry;
98
import io.scalecube.services.transport.api.ServerTransport;
@@ -13,27 +12,26 @@
1312
import org.slf4j.Logger;
1413
import org.slf4j.LoggerFactory;
1514
import reactor.core.publisher.Mono;
16-
import reactor.netty.tcp.TcpServer;
1715

18-
/** RSocket server transport implementation. */
1916
public class RSocketServerTransport implements ServerTransport {
2017

2118
private static final Logger LOGGER = LoggerFactory.getLogger(RSocketServerTransport.class);
2219

2320
private final ServiceMessageCodec messageCodec;
24-
private final TcpServer tcpServer;
21+
private final RSocketServerTransportFactory serverTransportFactory;
2522

2623
private CloseableChannel serverChannel; // calculated
2724

2825
/**
2926
* Constructor for this server transport.
3027
*
31-
* @param messageCodec message codec
32-
* @param tcpServer tcp server
28+
* @param messageCodec messageCodec
29+
* @param serverTransportFactory serverTransportFactory
3330
*/
34-
public RSocketServerTransport(ServiceMessageCodec messageCodec, TcpServer tcpServer) {
31+
public RSocketServerTransport(
32+
ServiceMessageCodec messageCodec, RSocketServerTransportFactory serverTransportFactory) {
3533
this.messageCodec = messageCodec;
36-
this.tcpServer = tcpServer;
34+
this.serverTransportFactory = serverTransportFactory;
3735
}
3836

3937
@Override
@@ -45,24 +43,13 @@ public Address address() {
4543
@Override
4644
public Mono<ServerTransport> bind(ServiceMethodRegistry methodRegistry) {
4745
return Mono.defer(
48-
() -> {
49-
TcpServer tcpServer =
50-
this.tcpServer.doOnConnection(
51-
connection -> {
52-
LOGGER.debug(
53-
"[rsocket][server] Accepted connection on {}", connection.channel());
54-
connection.onDispose(
55-
() ->
56-
LOGGER.debug(
57-
"[rsocket][server] Connection closed on {}", connection.channel()));
58-
});
59-
return RSocketServer.create()
60-
.acceptor(new RSocketServiceAcceptor(messageCodec, methodRegistry))
61-
.payloadDecoder(PayloadDecoder.DEFAULT)
62-
.bind(TcpServerTransport.create(tcpServer))
63-
.doOnSuccess(channel -> serverChannel = channel)
64-
.thenReturn(this);
65-
});
46+
() ->
47+
RSocketServer.create()
48+
.acceptor(new RSocketServiceAcceptor(messageCodec, methodRegistry))
49+
.payloadDecoder(PayloadDecoder.DEFAULT)
50+
.bind(serverTransportFactory.serverTransport())
51+
.doOnSuccess(channel -> serverChannel = channel)
52+
.thenReturn(this));
6653
}
6754

6855
@Override
@@ -88,7 +75,6 @@ public Mono<Void> stop() {
8875
public String toString() {
8976
return new StringJoiner(", ", RSocketServerTransport.class.getSimpleName() + "[", "]")
9077
.add("messageCodec=" + messageCodec)
91-
.add("tcpServer=" + tcpServer)
9278
.add("serverChannel=" + serverChannel)
9379
.toString();
9480
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.scalecube.services.transport.rsocket;
2+
3+
import io.rsocket.transport.ServerTransport;
4+
import io.rsocket.transport.netty.server.CloseableChannel;
5+
6+
public interface RSocketServerTransportFactory {
7+
8+
ServerTransport<CloseableChannel> serverTransport();
9+
}

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import io.netty.channel.nio.NioEventLoopGroup;
77
import io.netty.util.concurrent.DefaultThreadFactory;
88
import io.netty.util.concurrent.Future;
9+
import io.rsocket.transport.netty.client.TcpClientTransport;
10+
import io.rsocket.transport.netty.server.TcpServerTransport;
911
import io.scalecube.services.transport.api.ClientTransport;
1012
import io.scalecube.services.transport.api.DataCodec;
1113
import io.scalecube.services.transport.api.HeadersCodec;
@@ -16,21 +18,26 @@
1618
import java.util.Collection;
1719
import java.util.concurrent.ThreadFactory;
1820
import java.util.function.Function;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
1923
import reactor.core.publisher.Flux;
2024
import reactor.core.publisher.Mono;
2125
import reactor.netty.FutureMono;
2226
import reactor.netty.resources.LoopResources;
2327
import reactor.netty.tcp.TcpClient;
2428
import reactor.netty.tcp.TcpServer;
2529

26-
/** RSocket service transport. */
2730
public class RSocketServiceTransport implements ServiceTransport {
2831

32+
private static final Logger LOGGER = LoggerFactory.getLogger(RSocketServiceTransport.class);
33+
2934
private int numOfWorkers = Runtime.getRuntime().availableProcessors();
3035
private HeadersCodec headersCodec;
3136
private Collection<DataCodec> dataCodecs;
32-
private Function<LoopResources, TcpServer> tcpServerProvider = defaultTcpServerProvider();
33-
private Function<LoopResources, TcpClient> tcpClientProvider = defaultTcpClientProvider();
37+
private Function<LoopResources, RSocketServerTransportFactory> serverTransportFactory =
38+
defaultServerTransportFactory();
39+
private Function<LoopResources, RSocketClientTransportFactory> clientTransportFactory =
40+
defaultClientTransportFactory();
3441

3542
// resources
3643
private EventLoopGroup eventLoopGroup;
@@ -52,8 +59,8 @@ private RSocketServiceTransport(RSocketServiceTransport other) {
5259
this.eventLoopGroup = other.eventLoopGroup;
5360
this.clientLoopResources = other.clientLoopResources;
5461
this.serverLoopResources = other.serverLoopResources;
55-
this.tcpServerProvider = other.tcpServerProvider;
56-
this.tcpClientProvider = other.tcpClientProvider;
62+
this.serverTransportFactory = other.serverTransportFactory;
63+
this.clientTransportFactory = other.clientTransportFactory;
5764
}
5865

5966
/**
@@ -93,26 +100,28 @@ public RSocketServiceTransport dataCodecs(Collection<DataCodec> dataCodecs) {
93100
}
94101

95102
/**
96-
* Setter for {@code tcpServerProvider}.
103+
* Setter for {@code serverTransportFactory}.
97104
*
98-
* @param factory {@code TcpServer} provider function
105+
* @param serverTransportFactory serverTransportFactory
99106
* @return new {@code RSocketServiceTransport} instance
100107
*/
101-
public RSocketServiceTransport tcpServer(Function<LoopResources, TcpServer> factory) {
108+
public RSocketServiceTransport serverTransportFactory(
109+
Function<LoopResources, RSocketServerTransportFactory> serverTransportFactory) {
102110
RSocketServiceTransport rst = new RSocketServiceTransport(this);
103-
rst.tcpServerProvider = factory;
111+
rst.serverTransportFactory = serverTransportFactory;
104112
return rst;
105113
}
106114

107115
/**
108-
* Setter for {@code tcpClientProvider}.
116+
* Setter for {@code clientTransportFactory}.
109117
*
110-
* @param factory {@code TcpClient} provider function
118+
* @param clientTransportFactory clientTransportFactory
111119
* @return new {@code RSocketServiceTransport} instance
112120
*/
113-
public RSocketServiceTransport tcpClient(Function<LoopResources, TcpClient> factory) {
121+
public RSocketServiceTransport clientTransportFactory(
122+
Function<LoopResources, RSocketClientTransportFactory> clientTransportFactory) {
114123
RSocketServiceTransport rst = new RSocketServiceTransport(this);
115-
rst.tcpClientProvider = factory;
124+
rst.clientTransportFactory = clientTransportFactory;
116125
return rst;
117126
}
118127

@@ -125,7 +134,7 @@ public RSocketServiceTransport tcpClient(Function<LoopResources, TcpClient> fact
125134
public ClientTransport clientTransport() {
126135
return new RSocketClientTransport(
127136
new ServiceMessageCodec(headersCodec, dataCodecs),
128-
tcpClientProvider.apply(clientLoopResources));
137+
clientTransportFactory.apply(clientLoopResources));
129138
}
130139

131140
/**
@@ -137,7 +146,7 @@ public ClientTransport clientTransport() {
137146
public ServerTransport serverTransport() {
138147
return new RSocketServerTransport(
139148
new ServiceMessageCodec(headersCodec, dataCodecs),
140-
tcpServerProvider.apply(serverLoopResources));
149+
serverTransportFactory.apply(serverLoopResources));
141150
}
142151

143152
@Override
@@ -173,14 +182,34 @@ private Mono<Void> shutdownEventLoopGroup() {
173182
return Mono.defer(() -> FutureMono.from((Future) eventLoopGroup.shutdownGracefully()));
174183
}
175184

176-
private Function<LoopResources, TcpServer> defaultTcpServerProvider() {
185+
private Function<LoopResources, RSocketServerTransportFactory> defaultServerTransportFactory() {
177186
return (LoopResources serverLoopResources) ->
178-
TcpServer.create().runOn(serverLoopResources).bindAddress(() -> new InetSocketAddress(0));
179-
}
180-
181-
private Function<LoopResources, TcpClient> defaultTcpClientProvider() {
182-
return (LoopResources clientLoopResources) ->
183-
TcpClient.newConnection().runOn(clientLoopResources);
187+
() ->
188+
TcpServerTransport.create(
189+
TcpServer.create()
190+
.runOn(serverLoopResources)
191+
.bindAddress(() -> new InetSocketAddress(0))
192+
.doOnConnection(
193+
connection -> {
194+
LOGGER.debug(
195+
"[rsocket][server] Accepted connection on {}", connection.channel());
196+
connection.onDispose(
197+
() ->
198+
LOGGER.debug(
199+
"[rsocket][server] Connection closed on {}",
200+
connection.channel()));
201+
}));
202+
}
203+
204+
private Function<LoopResources, RSocketClientTransportFactory> defaultClientTransportFactory() {
205+
return (LoopResources loopResources) ->
206+
(RSocketClientTransportFactory)
207+
address ->
208+
TcpClientTransport.create(
209+
TcpClient.newConnection()
210+
.host(address.host())
211+
.port(address.port())
212+
.runOn(loopResources));
184213
}
185214

186215
@Override

0 commit comments

Comments
 (0)