Skip to content

Commit 3d1c814

Browse files
committed
Apply new rsocket API
1 parent 047756b commit 3d1c814

File tree

3 files changed

+16
-18
lines changed

3 files changed

+16
-18
lines changed

pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
<properties>
2525
<jackson.version>2.11.0</jackson.version>
26-
<scalecube-cluster.version>2.4.11-SNAPSHOT</scalecube-cluster.version>
26+
<scalecube-cluster.version>2.4.11</scalecube-cluster.version>
2727
<scalecube-commons.version>1.0.2</scalecube-commons.version>
2828
<scalecube-benchmarks.version>1.2.2</scalecube-benchmarks.version>
2929
<scalecube-config.version>0.4.3</scalecube-config.version>
@@ -96,6 +96,11 @@
9696
</dependency>
9797

9898
<!-- Logging -->
99+
<dependency>
100+
<groupId>org.slf4j</groupId>
101+
<artifactId>slf4j-api</artifactId>
102+
<version>${slf4j.version}</version>
103+
</dependency>
99104
<dependency>
100105
<groupId>org.apache.logging.log4j</groupId>
101106
<artifactId>log4j-bom</artifactId>

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransport.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.scalecube.services.transport.rsocket;
22

33
import io.rsocket.RSocket;
4-
import io.rsocket.RSocketFactory;
4+
import io.rsocket.core.RSocketConnector;
55
import io.rsocket.frame.decoder.PayloadDecoder;
66
import io.rsocket.transport.netty.client.TcpClientTransport;
77
import io.scalecube.net.Address;
@@ -46,16 +46,10 @@ public ClientChannel create(Address address) {
4646

4747
private Mono<RSocket> connect(Address address, Map<Address, Mono<RSocket>> monoMap) {
4848
TcpClient tcpClient = this.tcpClient.host(address.host()).port(address.port());
49-
50-
Mono<RSocket> rsocketMono =
51-
RSocketFactory.connect()
52-
.frameDecoder(PayloadDecoder.DEFAULT)
53-
.errorConsumer(
54-
th -> LOGGER.warn("Exception occurred at rsocket client transport: " + th))
55-
.transport(() -> TcpClientTransport.create(tcpClient))
56-
.start();
57-
58-
return rsocketMono
49+
return RSocketConnector.create()
50+
.payloadDecoder(PayloadDecoder.DEFAULT)
51+
.errorConsumer(th1 -> LOGGER.warn("Exception occurred at rsocket client transport: " + th1))
52+
.connect(() -> TcpClientTransport.create(tcpClient))
5953
.doOnSuccess(
6054
rsocket -> {
6155
LOGGER.info("Connected successfully on {}", address);

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServerTransport.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.scalecube.services.transport.rsocket;
22

33
import io.rsocket.RSocketFactory;
4+
import io.rsocket.core.RSocketServer;
45
import io.rsocket.frame.decoder.PayloadDecoder;
56
import io.rsocket.transport.netty.server.CloseableChannel;
67
import io.rsocket.transport.netty.server.TcpServerTransport;
@@ -52,14 +53,12 @@ public Mono<ServerTransport> bind(ServiceMethodRegistry methodRegistry) {
5253
connection.onDispose(
5354
() -> LOGGER.info("Connection closed on {}", connection.channel()));
5455
});
55-
56-
return RSocketFactory.receive()
57-
.frameDecoder(PayloadDecoder.DEFAULT)
56+
return RSocketServer.create()
57+
.acceptor(new RSocketServiceAcceptor(codec, methodRegistry))
58+
.payloadDecoder(PayloadDecoder.DEFAULT)
5859
.errorConsumer(
5960
th -> LOGGER.warn("Exception occurred at rsocket server transport: " + th))
60-
.acceptor(new RSocketServiceAcceptor(codec, methodRegistry))
61-
.transport(() -> TcpServerTransport.create(tcpServer))
62-
.start()
61+
.bind(TcpServerTransport.create(tcpServer))
6362
.doOnSuccess(channel -> serverChannel = channel)
6463
.thenReturn(this);
6564
});

0 commit comments

Comments
 (0)