Skip to content

Commit af76350

Browse files
authored
Merge pull request #775 from scalecube/add-container-host-container-port
Update services to support mapped host and port
2 parents eb2fa6a + 487adac commit af76350

File tree

9 files changed

+269
-93
lines changed

9 files changed

+269
-93
lines changed

pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<parent>
99
<groupId>io.scalecube</groupId>
1010
<artifactId>scalecube-parent-pom</artifactId>
11-
<version>0.1.1</version>
11+
<version>0.1.2-RC3</version>
1212
</parent>
1313
<packaging>pom</packaging>
1414

@@ -23,11 +23,11 @@
2323

2424
<properties>
2525
<jackson.version>2.11.0</jackson.version>
26-
<scalecube-cluster.version>2.6.0-RC2</scalecube-cluster.version>
27-
<scalecube-commons.version>1.0.4</scalecube-commons.version>
26+
<scalecube-cluster.version>2.6.0-RC3</scalecube-cluster.version>
27+
<scalecube-commons.version>1.0.7</scalecube-commons.version>
2828
<scalecube-benchmarks.version>1.2.2</scalecube-benchmarks.version>
2929
<scalecube-config.version>0.4.3</scalecube-config.version>
30-
<reactor.version>Dysprosium-SR8</reactor.version>
30+
<reactor.version>Dysprosium-SR9</reactor.version>
3131
<rsocket.version>1.0.1</rsocket.version>
3232
<protostuff.version>1.6.0</protostuff.version>
3333
<netty.version>4.1.50.Final</netty.version>

services-api/src/main/java/io/scalecube/services/ServiceEndpoint.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ public static Builder builder() {
4848
return new Builder();
4949
}
5050

