Skip to content

Commit b81866f

Browse files
committed
Added RSocketClientTransportFactory and RSocketClientTransportFactory
1 parent 2ca23e4 commit b81866f

File tree

3 files changed

+25
-9
lines changed

3 files changed

+25
-9
lines changed

services-examples-parent/services-examples-runner/src/main/java/io/scalecube/services/examples/ExamplesRunner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
1616
import io.scalecube.services.discovery.api.ServiceDiscovery;
1717
import io.scalecube.services.transport.rsocket.RSocketServiceTransport;
18+
import java.net.InetSocketAddress;
1819
import java.nio.file.Path;
1920
import java.util.List;
2021
import java.util.Optional;
@@ -76,9 +77,10 @@ public static void main(String[] args) {
7677
() ->
7778
TcpServerTransport.create(
7879
TcpServer.create()
79-
.wiretap(false)
80-
.port(config.servicePort())
80+
.bindAddress(
81+
() -> new InetSocketAddress(config.servicePort()))
8182
.runOn(loopResources)
83+
.wiretap(false)
8284
.noSSL())))
8385
.services(new BenchmarkServiceImpl(), new GreetingServiceImpl())
8486
.startAwait();

services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServerTransport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ public RSocketServerTransport(
3636

3737
@Override
3838
public Address address() {
39-
InetSocketAddress address = serverChannel.address();
40-
return Address.create(address.getHostString(), address.getPort());
39+
InetSocketAddress socketAddress = serverChannel.address();
40+
return Address.create(socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
4141
}
4242

4343
@Override

services/src/main/java/io/scalecube/services/Microservices.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import io.scalecube.services.transport.api.ServiceMessageDataDecoder;
3030
import io.scalecube.services.transport.api.ServiceTransport;
3131
import java.lang.management.ManagementFactory;
32+
import java.net.InetAddress;
33+
import java.net.UnknownHostException;
3234
import java.util.ArrayList;
3335
import java.util.Arrays;
3436
import java.util.Collections;
@@ -53,6 +55,7 @@
5355
import org.slf4j.LoggerFactory;
5456
import reactor.core.Disposable;
5557
import reactor.core.Disposables;
58+
import reactor.core.Exceptions;
5659
import reactor.core.publisher.DirectProcessor;
5760
import reactor.core.publisher.Flux;
5861
import reactor.core.publisher.FluxSink;
@@ -790,10 +793,7 @@ private Mono<ServiceTransportBootstrap> start(Microservices microservices) {
790793
.doOnSuccess(transport -> serverTransport = transport)
791794
.map(
792795
transport -> {
793-
this.transportAddress =
794-
Address.create(
795-
Address.getLocalIpAddress().getHostAddress(),
796-
serverTransport.address().port());
796+
this.transportAddress = prepareAddress(serverTransport.address());
797797
this.clientTransport = serviceTransport.clientTransport();
798798
return this;
799799
})
@@ -804,7 +804,7 @@ private Mono<ServiceTransportBootstrap> start(Microservices microservices) {
804804
LOGGER.info(
805805
"[{}][serviceTransport][start] Started, address: {}",
806806
microservices.id(),
807-
this.transportAddress))
807+
this.serverTransport.address()))
808808
.doOnError(
809809
ex ->
810810
LOGGER.error(
@@ -813,6 +813,20 @@ private Mono<ServiceTransportBootstrap> start(Microservices microservices) {
813813
ex.toString()));
814814
}
815815

816+
private static Address prepareAddress(Address address) {
817+
final InetAddress inetAddress;
818+
try {
819+
inetAddress = InetAddress.getByName(address.host());
820+
} catch (UnknownHostException e) {
821+
throw Exceptions.propagate(e);
822+
}
823+
if (inetAddress.isAnyLocalAddress()) {
824+
return Address.create(Address.getLocalIpAddress().getHostAddress(), address.port());
825+
} else {
826+
return Address.create(inetAddress.getHostAddress(), address.port());
827+
}
828+
}
829+
816830
private Mono<Void> shutdown() {
817831
return Mono.defer(
818832
() ->

0 commit comments

Comments
 (0)