Skip to content

Commit 92a3653

Browse files
authored
clear connections map after streams are cleaned up to signal subscribersa (#452)
1 parent 4632234 commit 92a3653

File tree

2 files changed

+47
-20
lines changed

2 files changed

+47
-20
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -432,21 +432,25 @@ private boolean contains(int streamId) {
432432
}
433433

434434
protected void cleanup() {
435-
Collection<Subscriber<Payload>> subscribers;
436-
Collection<LimitableRequestPublisher> publishers;
437-
synchronized (RSocketClient.this) {
438-
subscribers = receivers.values();
439-
publishers = senders.values();
440-
441-
senders.clear();
442-
receivers.clear();
443-
}
435+
try {
436+
Collection<Subscriber<Payload>> subscribers;
437+
Collection<LimitableRequestPublisher> publishers;
438+
synchronized (RSocketClient.this) {
439+
subscribers = receivers.values();
440+
publishers = senders.values();
441+
}
444442

445-
subscribers.forEach(this::cleanUpSubscriber);
446-
publishers.forEach(this::cleanUpLimitableRequestPublisher);
443+
subscribers.forEach(this::cleanUpSubscriber);
444+
publishers.forEach(this::cleanUpLimitableRequestPublisher);
447445

448-
if (null != keepAliveSendSub) {
449-
keepAliveSendSub.dispose();
446+
if (null != keepAliveSendSub) {
447+
keepAliveSendSub.dispose();
448+
}
449+
} finally {
450+
synchronized (this) {
451+
senders.clear();
452+
receivers.clear();
453+
}
450454
}
451455
}
452456

rsocket-core/src/test/java/io/rsocket/RSocketTest.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,11 @@
1616

1717
package io.rsocket;
1818

19-
import static org.hamcrest.Matchers.empty;
20-
import static org.hamcrest.Matchers.is;
21-
import static org.mockito.ArgumentMatchers.any;
22-
import static org.mockito.Mockito.verify;
23-
2419
import io.rsocket.exceptions.ApplicationException;
2520
import io.rsocket.test.util.LocalDuplexConnection;
2621
import io.rsocket.test.util.TestSubscriber;
2722
import io.rsocket.util.DefaultPayload;
2823
import io.rsocket.util.EmptyPayload;
29-
import java.util.ArrayList;
30-
import java.util.concurrent.CountDownLatch;
3124
import org.hamcrest.MatcherAssert;
3225
import org.junit.Assert;
3326
import org.junit.Rule;
@@ -41,6 +34,15 @@
4134
import reactor.core.publisher.Flux;
4235
import reactor.core.publisher.Mono;
4336

37+
import java.nio.channels.ClosedChannelException;
38+
import java.util.ArrayList;
39+
import java.util.concurrent.CountDownLatch;
40+
41+
import static org.hamcrest.Matchers.empty;
42+
import static org.hamcrest.Matchers.is;
43+
import static org.mockito.ArgumentMatchers.any;
44+
import static org.mockito.Mockito.verify;
45+
4446
public class RSocketTest {
4547

4648
@Rule public final SocketRule rule = new SocketRule();
@@ -85,6 +87,22 @@ public void testChannel() throws Exception {
8587

8688
latch.await();
8789
}
90+
91+
@Test(timeout = 2_000L)
92+
public void testCleanup() throws Exception {
93+
CountDownLatch latch = new CountDownLatch(1);
94+
rule.crs
95+
.requestStream(DefaultPayload.create("hi"))
96+
.doOnError(t -> {
97+
Assert.assertTrue(t instanceof ClosedChannelException);
98+
latch.countDown();
99+
})
100+
.subscribe();
101+
102+
rule.crs.cleanup();
103+
104+
latch.await();
105+
}
88106

89107
public static class SocketRule extends ExternalResource {
90108

@@ -124,6 +142,11 @@ protected void init() {
124142
public Mono<Payload> requestResponse(Payload payload) {
125143
return Mono.just(payload);
126144
}
145+
146+
@Override
147+
public Flux<Payload> requestStream(Payload payload) {
148+
return Flux.never();
149+
}
127150

128151
@Override
129152
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {

0 commit comments

Comments
 (0)