Skip to content

Commit 6e823a1

Browse files
committed
Switch to Reactor snapshots
See gh-943 Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent 09abd12 commit 6e823a1

File tree

2 files changed

+10
-7
lines changed

2 files changed

+10
-7
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ subprojects {
3232
apply plugin: 'io.spring.dependency-management'
3333
apply plugin: 'com.github.sherter.google-java-format'
3434

35-
ext['reactor-bom.version'] = '2020.0.0-RC1'
35+
ext['reactor-bom.version'] = '2020.0.0-SNAPSHOT'
3636
ext['logback.version'] = '1.2.3'
3737
ext['netty-bom.version'] = '4.1.52.Final'
3838
ext['netty-boringssl.version'] = '2.0.34.Final'
@@ -96,7 +96,7 @@ subprojects {
9696
mavenCentral()
9797

9898
maven {
99-
url 'https://repo.spring.io/libs-snapshot'
99+
url 'https://repo.spring.io/snapshot'
100100
content {
101101
includeGroup "io.projectreactor"
102102
includeGroup "io.projectreactor.netty"

rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public class ResumableDuplexConnection extends Flux<ByteBuf>
6868
public ResumableDuplexConnection(
6969
String tag, DuplexConnection initialConnection, ResumableFramesStore resumableFramesStore) {
7070
this.tag = tag;
71-
this.onConnectionClosedSink = Sinks.many().unsafe().unicast().onBackpressureBuffer();
71+
this.onConnectionClosedSink = Sinks.unsafe().many().unicast().onBackpressureBuffer();
7272
this.resumableFramesStore = resumableFramesStore;
7373
this.savableFramesSender = new UnboundedProcessor();
7474
this.framesSaverDisposable = resumableFramesStore.saveFrames(savableFramesSender).subscribe();
@@ -114,7 +114,10 @@ void initConnection(DuplexConnection nextConnection) {
114114
__ -> {
115115
frameReceivingSubscriber.dispose();
116116
disposable.dispose();
117-
onConnectionClosedSink.emitNext(currentConnectionIndex);
117+
Sinks.Emission emission = onConnectionClosedSink.tryEmitNext(currentConnectionIndex);
118+
if (emission.equals(Sinks.Emission.OK)) {
119+
logger.error("Failed to notify session of closed connection: {}", emission);
120+
}
118121
})
119122
.subscribe();
120123
}
@@ -160,13 +163,13 @@ public void sendErrorAndClose(RSocketErrorException rSocketErrorException) {
160163
t -> {
161164
framesSaverDisposable.dispose();
162165
savableFramesSender.dispose();
163-
onConnectionClosedSink.emitComplete();
166+
onConnectionClosedSink.tryEmitComplete();
164167
onClose.onError(t);
165168
},
166169
() -> {
167170
framesSaverDisposable.dispose();
168171
savableFramesSender.dispose();
169-
onConnectionClosedSink.emitComplete();
172+
onConnectionClosedSink.tryEmitComplete();
170173
final Throwable cause = rSocketErrorException.getCause();
171174
if (cause == null) {
172175
onClose.onComplete();
@@ -206,7 +209,7 @@ public void dispose() {
206209
framesSaverDisposable.dispose();
207210
activeReceivingSubscriber.dispose();
208211
savableFramesSender.dispose();
209-
onConnectionClosedSink.emitComplete();
212+
onConnectionClosedSink.tryEmitComplete();
210213
onClose.onComplete();
211214
}
212215

0 commit comments

Comments
 (0)