1616
1717package io.rsocket.kotlin.internal
1818
19- import io.reactivex.Completable
20- import io.reactivex.Flowable
21- import io.reactivex.Single
19+ import io.reactivex.*
2220import io.reactivex.disposables.Disposable
2321import io.rsocket.kotlin.*
2422import io.rsocket.kotlin.Frame.Request.initialRequestN
@@ -27,6 +25,7 @@ import io.rsocket.kotlin.exceptions.ApplicationException
2725import io.rsocket.kotlin.internal.frame.FrameHeaderFlyweight.FLAGS_C
2826import io.rsocket.kotlin.internal.frame.FrameHeaderFlyweight.FLAGS_M
2927import io.rsocket.kotlin.DefaultPayload
28+ import io.rsocket.kotlin.internal.util.reactiveStreamsRequestN
3029import org.reactivestreams.Publisher
3130import org.reactivestreams.Subscriber
3231import org.reactivestreams.Subscription
@@ -57,8 +56,7 @@ internal class RSocketResponder(
5756
5857 receiveDisposable = connection
5958 .receive()
60- .concatMap { frame -> handleFrame(frame) }
61- .subscribe({}, { completion.error(it) })
59+ .subscribe({ handleFrame(it) }, { completion.error(it) })
6260
6361 connection
6462 .onClose()
@@ -111,8 +109,8 @@ internal class RSocketResponder(
111109
112110 override fun onClose (): Completable = connection.onClose()
113111
114- private fun handleFrame (frame : Frame ): Flowable < Void > {
115- return try {
112+ private fun handleFrame (frame : Frame ) {
113+ try {
116114 val streamId = frame.streamId
117115 when (frame.type) {
118116 FrameType .FIRE_AND_FORGET -> handleFireAndForget(streamId, fireAndForget(DefaultPayload (frame)))
@@ -121,96 +119,126 @@ internal class RSocketResponder(
121119 FrameType .REQUEST_N -> handleRequestN(streamId, frame)
122120 FrameType .REQUEST_STREAM -> handleStream(streamId, requestStream(DefaultPayload (frame)), initialRequestN(frame))
123121 FrameType .REQUEST_CHANNEL -> handleChannel(streamId, frame)
124- FrameType .METADATA_PUSH -> metadataPush(DefaultPayload (frame))
122+ FrameType .METADATA_PUSH -> handleMetadataPush( metadataPush(DefaultPayload (frame) ))
125123 FrameType .NEXT -> handleNext(streamId, frame)
126124 FrameType .COMPLETE -> handleComplete(streamId)
127125 FrameType .ERROR -> handleError(streamId, frame)
128126 FrameType .NEXT_COMPLETE -> handleNextComplete(streamId, frame)
129127 else -> handleUnsupportedFrame(frame)
130- }.toFlowable()
128+ }
131129 } finally {
132130 frame.release()
133131 }
134132 }
135133
136- private fun handleUnsupportedFrame (frame : Frame ): Completable {
134+ private fun handleUnsupportedFrame (frame : Frame ) {
137135 errorConsumer(IllegalArgumentException (" Unsupported frame: $frame " ))
138- return Completable .complete()
139136 }
140137
141- private fun handleNextComplete (streamId : Int , frame : Frame ): Completable {
138+ private fun handleNextComplete (streamId : Int , frame : Frame ) {
142139 val receiver = channelReceivers[streamId]
143140 receiver?.onNext(DefaultPayload (frame))
144141 receiver?.onComplete()
145- return Completable .complete()
146142 }
147143
148- private fun handleError (streamId : Int , frame : Frame ): Completable {
144+ private fun handleError (streamId : Int , frame : Frame ) {
149145 val receiver = channelReceivers[streamId]
150146 receiver?.onError(ApplicationException (Frame .Error .message(frame)))
151- return Completable .complete()
152147 }
153148
154- private fun handleComplete (streamId : Int ): Completable {
149+ private fun handleComplete (streamId : Int ) {
155150 val receiver = channelReceivers[streamId]
156151 receiver?.onComplete()
157- return Completable .complete()
158152 }
159153
160- private fun handleNext (streamId : Int , frame : Frame ): Completable {
154+ private fun handleNext (streamId : Int , frame : Frame ) {
161155 val receiver = channelReceivers[streamId]
162156 receiver?.onNext(DefaultPayload (frame))
163- return Completable .complete()
164157 }
165158
166159 private fun handleFireAndForget (streamId : Int ,
167- result : Completable ): Completable {
168- return result
169- .doOnSubscribe { d -> sendingSubscriptions[streamId] = subscription(d) }
170- .doOnError(errorConsumer)
171- .doFinally { sendingSubscriptions - = streamId }
160+ result : Completable ) {
161+ result.subscribe(object : CompletableObserver {
162+ override fun onComplete () {
163+ sendingSubscriptions - = streamId
164+ }
165+
166+ override fun onSubscribe (d : Disposable ) {
167+ sendingSubscriptions[streamId] = subscription(d)
168+ }
169+
170+ override fun onError (e : Throwable ) {
171+ sendingSubscriptions - = streamId
172+ errorConsumer(e)
173+ }
174+ })
172175 }
173176
174177 private fun handleRequestResponse (streamId : Int ,
175- response : Single <Payload >): Completable {
176- return response
177- .doOnSubscribe { d -> sendingSubscriptions[streamId] = subscription(d) }
178- .map { payload ->
179- var flags = FLAGS_C
180- if (payload.hasMetadata) {
181- flags = Frame .setFlag(flags, FLAGS_M )
182- }
183- Frame .PayloadFrame .from(
184- streamId,
185- FrameType .NEXT_COMPLETE ,
186- payload,
187- flags)
178+ response : Single <Payload >) {
179+ response.subscribe(object : SingleObserver <Payload > {
180+
181+ override fun onSuccess (payload : Payload ) {
182+ sendingSubscriptions - = streamId
183+ var flags = FLAGS_C
184+ if (payload.hasMetadata) {
185+ flags = Frame .setFlag(flags, FLAGS_M )
188186 }
189- .onErrorResumeNext { t -> Single .just(Frame .Error .from(streamId, t)) }
190- .doOnSuccess { frameSender.send(it) }
191- .doFinally { sendingSubscriptions - = streamId }
192- .ignoreElement()
187+ frameSender.send(Frame .PayloadFrame .from(
188+ streamId,
189+ FrameType .NEXT_COMPLETE ,
190+ payload,
191+ flags))
192+ }
193+
194+ override fun onSubscribe (d : Disposable ) {
195+ sendingSubscriptions[streamId] = subscription(d)
196+ }
197+
198+ override fun onError (e : Throwable ) {
199+ sendingSubscriptions - = streamId
200+ val frame = when (e) {
201+ is NoSuchElementException -> Frame .PayloadFrame .from(streamId, FrameType .COMPLETE )
202+ else -> Frame .Error .from(streamId, e)
203+ }
204+ frameSender.send(frame)
205+ }
206+ })
193207 }
194208
195209 private fun handleStream (streamId : Int ,
196210 response : Flowable <Payload >,
197- initialRequestN : Int ): Completable {
211+ initialRequestN : Int ) {
198212 response
199- .map { payload -> Frame .PayloadFrame .from(streamId, FrameType .NEXT , payload) }
200213 .compose { frameFlux ->
201214 val frames = RequestingPublisher .wrap(frameFlux)
202215 sendingSubscriptions[streamId] = frames
203- frames.request(initialRequestN.toLong( ))
216+ frames.request(reactiveStreamsRequestN(initialRequestN ))
204217 frames
205218 }
206- .concatWith(Flowable .just(Frame .PayloadFrame .from(streamId, FrameType .COMPLETE )))
207- .onErrorResumeNext { t: Throwable -> Flowable .just(Frame .Error .from(streamId, t)) }
208- .doFinally { sendingSubscriptions - = streamId }
209- .subscribe { frameSender.send(it) }
210- return Completable .complete()
219+ .subscribe(object : Subscriber <Payload > {
220+
221+ override fun onSubscribe (s : Subscription ) {
222+ s.request(Long .MAX_VALUE )
223+ }
224+
225+ override fun onNext (payload : Payload ) {
226+ frameSender.send(Frame .PayloadFrame .from(streamId, FrameType .NEXT , payload))
227+ }
228+
229+ override fun onComplete () {
230+ sendingSubscriptions - = streamId
231+ frameSender.send(Frame .PayloadFrame .from(streamId, FrameType .COMPLETE ))
232+ }
233+
234+ override fun onError (t : Throwable ) {
235+ sendingSubscriptions - = streamId
236+ frameSender.send(Frame .Error .from(streamId, t))
237+ }
238+ })
211239 }
212240
213- private fun handleChannel (streamId : Int , firstFrame : Frame ): Completable {
241+ private fun handleChannel (streamId : Int , firstFrame : Frame ) {
214242 val receiver = StreamReceiver .create()
215243 channelReceivers[streamId] = receiver
216244
@@ -222,25 +250,32 @@ internal class RSocketResponder(
222250
223251 receiver.onNext(DefaultPayload (firstFrame))
224252
225- return handleStream(
253+ handleStream(
226254 streamId,
227255 requestChannel(request),
228256 initialRequestN(firstFrame))
229257 }
230258
231- private fun handleCancel (streamId : Int ): Completable {
259+ private fun handleMetadataPush (result : Completable ) {
260+ result.subscribe(object : CompletableObserver {
261+ override fun onComplete () {
262+ }
263+
264+ override fun onSubscribe (d : Disposable ) {
265+ }
266+
267+ override fun onError (e : Throwable ) = errorConsumer(e)
268+ })
269+ }
270+
271+ private fun handleCancel (streamId : Int ) {
232272 val subscription = sendingSubscriptions.remove(streamId)
233273 subscription?.cancel()
234- return Completable .complete()
235274 }
236275
237- private fun handleRequestN (streamId : Int , frame : Frame ): Completable {
276+ private fun handleRequestN (streamId : Int , frame : Frame ) {
238277 val subscription = sendingSubscriptions[streamId]
239- subscription?.let {
240- val n = Frame .RequestN .requestN(frame).toLong()
241- it.request(if (n >= Integer .MAX_VALUE ) Long .MAX_VALUE else n)
242- }
243- return Completable .complete()
278+ subscription?.request(reactiveStreamsRequestN(Frame .RequestN .requestN(frame)))
244279 }
245280
246281 private inner class Lifecycle {
@@ -261,8 +296,8 @@ internal class RSocketResponder(
261296 .close()
262297 .subscribe({}, errorConsumer)
263298
264- cleanUp(sendingSubscriptions, { it.cancel() })
265- cleanUp(channelReceivers, { it.onError(err) })
299+ cleanUp(sendingSubscriptions) { it.cancel() }
300+ cleanUp(channelReceivers) { it.onError(err) }
266301 }
267302 }
268303
0 commit comments