@@ -29,10 +29,13 @@ import com.google.firebase.firestore.util.Preconditions
29
29
import com.google.firestore.v1.Pipeline
30
30
import com.google.firestore.v1.Value
31
31
import kotlinx.coroutines.flow.Flow
32
+ import kotlinx.coroutines.flow.drop
33
+ import kotlinx.coroutines.flow.emptyFlow
32
34
import kotlinx.coroutines.flow.filter
33
35
import kotlinx.coroutines.flow.flow
34
- import kotlinx.coroutines.flow.flowOf
36
+ import kotlinx.coroutines.flow.forEach
35
37
import kotlinx.coroutines.flow.map
38
+ import kotlinx.coroutines.flow.skip
36
39
import kotlinx.coroutines.flow.take
37
40
import kotlinx.coroutines.flow.toList
38
41
@@ -533,8 +536,21 @@ internal constructor(private val limit: Int, options: InternalOptions = Internal
533
536
override fun evaluate (
534
537
context : EvaluationContext ,
535
538
inputs : Flow <MutableDocument >
536
- ): Flow <MutableDocument > = if (limit > 0 ) inputs.take(limit) else flowOf()
537
-
539
+ ): Flow <MutableDocument > =
540
+ when {
541
+ limit > 0 -> inputs.take(limit)
542
+ limit < 0 ->
543
+ flow {
544
+ val limitLast = - limit
545
+ val buffer = ArrayDeque <MutableDocument >(limitLast)
546
+ inputs.collect { doc ->
547
+ if (buffer.size == limitLast) buffer.removeFirst()
548
+ buffer.add(doc)
549
+ }
550
+ buffer.forEach { emit(it) }
551
+ }
552
+ else -> emptyFlow()
553
+ }
538
554
override fun args (userDataReader : UserDataReader ): Sequence <Value > =
539
555
sequenceOf(encodeValue(limit))
540
556
}
@@ -545,6 +561,23 @@ internal constructor(private val offset: Int, options: InternalOptions = Interna
545
561
override fun self (options : InternalOptions ) = OffsetStage (offset, options)
546
562
override fun args (userDataReader : UserDataReader ): Sequence <Value > =
547
563
sequenceOf(encodeValue(offset))
564
+ override fun evaluate (
565
+ context : EvaluationContext ,
566
+ inputs : Flow <MutableDocument >
567
+ ): Flow <MutableDocument > =
568
+ when {
569
+ offset > 0 -> inputs.drop(offset)
570
+ offset < 0 ->
571
+ flow {
572
+ val offsetLast = - offset
573
+ val buffer = ArrayDeque <MutableDocument >(offsetLast)
574
+ inputs.collect { doc ->
575
+ if (buffer.size == offsetLast) emit(buffer.removeFirst())
576
+ buffer.add(doc)
577
+ }
578
+ }
579
+ else -> inputs
580
+ }
548
581
}
549
582
550
583
internal class SelectStage
0 commit comments