File tree Expand file tree Collapse file tree 5 files changed +27
-4
lines changed
main/kotlin/io/rsocket/kotlin/internal
test/kotlin/io/rsocket/kotlin
rsocket-transport-okhttp/gradle/dependency-locks Expand file tree Collapse file tree 5 files changed +27
-4
lines changed Original file line number Diff line number Diff line change @@ -41,9 +41,10 @@ subprojects {
4141 api ' io.netty:netty-buffer'
4242 api ' io.reactivex.rxjava2:rxjava'
4343 api ' org.jetbrains.kotlin:kotlin-stdlib-jdk7'
44+ api ' org.slf4j:slf4j-api'
45+
4446 compileOnly ' com.google.code.findbugs:jsr305'
4547
46- testImplementation ' org.slf4j:slf4j-api'
4748 testImplementation ' junit:junit'
4849 testImplementation ' org.mockito:mockito-core'
4950 testImplementation ' org.hamcrest:hamcrest-library'
Original file line number Diff line number Diff line change @@ -10,3 +10,4 @@ org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.3.50
1010org.jetbrains.kotlin:kotlin-stdlib:1.3.50
1111org.jetbrains:annotations:13.0
1212org.reactivestreams:reactive-streams:1.0.2
13+ org.slf4j:slf4j-api:1.7.28
Original file line number Diff line number Diff line change @@ -61,6 +61,10 @@ internal class RSocketResponder(
6161 connection
6262 .onClose()
6363 .subscribe({ completion.complete() }, errorConsumer)
64+
65+ requestHandler
66+ .onClose()
67+ .subscribe({ completion.complete() }, errorConsumer)
6468 }
6569
6670 override fun fireAndForget (payload : Payload ): Completable {
@@ -291,7 +295,13 @@ internal class RSocketResponder(
291295
292296 private fun completeOnce (err : Throwable ) {
293297 if (completed.compareAndSet(false , true )) {
298+
294299 receiveDisposable.dispose()
300+
301+ connection
302+ .close()
303+ .subscribe({}, errorConsumer)
304+
295305 requestHandler
296306 .close()
297307 .subscribe({}, errorConsumer)
Original file line number Diff line number Diff line change @@ -39,6 +39,15 @@ class RSocketResponderTest {
3939 @get:Rule
4040 val rule = ServerSocketRule ()
4141
42+ @Test(timeout = 2000 )
43+ fun testRequestHandlerCloseTerminatesRSocket () {
44+ rule.acceptingSocket.close()
45+ .andThen(
46+ rule.rsocket.onClose()
47+ .mergeWith(rule.conn.onClose())
48+ ).blockingAwait()
49+ }
50+
4251 @Test(timeout = 2000 )
4352 @Throws(Exception ::class )
4453 fun testHandleResponseFrameNoError () {
@@ -110,7 +119,7 @@ class RSocketResponderTest {
110119
111120 lateinit var sender: PublishProcessor <Frame >
112121 lateinit var receiver: PublishProcessor <Frame >
113- private lateinit var conn: LocalDuplexConnection
122+ lateinit var conn: LocalDuplexConnection
114123 lateinit var errors: MutableList <Throwable >
115124 internal lateinit var rsocket: RSocketResponder
116125
@@ -137,9 +146,10 @@ class RSocketResponderTest {
137146 }
138147
139148 fun setAccSocket (acceptingSocket : RSocket ) {
149+ val cur = this .acceptingSocket
150+ cur.close().subscribe()
151+ cur.onClose().blockingAwait()
140152 this .acceptingSocket = acceptingSocket
141- acceptingSocket.close().subscribe()
142- acceptingSocket.onClose().blockingAwait()
143153 init ()
144154 }
145155
Original file line number Diff line number Diff line change @@ -12,3 +12,4 @@ org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.3.50
1212org.jetbrains.kotlin:kotlin-stdlib:1.3.50
1313org.jetbrains:annotations:13.0
1414org.reactivestreams:reactive-streams:1.0.2
15+ org.slf4j:slf4j-api:1.7.28
You can’t perform that action at this time.
0 commit comments