Skip to content

Commit 471c978

Browse files
committed
Added RSocketClientTransportFactory and RSocketClientTransportFactory
1 parent b81866f commit 471c978

File tree

4 files changed

+55
-41
lines changed

4 files changed

+55
-41
lines changed

services-examples-parent/services-examples-runner/src/main/java/io/scalecube/services/examples/ExamplesRunner.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.scalecube.services.ServiceEndpoint;
1515
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
1616
import io.scalecube.services.discovery.api.ServiceDiscovery;
17+
import io.scalecube.services.transport.rsocket.RSocketServerTransportFactory;
1718
import io.scalecube.services.transport.rsocket.RSocketServiceTransport;
1819
import java.net.InetSocketAddress;
1920
import java.nio.file.Path;
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,30 @@
11
package io.scalecube.services.transport.rsocket;
22

33
import io.rsocket.transport.ClientTransport;
4+
import io.rsocket.transport.netty.client.TcpClientTransport;
45
import io.scalecube.net.Address;
6+
import java.util.function.Function;
7+
import reactor.netty.resources.LoopResources;
8+
import reactor.netty.tcp.TcpClient;
59

610
public interface RSocketClientTransportFactory {
711

12+
/**
13+
* Returns default rsocket tcp client transport factory.
14+
*
15+
* @see TcpClientTransport
16+
* @return factory function for {@link RSocketClientTransportFactory}
17+
*/
18+
static Function<LoopResources, RSocketClientTransportFactory> tcp() {
19+
return (LoopResources loopResources) ->
20+
(RSocketClientTransportFactory)
21+
address ->
22+
TcpClientTransport.create(
23+
TcpClient.newConnection()
24+
.host(address.host())
25+
.port(address.port())
26+
.runOn(loopResources));
27+
}
28+
829
ClientTransport clientTransport(Address address);
930
}

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServerTransportFactory.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,39 @@
22

33
import io.rsocket.transport.ServerTransport;
44
import io.rsocket.transport.netty.server.CloseableChannel;
5+
import io.rsocket.transport.netty.server.TcpServerTransport;
6+
import java.net.InetSocketAddress;
7+
import java.util.function.Function;
8+
import reactor.netty.resources.LoopResources;
9+
import reactor.netty.tcp.TcpServer;
510

611
public interface RSocketServerTransportFactory {
712

13+
/**
14+
* Returns default rsocket tcp server transport factory (shall listen on port {@code 0}).
15+
*
16+
* @see TcpServerTransport
17+
* @return factory function for {@link RSocketServerTransportFactory}
18+
*/
19+
static Function<LoopResources, RSocketServerTransportFactory> tcp() {
20+
return tcp(0);
21+
}
22+
23+
/**
24+
* Returns default rsocket tcp server transport factory.
25+
*
26+
* @param port port
27+
* @see TcpServerTransport
28+
* @return factory function for {@link RSocketServerTransportFactory}
29+
*/
30+
static Function<LoopResources, RSocketServerTransportFactory> tcp(int port) {
31+
return (LoopResources loopResources) ->
32+
() ->
33+
TcpServerTransport.create(
34+
TcpServer.create()
35+
.runOn(loopResources)
36+
.bindAddress(() -> new InetSocketAddress(port)));
37+
}
38+
839
ServerTransport<CloseableChannel> serverTransport();
940
}

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java

Lines changed: 2 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,38 +6,29 @@
66
import io.netty.channel.nio.NioEventLoopGroup;
77
import io.netty.util.concurrent.DefaultThreadFactory;
88
import io.netty.util.concurrent.Future;
9-
import io.rsocket.transport.netty.client.TcpClientTransport;
10-
import io.rsocket.transport.netty.server.TcpServerTransport;
119
import io.scalecube.services.transport.api.ClientTransport;
1210
import io.scalecube.services.transport.api.DataCodec;
1311
import io.scalecube.services.transport.api.HeadersCodec;
1412
import io.scalecube.services.transport.api.ServerTransport;
1513
import io.scalecube.services.transport.api.ServiceMessageCodec;
1614
import io.scalecube.services.transport.api.ServiceTransport;
17-
import java.net.InetSocketAddress;
1815
import java.util.Collection;
1916
import java.util.concurrent.ThreadFactory;
2017
import java.util.function.Function;
21-
import org.slf4j.Logger;
22-
import org.slf4j.LoggerFactory;
2318
import reactor.core.publisher.Flux;
2419
import reactor.core.publisher.Mono;
2520
import reactor.netty.FutureMono;
2621
import reactor.netty.resources.LoopResources;
27-
import reactor.netty.tcp.TcpClient;
28-
import reactor.netty.tcp.TcpServer;
2922

3023
public class RSocketServiceTransport implements ServiceTransport {
3124

32-
private static final Logger LOGGER = LoggerFactory.getLogger(RSocketServiceTransport.class);
33-
3425
private int numOfWorkers = Runtime.getRuntime().availableProcessors();
3526
private HeadersCodec headersCodec;
3627
private Collection<DataCodec> dataCodecs;
3728
private Function<LoopResources, RSocketServerTransportFactory> serverTransportFactory =
38-
defaultServerTransportFactory();
29+
RSocketServerTransportFactory.tcp();
3930
private Function<LoopResources, RSocketClientTransportFactory> clientTransportFactory =
40-
defaultClientTransportFactory();
31+
RSocketClientTransportFactory.tcp();
4132

4233
// resources
4334
private EventLoopGroup eventLoopGroup;
@@ -182,36 +173,6 @@ private Mono<Void> shutdownEventLoopGroup() {
182173
return Mono.defer(() -> FutureMono.from((Future) eventLoopGroup.shutdownGracefully()));
183174
}
184175

185-
private Function<LoopResources, RSocketServerTransportFactory> defaultServerTransportFactory() {
186-
return (LoopResources serverLoopResources) ->
187-
() ->
188-
TcpServerTransport.create(
189-
TcpServer.create()
190-
.runOn(serverLoopResources)
191-
.bindAddress(() -> new InetSocketAddress(0))
192-
.doOnConnection(
193-
connection -> {
194-
LOGGER.debug(
195-
"[rsocket][server] Accepted connection on {}", connection.channel());
196-
connection.onDispose(
197-
() ->
198-
LOGGER.debug(
199-
"[rsocket][server] Connection closed on {}",
200-
connection.channel()));
201-
}));
202-
}
203-
204-
private Function<LoopResources, RSocketClientTransportFactory> defaultClientTransportFactory() {
205-
return (LoopResources loopResources) ->
206-
(RSocketClientTransportFactory)
207-
address ->
208-
TcpClientTransport.create(
209-
TcpClient.newConnection()
210-
.host(address.host())
211-
.port(address.port())
212-
.runOn(loopResources));
213-
}
214-
215176
@Override
216177
public String toString() {
217178
return "RSocketServiceTransport{"

0 commit comments

Comments
 (0)