Skip to content

Commit b91721d

Browse files
steveguryrobertroeser
authored andcommitted
Loadbalancer: closing doesn't subscribe to the underlying (#148)
* Loadbalancer: closing doesn't subscribe to the underlying ***Problem*** Closing the loadbalancer doesn't properly subscribe to the `Publisher`s returned by the `close()` methods of the underlying `ReactiveSocket`. Thus, the close event is lost at the LoadBalancer level. ***Solution*** Properly subscribe to the close `Publisher`s and propagate the `onComplete` events when all `ReactiveSocket` are closed. (I choose to ignore any exception that happened during the close, i.e. only propagate 1 `onComplete`). * Address comments
1 parent 52a670f commit b91721d

File tree

3 files changed

+192
-14
lines changed

3 files changed

+192
-14
lines changed

reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import io.reactivesocket.internal.EmptySubject;
2828
import io.reactivesocket.internal.Publishers;
2929
import io.reactivesocket.internal.Publishers;
30+
import io.reactivesocket.internal.rx.EmptySubscriber;
31+
import io.reactivesocket.internal.rx.EmptySubscription;
3032
import io.reactivesocket.rx.Completable;
3133
import io.reactivesocket.client.stat.FrugalQuantile;
3234
import io.reactivesocket.client.stat.Quantile;
@@ -42,6 +44,7 @@
4244
import java.util.concurrent.ThreadLocalRandom;
4345
import java.util.concurrent.TimeUnit;
4446
import java.util.concurrent.atomic.AtomicBoolean;
47+
import java.util.concurrent.atomic.AtomicInteger;
4548
import java.util.concurrent.atomic.AtomicLong;
4649
import java.util.function.Consumer;
4750

