Skip to content

Commit 6beb91b

Browse files
committed
Added example
1 parent 801313d commit 6beb91b

File tree

5 files changed

+109
-53
lines changed

5 files changed

+109
-53
lines changed

cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.scalecube.cluster.transport.api.Message;
1919
import io.scalecube.cluster.transport.api.Transport;
2020
import io.scalecube.cluster.transport.api.TransportConfig;
21+
import io.scalecube.cluster.transport.api.TransportFactory;
2122
import io.scalecube.net.Address;
2223
import io.scalecube.transport.netty.TransportImpl;
2324
import io.scalecube.utils.ServiceLoaderUtil;
@@ -30,6 +31,7 @@
3031
import java.util.Optional;
3132
import java.util.Set;
3233
import java.util.function.Function;
34+
import java.util.function.Supplier;
3335
import java.util.function.UnaryOperator;
3436
import java.util.regex.Pattern;
3537
import java.util.stream.Collectors;
@@ -158,6 +160,19 @@ public ClusterImpl transport(UnaryOperator<TransportConfig> options) {
158160
return cluster;
159161
}
160162

163+
/**
164+
* Returns a new cluster's instance which will apply the given options.
165+
*
166+
* @param supplier transport factory supplier
167+
* @return new {@code ClusterImpl} instance
168+
*/
169+
public ClusterImpl transportFactory(Supplier<TransportFactory> supplier) {
170+
Objects.requireNonNull(supplier);
171+
ClusterImpl cluster = new ClusterImpl(this);
172+
cluster.config = config.transport(opts -> opts.transportFactory(supplier.get()));
173+
return cluster;
174+
}
175+
161176
/**
162177
* Returns a new cluster's instance which will apply the given options.
163178
*
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package io.scalecube.examples;
2+
3+
import io.scalecube.cluster.Cluster;
4+
import io.scalecube.cluster.ClusterImpl;
5+
import io.scalecube.cluster.ClusterMessageHandler;
6+
import io.scalecube.cluster.transport.api.Message;
7+
import io.scalecube.transport.netty.websocket.WebsocketTransportFactory;
8+
import reactor.core.publisher.Flux;
9+
10+
/**
11+
* Exactly the same example as {@link MessagingExample} but cluster transport is websocket.
12+
*
13+
* @see io.scalecube.transport.netty.websocket.WebsocketTransportFactory
14+
* @author arvy
15+
*/
16+
public class WebsocketMessagingExample {
17+
18+
/** Main method. */
19+
public static void main(String[] args) throws Exception {
20+
// Start cluster node Alice to listen and respond for incoming greeting messages
21+
Cluster alice =
22+
new ClusterImpl()
23+
.transportFactory(WebsocketTransportFactory::new)
24+
.handler(
25+
cluster -> {
26+
return new ClusterMessageHandler() {
27+
@Override
28+
public void onMessage(Message msg) {
29+
System.out.println("Alice received: " + msg.data());
30+
cluster
31+
.send(msg.sender(), Message.fromData("Greetings from Alice"))
32+
.subscribe(null, Throwable::printStackTrace);
33+
}
34+
};
35+
})
36+
.startAwait();
37+
38+
// Join cluster node Bob to cluster with Alice, listen and respond for incoming greeting
39+
// messages
40+
Cluster bob =
41+
new ClusterImpl()
42+
.transportFactory(WebsocketTransportFactory::new)
43+
.membership(opts -> opts.seedMembers(alice.address()))
44+
.handler(
45+
cluster -> {
46+
return new ClusterMessageHandler() {
47+
@Override
48+
public void onMessage(Message msg) {
49+
System.out.println("Bob received: " + msg.data());
50+
cluster
51+
.send(msg.sender(), Message.fromData("Greetings from Bob"))
52+
.subscribe(null, Throwable::printStackTrace);
53+
}
54+
};
55+
})
56+
.startAwait();
57+
58+
// Join cluster node Carol to cluster with Alice and Bob
59+
Cluster carol =
60+
new ClusterImpl()
61+
.transportFactory(WebsocketTransportFactory::new)
62+
.membership(opts -> opts.seedMembers(alice.address(), bob.address()))
63+
.handler(
64+
cluster -> {
65+
return new ClusterMessageHandler() {
66+
@Override
67+
public void onMessage(Message msg) {
68+
System.out.println("Carol received: " + msg.data());
69+
}
70+
};
71+
})
72+
.startAwait();
73+
74+
// Send from Carol greeting message to all other cluster members (which is Alice and Bob)
75+
Message greetingMsg = Message.fromData("Greetings from Carol");
76+
77+
Flux.fromIterable(carol.otherMembers())
78+
.flatMap(member -> carol.send(member, greetingMsg))
79+
.subscribe(null, Throwable::printStackTrace);
80+
81+
// Avoid exit main thread immediately ]:->
82+
Thread.sleep(1000);
83+
}
84+
}

examples/src/main/java/io/scalecube/examples/WebsocketTransportExample.java

Lines changed: 0 additions & 44 deletions
This file was deleted.

examples/src/main/resources/log4j2.xml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@
1212
<Loggers>
1313
<Logger name="io.netty" level="info"/>
1414
<Logger name="reactor.netty" level="info"/>
15-
<Logger name="io.scalecube.cluster.transport" level="debug"/>
16-
<Logger name="io.scalecube.cluster.fdetector" level="debug"/>
17-
<Logger name="io.scalecube.cluster.gossip" level="debug"/>
18-
<Logger name="io.scalecube.cluster.membership" level="debug"/>
19-
<Logger name="io.scalecube.cluster.metadata" level="debug"/>
20-
<Logger name="io.scalecube.cluster" level="debug"/>
21-
<Logger name="io.scalecube.services" level="debug"/>
15+
<Logger name="io.scalecube.cluster.transport" level="info"/>
16+
<Logger name="io.scalecube.cluster.fdetector" level="info"/>
17+
<Logger name="io.scalecube.cluster.gossip" level="info"/>
18+
<Logger name="io.scalecube.cluster.membership" level="info"/>
19+
<Logger name="io.scalecube.cluster.metadata" level="info"/>
20+
<Logger name="io.scalecube.cluster" level="info"/>
21+
<Logger name="io.scalecube.services" level="info"/>
2222

23-
<Root level="debug">
23+
<Root level="info">
2424
<AppenderRef ref="console"/>
2525
</Root>
2626
</Loggers>

transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.scalecube.cluster.transport.api.TransportConfig;
1313
import io.scalecube.cluster.transport.api.TransportFactory;
1414
import io.scalecube.net.Address;
15+
import io.scalecube.transport.netty.tcp.TcpTransportFactory;
1516
import java.net.InetAddress;
1617
import java.util.Map;
1718
import java.util.Objects;
@@ -137,7 +138,7 @@ public static Mono<Transport> bind(TransportConfig config) {
137138
TransportFactory transportFactory =
138139
Optional.ofNullable(config.transportFactory())
139140
.or(() -> Optional.ofNullable(TransportFactory.INSTANCE))
140-
.orElseThrow(() -> new IllegalStateException("No TransportFactory is defined"));
141+
.orElse(new TcpTransportFactory());
141142
return transportFactory.createTransport(config).start();
142143
}
143144

0 commit comments

Comments
 (0)