51+
public static Builder from(ServiceEndpoint serviceEndpoint) {
52+
return new Builder(serviceEndpoint);
53+
}
54+
5155
public String id() {
5256
return id;
5357
}
@@ -163,6 +167,14 @@ public static class Builder {
163167

164168
private Builder() {}
165169

170+
private Builder(ServiceEndpoint other) {
171+
this.id = other.id;
172+
this.address = other.address;
173+
this.contentTypes = new HashSet<>(other.contentTypes);
174+
this.tags = new HashMap<>(other.tags);
175+
this.serviceRegistrations = new ArrayList<>(other.serviceRegistrations);
176+
}
177+
166178
public Builder id(String id) {
167179
this.id = Objects.requireNonNull(id, "id");
168180
return this;
@@ -174,12 +186,12 @@ public Builder address(Address address) {
174186
}
175187

176188
public Builder contentTypes(Set<String> contentTypes) {
177-
this.contentTypes = Objects.requireNonNull(contentTypes, "contentTypes");
189+
this.contentTypes = new HashSet<>(Objects.requireNonNull(contentTypes, "contentTypes"));
178190
return this;
179191
}
180192

181193
public Builder tags(Map<String, String> tags) {
182-
this.tags = Objects.requireNonNull(tags, "tags");
194+
this.tags = new HashMap<>(Objects.requireNonNull(tags, "tags"));
183195
return this;
184196
}
185197

@@ -204,7 +216,7 @@ public Builder appendServiceRegistrations(
204216
*/
205217
public Builder serviceRegistrations(Collection<ServiceRegistration> serviceRegistrations) {
206218
this.serviceRegistrations =
207-
Objects.requireNonNull(serviceRegistrations, "serviceRegistrations");
219+
new ArrayList<>(Objects.requireNonNull(serviceRegistrations, "serviceRegistrations"));
208220
return this;
209221
}
210222

services-examples-parent/services-examples-runner/src/main/java/io/scalecube/services/examples/ExamplesRunner.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.scalecube.services.examples;
22

3+
import io.rsocket.transport.netty.client.TcpClientTransport;
4+
import io.rsocket.transport.netty.server.TcpServerTransport;
35
import io.scalecube.config.ConfigRegistry;
46
import io.scalecube.config.ConfigRegistrySettings;
57
import io.scalecube.config.audit.Slf4JConfigEventListener;
@@ -12,7 +14,9 @@
1214
import io.scalecube.services.ServiceEndpoint;
1315
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
1416
import io.scalecube.services.discovery.api.ServiceDiscovery;
17+
import io.scalecube.services.transport.rsocket.RSocketServerTransportFactory;
1518
import io.scalecube.services.transport.rsocket.RSocketServiceTransport;
19+
import java.net.InetSocketAddress;
1620
import java.nio.file.Path;
1721
import java.util.List;
1822
import java.util.Optional;
@@ -58,20 +62,27 @@ public static void main(String[] args) {
5862
.transport(
5963
() ->
6064
new RSocketServiceTransport()
61-
.tcpClient(
65+
.clientTransportFactory(
6266
loopResources ->
63-
TcpClient.newConnection()
64-
.runOn(loopResources)
65-
.wiretap(false)
66-
.noProxy()
67-
.noSSL())
68-
.tcpServer(
67+
address ->
68+
TcpClientTransport.create(
69+
TcpClient.newConnection()
70+
.host(address.host())
71+
.port(address.port())
72+
.runOn(loopResources)
73+
.wiretap(false)
74+
.noProxy()
75+
.noSSL()))
76+
.serverTransportFactory(
6977
loopResources ->
70-
TcpServer.create()
71-
.wiretap(false)
72-
.port(config.servicePort())
73-
.runOn(loopResources)
74-
.noSSL()))
78+
() ->
79+
TcpServerTransport.create(
80+
TcpServer.create()
81+
.bindAddress(
82+
() -> new InetSocketAddress(config.servicePort()))
83+
.runOn(loopResources)
84+
.wiretap(false)
85+
.noSSL())))
7586
.services(new BenchmarkServiceImpl(), new GreetingServiceImpl())
7687
.startAwait();
7788

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientTransport.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import io.rsocket.RSocket;
44
import io.rsocket.core.RSocketConnector;
55
import io.rsocket.frame.decoder.PayloadDecoder;
6-
import io.rsocket.transport.netty.client.TcpClientTransport;
76
import io.scalecube.net.Address;
87
import io.scalecube.services.transport.api.ClientChannel;
98
import io.scalecube.services.transport.api.ClientTransport;
@@ -14,7 +13,6 @@
1413
import org.slf4j.Logger;
1514
import org.slf4j.LoggerFactory;
1615
import reactor.core.publisher.Mono;
17-
import reactor.netty.tcp.TcpClient;
1816

1917
public class RSocketClientTransport implements ClientTransport {
2018

@@ -24,17 +22,18 @@ public class RSocketClientTransport implements ClientTransport {
2422
ThreadLocal.withInitial(ConcurrentHashMap::new);
2523

2624
private final ServiceMessageCodec messageCodec;
27-
private final TcpClient tcpClient;
25+
private final RSocketClientTransportFactory clientTransportFactory;
2826

2927
/**
3028
* Constructor for this transport.
3129
*
32-
* @param messageCodec message codec
33-
* @param tcpClient tcp client
30+
* @param messageCodec messageCodec
31+
* @param clientTransportFactory clientTransportFactory
3432
*/
35-
public RSocketClientTransport(ServiceMessageCodec messageCodec, TcpClient tcpClient) {
33+
public RSocketClientTransport(
34+
ServiceMessageCodec messageCodec, RSocketClientTransportFactory clientTransportFactory) {
3635
this.messageCodec = messageCodec;
37-
this.tcpClient = tcpClient;
36+
this.clientTransportFactory = clientTransportFactory;
3837
}
3938

4039
@Override
@@ -46,10 +45,9 @@ public ClientChannel create(Address address) {
4645
}
4746

4847
private Mono<RSocket> connect(Address address, Map<Address, Mono<RSocket>> monoMap) {
49-
TcpClient tcpClient = this.tcpClient.host(address.host()).port(address.port());
5048
return RSocketConnector.create()
5149
.payloadDecoder(PayloadDecoder.DEFAULT)
52-
.connect(() -> TcpClientTransport.create(tcpClient))
50+
.connect(() -> clientTransportFactory.clientTransport(address))
5351
.doOnSuccess(
5452
rsocket -> {
5553
LOGGER.debug("[rsocket][client] Connected successfully on {}", address);
@@ -80,7 +78,6 @@ private Mono<RSocket> connect(Address address, Map<Address, Mono<RSocket>> monoM
8078
public String toString() {
8179
return new StringJoiner(", ", RSocketClientTransport.class.getSimpleName() + "[", "]")
8280
.add("messageCodec=" + messageCodec)
83-
.add("tcpClient=" + tcpClient)
8481
.toString();
8582
}
8683
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.scalecube.services.transport.rsocket;
2+
3+
import io.netty.channel.ChannelOption;
4+
import io.rsocket.transport.ClientTransport;
5+
import io.rsocket.transport.netty.client.TcpClientTransport;
6+
import io.rsocket.transport.netty.client.WebsocketClientTransport;
7+
import io.scalecube.net.Address;
8+
import java.util.function.Function;
9+
import reactor.netty.http.client.HttpClient;
10+
import reactor.netty.resources.LoopResources;
11+
import reactor.netty.tcp.TcpClient;
12+
13+
public interface RSocketClientTransportFactory {
14+
15+
/**
16+
* Returns default rsocket tcp client transport factory.
17+
*
18+
* @see TcpClientTransport
19+
* @return factory function for {@link RSocketClientTransportFactory}
20+
*/
21+
static Function<LoopResources, RSocketClientTransportFactory> tcp() {
22+
return (LoopResources loopResources) ->
23+
(RSocketClientTransportFactory)
24+
address ->
25+
TcpClientTransport.create(
26+
TcpClient.newConnection()
27+
.host(address.host())
28+
.port(address.port())
29+
.runOn(loopResources));
30+
}
31+
32+
/**
33+
* Returns default rsocket websocket client transport factory.
34+
*
35+
* @see WebsocketClientTransport
36+
* @return factory function for {@link RSocketClientTransportFactory}
37+
*/
38+
static Function<LoopResources, RSocketClientTransportFactory> websocket() {
39+
return (LoopResources loopResources) ->
40+
(RSocketClientTransportFactory)
41+
address ->
42+
WebsocketClientTransport.create(
43+
HttpClient.newConnection()
44+
.tcpConfiguration(
45+
tcpClient ->
46+
tcpClient
47+
.runOn(loopResources)
48+
.host(address.host())
49+
.port(address.port())
50+
.option(ChannelOption.TCP_NODELAY, true)
51+
.option(ChannelOption.SO_KEEPALIVE, true)
52+
.option(ChannelOption.SO_REUSEADDR, true)),
53+
"/");
54+
}
55+
56+
ClientTransport clientTransport(Address address);
57+
}

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServerTransport.java

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import io.rsocket.core.RSocketServer;
44
import io.rsocket.frame.decoder.PayloadDecoder;
55
import io.rsocket.transport.netty.server.CloseableChannel;
6-
import io.rsocket.transport.netty.server.TcpServerTransport;
76
import io.scalecube.net.Address;
87
import io.scalecube.services.methods.ServiceMethodRegistry;
98
import io.scalecube.services.transport.api.ServerTransport;
@@ -13,56 +12,44 @@
1312
import org.slf4j.Logger;
1413
import org.slf4j.LoggerFactory;
1514
import reactor.core.publisher.Mono;
16-
import reactor.netty.tcp.TcpServer;
1715

18-
/** RSocket server transport implementation. */
1916
public class RSocketServerTransport implements ServerTransport {
2017

2118
private static final Logger LOGGER = LoggerFactory.getLogger(RSocketServerTransport.class);
2219

2320
private final ServiceMessageCodec messageCodec;
24-
private final TcpServer tcpServer;
21+
private final RSocketServerTransportFactory serverTransportFactory;
2522

2623
private CloseableChannel serverChannel; // calculated
2724

2825
/**
2926
* Constructor for this server transport.
3027
*
31-
* @param messageCodec message codec
32-
* @param tcpServer tcp server
28+
* @param messageCodec messageCodec
29+
* @param serverTransportFactory serverTransportFactory
3330
*/
34-
public RSocketServerTransport(ServiceMessageCodec messageCodec, TcpServer tcpServer) {
31+
public RSocketServerTransport(
32+
ServiceMessageCodec messageCodec, RSocketServerTransportFactory serverTransportFactory) {
3533
this.messageCodec = messageCodec;
36-
this.tcpServer = tcpServer;
34+
this.serverTransportFactory = serverTransportFactory;
3735
}
3836

3937
@Override
4038
public Address address() {
41-
InetSocketAddress address = serverChannel.address();
42-
return Address.create(address.getHostString(), address.getPort());
39+
InetSocketAddress socketAddress = serverChannel.address();
40+
return Address.create(socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
4341
}
4442

4543
@Override
4644
public Mono<ServerTransport> bind(ServiceMethodRegistry methodRegistry) {
4745
return Mono.defer(
48-
() -> {
49-
TcpServer tcpServer =
50-
this.tcpServer.doOnConnection(
51-
connection -> {
52-
LOGGER.debug(
53-
"[rsocket][server] Accepted connection on {}", connection.channel());
54-
connection.onDispose(
55-
() ->
56-
LOGGER.debug(
57-
"[rsocket][server] Connection closed on {}", connection.channel()));
58-
});
59-
return RSocketServer.create()
60-
.acceptor(new RSocketServiceAcceptor(messageCodec, methodRegistry))
61-
.payloadDecoder(PayloadDecoder.DEFAULT)
62-
.bind(TcpServerTransport.create(tcpServer))
63-
.doOnSuccess(channel -> serverChannel = channel)
64-
.thenReturn(this);
65-
});
46+
() ->
47+
RSocketServer.create()
48+
.acceptor(new RSocketServiceAcceptor(messageCodec, methodRegistry))
49+
.payloadDecoder(PayloadDecoder.DEFAULT)
50+
.bind(serverTransportFactory.serverTransport())
51+
.doOnSuccess(channel -> serverChannel = channel)
52+
.thenReturn(this));
6653
}
6754

6855
@Override
@@ -88,7 +75,6 @@ public Mono<Void> stop() {
8875
public String toString() {
8976
return new StringJoiner(", ", RSocketServerTransport.class.getSimpleName() + "[", "]")
9077
.add("messageCodec=" + messageCodec)
91-
.add("tcpServer=" + tcpServer)
9278
.add("serverChannel=" + serverChannel)
9379
.toString();
9480
}

0 commit comments

Comments
 (0)