@@ -468,22 +471,41 @@ public synchronized String toString() {
468471

469472
@Override
470473
public Publisher<Void> close() {
471-
return s -> {
472-
Publishers.afterTerminate(onClose(), () -> {
473-
synchronized (this) {
474-
factoryRefresher.close();
475-
activeFactories.clear();
476-
activeSockets.forEach(rs -> {
477-
try {
478-
rs.close();
479-
} catch (Exception e) {
480-
logger.warn("Exception while closing a ReactiveSocket", e);
474+
return subscriber -> {
475+
subscriber.onSubscribe(EmptySubscription.INSTANCE);
476+
477+
synchronized (this) {
478+
factoryRefresher.close();
479+
activeFactories.clear();
480+
AtomicInteger n = new AtomicInteger(activeSockets.size());
481+
482+
activeSockets.forEach(rs -> {
483+
rs.close().subscribe(new Subscriber<Void>() {
484+
@Override
485+
public void onSubscribe(Subscription s) {
486+
s.request(Long.MAX_VALUE);
487+
}
488+
489+
@Override
490+
public void onNext(Void aVoid) {}
491+
492+
@Override
493+
public void onError(Throwable t) {
494+
logger.warn("Exception while closing a ReactiveSocket", t);
495+
onComplete();
496+
}
497+
498+
@Override
499+
public void onComplete() {
500+
if (n.decrementAndGet() == 0) {
501+
subscriber.onComplete();
502+
closeSubject.subscribe(EmptySubscriber.INSTANCE);
503+
closeSubject.onComplete();
504+
}
481505
}
482506
});
483-
}
484-
});
485-
closeSubject.subscribe(s);
486-
closeSubject.onComplete();
507+
});
508+
}
487509
};
488510
}
489511

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.reactivesocket.internal.rx;
2+
3+
import org.reactivestreams.Subscriber;
4+
import org.reactivestreams.Subscription;
5+
6+
public enum EmptySubscriber implements Subscriber<Object> {
7+
INSTANCE();
8+
9+
@Override
10+
public void onSubscribe(Subscription s) {
11+
12+
}
13+
14+
@Override
15+
public void onNext(Object t) {}
16+
17+
@Override
18+
public void onError(Throwable t) {}
19+
20+
@Override
21+
public void onComplete() {}
22+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package io.reactivesocket.integration;
2+
3+
import io.reactivesocket.*;
4+
import io.reactivesocket.client.ClientBuilder;
5+
import io.reactivesocket.internal.Publishers;
6+
import io.reactivesocket.test.TestUtil;
7+
import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector;
8+
import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer;
9+
import io.reactivesocket.util.Unsafe;
10+
import org.junit.Test;
11+
import org.reactivestreams.Publisher;
12+
import org.reactivestreams.Subscriber;
13+
import org.reactivestreams.Subscription;
14+
15+
import java.net.InetSocketAddress;
16+
import java.net.SocketAddress;
17+
import java.util.Collections;
18+
import java.util.List;
19+
import java.util.concurrent.ExecutionException;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.TimeoutException;
22+
import java.util.concurrent.atomic.AtomicInteger;
23+
24+
import static org.junit.Assert.*;
25+
import static rx.RxReactiveStreams.toObservable;
26+
27+
public class IntegrationTest {
28+
29+
private interface TestingServer {
30+
int requestCount();
31+
int disconnectionCount();
32+
SocketAddress getListeningAddress();
33+
}
34+
35+
private TestingServer createServer() {
36+
AtomicInteger requestCounter = new AtomicInteger();
37+
AtomicInteger disconnectionCounter = new AtomicInteger();
38+
39+
ConnectionSetupHandler setupHandler = (setupPayload, reactiveSocket) -> {
40+
reactiveSocket.onClose().subscribe(new Subscriber<Void>() {
41+
@Override
42+
public void onSubscribe(Subscription s) {
43+
s.request(Long.MAX_VALUE);
44+
}
45+
46+
@Override
47+
public void onNext(Void aVoid) {}
48+
49+
@Override
50+
public void onError(Throwable t) {}
51+
52+
@Override
53+
public void onComplete() {
54+
disconnectionCounter.incrementAndGet();
55+
}
56+
});
57+
return new RequestHandler.Builder()
58+
.withRequestResponse(
59+
payload -> subscriber -> subscriber.onSubscribe(new Subscription() {
60+
@Override
61+
public void request(long n) {
62+
requestCounter.incrementAndGet();
63+
subscriber.onNext(TestUtil.utf8EncodedPayload("RESPONSE", "NO_META"));
64+
subscriber.onComplete();
65+
}
66+
67+
@Override
68+
public void cancel() {}
69+
})
70+
)
71+
.build();
72+
};
73+
74+
SocketAddress addr = new InetSocketAddress("127.0.0.1", 0);
75+
TcpReactiveSocketServer.StartedServer server =
76+
TcpReactiveSocketServer.create(addr).start(setupHandler);
77+
78+
return new TestingServer() {
79+
@Override
80+
public int requestCount() {
81+
return requestCounter.get();
82+
}
83+
84+
@Override
85+
public int disconnectionCount() {
86+
return disconnectionCounter.get();
87+
}
88+
89+
@Override
90+
public SocketAddress getListeningAddress() {
91+
return server.getServerAddress();
92+
}
93+
};
94+
}
95+
96+
private ReactiveSocket createClient(SocketAddress addr) throws InterruptedException, ExecutionException, TimeoutException {
97+
List<SocketAddress> addrs = Collections.singletonList(addr);
98+
Publisher<List<SocketAddress>> src = Publishers.just(addrs);
99+
100+
ConnectionSetupPayload setupPayload =
101+
ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.HONOR_LEASE);
102+
TcpReactiveSocketConnector tcp = TcpReactiveSocketConnector.create(setupPayload, Throwable::printStackTrace);
103+
104+
Publisher<ReactiveSocket> socketPublisher =
105+
ClientBuilder.<SocketAddress>instance()
106+
.withSource(src)
107+
.withConnector(tcp)
108+
.build();
109+
110+
return Unsafe.blockingSingleWait(socketPublisher, 5, TimeUnit.SECONDS);
111+
}
112+
113+
@Test(timeout = 2_000L)
114+
public void testRequest() throws ExecutionException, InterruptedException, TimeoutException {
115+
TestingServer server = createServer();
116+
ReactiveSocket client = createClient(server.getListeningAddress());
117+
118+
toObservable(client.requestResponse(TestUtil.utf8EncodedPayload("RESPONSE", "NO_META")))
119+
.toBlocking()
120+
.subscribe();
121+
assertTrue("Server see the request", server.requestCount() > 0);
122+
}
123+
124+
@Test(timeout = 2_000L)
125+
public void testClose() throws ExecutionException, InterruptedException, TimeoutException {
126+
TestingServer server = createServer();
127+
ReactiveSocket client = createClient(server.getListeningAddress());
128+
129+
toObservable(client.close()).toBlocking().subscribe();
130+
131+
Thread.sleep(100);
132+
assertTrue("Server see disconnection", server.disconnectionCount() > 0);
133+
}
134+
}

0 commit comments

Comments
 (0)