Skip to content

Commit 67161c0

Browse files
authored
use MonoProcessor to close and not depend on netty (#428)
1 parent d2c8d13 commit 67161c0

File tree

2 files changed

+82
-69
lines changed

2 files changed

+82
-69
lines changed

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414
* limitations under the License.
1515
*/
1616
package io.rsocket.transport.netty;
17-
1817
import io.rsocket.DuplexConnection;
1918
import io.rsocket.Frame;
2019
import org.reactivestreams.Publisher;
2120
import reactor.core.publisher.Flux;
2221
import reactor.core.publisher.Mono;
22+
import reactor.core.publisher.MonoProcessor;
2323
import reactor.ipc.netty.NettyContext;
2424
import reactor.ipc.netty.NettyInbound;
2525
import reactor.ipc.netty.NettyOutbound;
@@ -28,44 +28,51 @@ public class NettyDuplexConnection implements DuplexConnection {
2828
private final NettyInbound in;
2929
private final NettyOutbound out;
3030
private final NettyContext context;
31-
31+
private final MonoProcessor<Void> onClose;
32+
3233
public NettyDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) {
3334
this.in = in;
3435
this.out = out;
3536
this.context = context;
37+
this.onClose = MonoProcessor.create();
38+
39+
context.onClose(onClose::onComplete);
40+
this.onClose
41+
.doFinally(
42+
s -> {
43+
this.context.dispose();
44+
this.context.channel().close();
45+
})
46+
.subscribe();
3647
}
37-
48+
3849
@Override
3950
public Mono<Void> send(Publisher<Frame> frames) {
4051
return Flux.from(frames).concatMap(this::sendOne).then();
4152
}
42-
53+
4354
@Override
4455
public Mono<Void> sendOne(Frame frame) {
4556
return out.sendObject(frame.content()).then();
4657
}
47-
58+
4859
@Override
4960
public Flux<Frame> receive() {
5061
return in.receive().map(buf -> Frame.from(buf.retain()));
5162
}
52-
63+
5364
@Override
5465
public Mono<Void> close() {
55-
return Mono.fromRunnable(
56-
() -> {
57-
context.dispose();
58-
context.channel().close();
59-
});
66+
return Mono.fromRunnable(onClose::onComplete);
6067
}
61-
68+
6269
@Override
6370
public Mono<Void> onClose() {
64-
return context.onClose();
71+
return onClose;
6572
}
66-
73+
6774
@Override
6875
public double availability() {
69-
return context.isDisposed() ? 0.0 : 1.0;
76+
return onClose.isTerminated() ? 0.0 : 1.0;
7077
}
7178
}

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java

Lines changed: 60 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
* limitations under the License.
1515
*/
1616
package io.rsocket.transport.netty;
17-
1817
import static io.netty.buffer.Unpooled.wrappedBuffer;
1918
import static io.rsocket.frame.FrameHeaderFlyweight.FRAME_LENGTH_SIZE;
2019

@@ -27,6 +26,7 @@
2726
import org.reactivestreams.Publisher;
2827
import reactor.core.publisher.Flux;
2928
import reactor.core.publisher.Mono;
29+
import reactor.core.publisher.MonoProcessor;
3030
import reactor.ipc.netty.NettyContext;
3131
import reactor.ipc.netty.NettyInbound;
3232
import reactor.ipc.netty.NettyOutbound;
@@ -39,57 +39,63 @@
3939
* back on for frames received.
4040
*/
4141
public class WebsocketDuplexConnection implements DuplexConnection {
42-
private final NettyInbound in;
43-
private final NettyOutbound out;
44-
private final NettyContext context;
45-
46-
public WebsocketDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) {
47-
this.in = in;
48-
this.out = out;
49-
this.context = context;
50-
}
51-
52-
@Override
53-
public Mono<Void> send(Publisher<Frame> frames) {
54-
return Flux.from(frames).concatMap(this::sendOne).then();
55-
}
56-
57-
@Override
58-
public Mono<Void> sendOne(Frame frame) {
59-
return out.sendObject(new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE)))
60-
.then();
61-
}
62-
63-
@Override
64-
public Flux<Frame> receive() {
65-
return in.receive()
66-
.map(
67-
buf -> {
68-
CompositeByteBuf composite = context.channel().alloc().compositeBuffer();
69-
ByteBuf length = wrappedBuffer(new byte[FRAME_LENGTH_SIZE]);
70-
FrameHeaderFlyweight.encodeLength(length, 0, buf.readableBytes());
71-
composite.addComponents(true, length, buf.retain());
72-
return Frame.from(composite);
73-
});
74-
}
75-
76-
@Override
77-
public Mono<Void> close() {
78-
return Mono.fromRunnable(
79-
() -> {
80-
if (!context.isDisposed()) {
81-
context.channel().close();
82-
}
83-
});
84-
}
85-
86-
@Override
87-
public Mono<Void> onClose() {
88-
return context.onClose();
89-
}
90-
91-
@Override
92-
public double availability() {
93-
return context.isDisposed() ? 0.0 : 1.0;
94-
}
42+
private final NettyInbound in;
43+
private final NettyOutbound out;
44+
private final NettyContext context;
45+
private final MonoProcessor<Void> onClose;
46+
47+
public WebsocketDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) {
48+
this.in = in;
49+
this.out = out;
50+
this.context = context;
51+
this.onClose = MonoProcessor.create();
52+
53+
context.onClose(onClose::onComplete);
54+
this.onClose
55+
.doFinally(
56+
s -> {
57+
this.context.dispose();
58+
this.context.channel().close();
59+
})
60+
.subscribe();
61+
}
62+
63+
@Override
64+
public Mono<Void> send(Publisher<Frame> frames) {
65+
return Flux.from(frames).concatMap(this::sendOne).then();
66+
}
67+
68+
@Override
69+
public Mono<Void> sendOne(Frame frame) {
70+
return out.sendObject(new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE)))
71+
.then();
72+
}
73+
74+
@Override
75+
public Flux<Frame> receive() {
76+
return in.receive()
77+
.map(
78+
buf -> {
79+
CompositeByteBuf composite = context.channel().alloc().compositeBuffer();
80+
ByteBuf length = wrappedBuffer(new byte[FRAME_LENGTH_SIZE]);
81+
FrameHeaderFlyweight.encodeLength(length, 0, buf.readableBytes());
82+
composite.addComponents(true, length, buf.retain());
83+
return Frame.from(composite);
84+
});
85+
}
86+
87+
@Override
88+
public Mono<Void> close() {
89+
return Mono.fromRunnable(onClose::onComplete);
90+
}
91+
92+
@Override
93+
public Mono<Void> onClose() {
94+
return onClose;
95+
}
96+
97+
@Override
98+
public double availability() {
99+
return onClose.isTerminated() ? 0.0 : 1.0;
100+
}
95101
}

0 commit comments

Comments
 (0)