Skip to content

Commit 9f244e4

Browse files
committed
Made websocket rsocket transport to be default service-transport implementation
1 parent 471c978 commit 9f244e4

File tree

3 files changed

+68
-3
lines changed

3 files changed

+68
-3
lines changed

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransportFactory.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package io.scalecube.services.transport.rsocket;
22

3+
import io.netty.channel.ChannelOption;
34
import io.rsocket.transport.ClientTransport;
45
import io.rsocket.transport.netty.client.TcpClientTransport;
6+
import io.rsocket.transport.netty.client.WebsocketClientTransport;
57
import io.scalecube.net.Address;
68
import java.util.function.Function;
9+
import reactor.netty.http.client.HttpClient;
710
import reactor.netty.resources.LoopResources;
811
import reactor.netty.tcp.TcpClient;
912

@@ -26,5 +29,29 @@ static Function<LoopResources, RSocketClientTransportFactory> tcp() {
2629
.runOn(loopResources));
2730
}
2831

32+
/**
33+
* Returns default rsocket websocket client transport factory.
34+
*
35+
* @see WebsocketClientTransport
36+
* @return factory function for {@link RSocketClientTransportFactory}
37+
*/
38+
static Function<LoopResources, RSocketClientTransportFactory> websocket() {
39+
return (LoopResources loopResources) ->
40+
(RSocketClientTransportFactory)
41+
address ->
42+
WebsocketClientTransport.create(
43+
HttpClient.newConnection()
44+
.tcpConfiguration(
45+
tcpClient ->
46+
tcpClient
47+
.runOn(loopResources)
48+
.host(address.host())
49+
.port(address.port())
50+
.option(ChannelOption.TCP_NODELAY, true)
51+
.option(ChannelOption.SO_KEEPALIVE, true)
52+
.option(ChannelOption.SO_REUSEADDR, true)),
53+
"/");
54+
}
55+
2956
ClientTransport clientTransport(Address address);
3057
}

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServerTransportFactory.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package io.scalecube.services.transport.rsocket;
22

3+
import io.netty.channel.ChannelOption;
34
import io.rsocket.transport.ServerTransport;
45
import io.rsocket.transport.netty.server.CloseableChannel;
56
import io.rsocket.transport.netty.server.TcpServerTransport;
7+
import io.rsocket.transport.netty.server.WebsocketServerTransport;
68
import java.net.InetSocketAddress;
79
import java.util.function.Function;
10+
import reactor.netty.http.server.HttpServer;
811
import reactor.netty.resources.LoopResources;
912
import reactor.netty.tcp.TcpServer;
1013

@@ -33,7 +36,42 @@ static Function<LoopResources, RSocketServerTransportFactory> tcp(int port) {
3336
TcpServerTransport.create(
3437
TcpServer.create()
3538
.runOn(loopResources)
36-
.bindAddress(() -> new InetSocketAddress(port)));
39+
.bindAddress(() -> new InetSocketAddress(port))
40+
.option(ChannelOption.TCP_NODELAY, true)
41+
.option(ChannelOption.SO_KEEPALIVE, true)
42+
.option(ChannelOption.SO_REUSEADDR, true));
43+
}
44+
45+
/**
46+
* Returns default rsocket websocket server transport factory (shall listen on port {@code 0}).
47+
*
48+
* @see WebsocketServerTransport
49+
* @return factory function for {@link RSocketServerTransportFactory}
50+
*/
51+
static Function<LoopResources, RSocketServerTransportFactory> websocket() {
52+
return websocket(0);
53+
}
54+
55+
/**
56+
* Returns default rsocket websocket server transport factory.
57+
*
58+
* @param port port
59+
* @see WebsocketServerTransport
60+
* @return factory function for {@link RSocketServerTransportFactory}
61+
*/
62+
static Function<LoopResources, RSocketServerTransportFactory> websocket(int port) {
63+
return loopResources ->
64+
() ->
65+
WebsocketServerTransport.create(
66+
HttpServer.create()
67+
.tcpConfiguration(
68+
tcpServer ->
69+
tcpServer
70+
.runOn(loopResources)
71+
.bindAddress(() -> new InetSocketAddress(port))
72+
.option(ChannelOption.TCP_NODELAY, true)
73+
.option(ChannelOption.SO_KEEPALIVE, true)
74+
.option(ChannelOption.SO_REUSEADDR, true)));
3775
}
3876

3977
ServerTransport<CloseableChannel> serverTransport();

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ public class RSocketServiceTransport implements ServiceTransport {
2626
private HeadersCodec headersCodec;
2727
private Collection<DataCodec> dataCodecs;
2828
private Function<LoopResources, RSocketServerTransportFactory> serverTransportFactory =
29-
RSocketServerTransportFactory.tcp();
29+
RSocketServerTransportFactory.websocket();
3030
private Function<LoopResources, RSocketClientTransportFactory> clientTransportFactory =
31-
RSocketClientTransportFactory.tcp();
31+
RSocketClientTransportFactory.websocket();
3232

3333
// resources
3434
private EventLoopGroup eventLoopGroup;

0 commit comments

Comments
 (0)