@@ -23,6 +23,7 @@ import io.reactivex.processors.UnicastProcessor
2323import io.rsocket.kotlin.*
2424import io.rsocket.kotlin.exceptions.ApplicationException
2525import io.rsocket.kotlin.exceptions.ChannelRequestException
26+ import io.rsocket.kotlin.internal.util.reactiveStreamsRequestN
2627import org.reactivestreams.Publisher
2728import org.reactivestreams.Subscriber
2829import org.reactivestreams.Subscription
@@ -121,10 +122,11 @@ internal class RSocketRequester(
121122 val streamId = streamIds.nextStreamId(receivers)
122123 val receiver = StreamReceiver .create()
123124 receivers[streamId] = receiver
124- val reqN = Cond ()
125+ var isFirstRequestN = true
125126
126127 receiver.doOnRequestIfActive { requestN ->
127- val frame = if (reqN.first()) {
128+ val frame = if (isFirstRequestN) {
129+ isFirstRequestN = false
128130 Frame .Request .from(
129131 streamId,
130132 FrameType .REQUEST_STREAM ,
@@ -148,35 +150,37 @@ internal class RSocketRequester(
148150 return Flowable .defer {
149151 val receiver = StreamReceiver .create()
150152 val streamId = streamIds.nextStreamId(receivers)
151- val reqN = Cond ()
153+ var firstReqN = true
154+ var firstReqPayload = true
152155
153156 receiver.doOnRequestIfActive { requestN ->
154157
155- if (reqN.first()) {
156- val wrappedRequest = request.compose {
157- val sender = RequestingPublisher .wrap(it)
158- sender.request(1 )
159- senders[streamId] = sender
160- receivers[streamId] = receiver
161- sender
162- }.publish().autoConnect(2 )
158+ if (firstReqN) {
159+ firstReqN = false
163160
164- val first = wrappedRequest.take( 1 )
165- .map { payload ->
166- Frame . Request .from(
167- streamId,
168- FrameType . REQUEST_CHANNEL ,
169- payload,
170- requestN)
161+ val requestFrames = request
162+ .compose {
163+ val sender = RequestingPublisher .wrap(it)
164+ sender.request( 1 )
165+ senders[streamId] = sender
166+ receivers[streamId] = receiver
167+ sender
171168 }
172- val rest = wrappedRequest.skip(1 )
173169 .map { payload ->
174- Frame .PayloadFrame .from(
175- streamId,
176- FrameType .NEXT ,
177- payload)
170+ if (firstReqPayload) {
171+ firstReqPayload = false
172+ Frame .Request .from(
173+ streamId,
174+ FrameType .REQUEST_CHANNEL ,
175+ payload,
176+ requestN)
177+ } else {
178+ Frame .PayloadFrame .from(
179+ streamId,
180+ FrameType .NEXT ,
181+ payload)
182+ }
178183 }
179- val requestFrames = Flowable .concatArrayEager(first, rest)
180184 requestFrames.subscribe(
181185 ChannelRequestSubscriber (
182186 { payload -> frameSender.send(payload) },
@@ -248,10 +252,7 @@ internal class RSocketRequester(
248252 FrameType .NEXT -> receiver.onNext(DefaultPayload (frame))
249253 FrameType .REQUEST_N -> {
250254 val sender = senders[streamId]
251- sender?.let {
252- val n = Frame .RequestN .requestN(frame).toLong()
253- it.request(n)
254- }
255+ sender?.request(reactiveStreamsRequestN(Frame .RequestN .requestN(frame)))
255256 }
256257 FrameType .COMPLETE -> {
257258 receiver.onComplete()
@@ -320,18 +321,6 @@ internal class RSocketRequester(
320321 }
321322 }
322323
323- private class Cond {
324- private var first = true
325-
326- fun first (): Boolean =
327- if (first) {
328- first = false
329- true
330- } else {
331- false
332- }
333- }
334-
335324 private class ChannelRequestSubscriber (private val next : (Frame ) -> Unit ,
336325 private val error : (Throwable ) -> Unit ,
337326 private val complete : (Boolean ) -> Unit )
0 commit comments