Skip to content

Commit f9fb521

Browse files
committed
wip
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 000f6da commit f9fb521

File tree

3 files changed

+31
-1
lines changed

3 files changed

+31
-1
lines changed

rsocket-core/src/main/java/io/rsocket/RSocket.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,11 @@ default double availability() {
8585
}
8686

8787
@Override
88-
default void dispose() {}
88+
default void dispose() {
89+
dispose(true);
90+
}
91+
92+
default void dispose(boolean force) {}
8993

9094
@Override
9195
default boolean isDisposed() {

rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ class RequesterResponderSupport {
2424
@Nullable final StreamIdSupplier streamIdSupplier;
2525
final IntObjectMap<FrameHandler> activeStreams;
2626

27+
boolean terminating;
28+
2729
public RequesterResponderSupport(
2830
int mtu,
2931
int maxFrameLength,
@@ -88,6 +90,7 @@ public int getNextStreamId() {
8890
final StreamIdSupplier streamIdSupplier = this.streamIdSupplier;
8991
if (streamIdSupplier != null) {
9092
synchronized (this) {
93+
if
9194
return streamIdSupplier.nextStreamId(this.activeStreams);
9295
}
9396
} else {
@@ -119,6 +122,9 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) {
119122
}
120123

121124
public synchronized boolean add(int streamId, FrameHandler frameHandler) {
125+
if (this.terminating) {
126+
throw
127+
}
122128
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
123129
// copy of Map.putIfAbsent(key, value) without `streamId` boxing
124130
final FrameHandler previousHandler = activeStreams.get(streamId);
@@ -158,4 +164,8 @@ public synchronized boolean remove(int streamId, FrameHandler frameHandler) {
158164
activeStreams.remove(streamId);
159165
return true;
160166
}
167+
168+
public synchronized void terminate() {
169+
this.terminating = true;
170+
}
161171
}

rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableTransportTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,28 @@
1616

1717
package io.rsocket.transport.local;
1818

19+
import io.netty.buffer.ByteBufUtil;
20+
import io.netty.buffer.Unpooled;
21+
import io.rsocket.frame.FrameHeaderCodec;
22+
import io.rsocket.frame.FrameType;
1923
import io.rsocket.test.TransportTest;
2024
import java.time.Duration;
2125
import java.util.UUID;
2226

2327
final class LocalResumableTransportTest implements TransportTest {
2428

29+
public static void main(String[] args) {
30+
31+
System.out.println(
32+
FrameHeaderCodec.frameType(
33+
Unpooled.copiedBuffer(
34+
ByteBufUtil.decodeHexDump("000000003800000000000007ef4a"))));
35+
System.out.println(
36+
FrameHeaderCodec.frameType(
37+
Unpooled.copiedBuffer(
38+
ByteBufUtil.decodeHexDump("000000002c0000000004"))));
39+
}
40+
2541
private final TransportPair transportPair =
2642
new TransportPair<>(
2743
() -> "test-" + UUID.randomUUID(),

0 commit comments

Comments
 (0)