Skip to content

Commit 4e4a4b2

Browse files
committed
RealtimePipeline evaluate initial implementation
1 parent ae034da commit 4e4a4b2

File tree

11 files changed

+763
-261
lines changed

11 files changed

+763
-261
lines changed

firebase-firestore/src/main/java/com/google/firebase/firestore/Pipeline.kt

Lines changed: 165 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,18 @@ import com.google.firebase.firestore.pipeline.DatabaseSource
3232
import com.google.firebase.firestore.pipeline.DistinctStage
3333
import com.google.firebase.firestore.pipeline.DocumentsSource
3434
import com.google.firebase.firestore.pipeline.Expr
35+
import com.google.firebase.firestore.pipeline.Expr.Companion.field
3536
import com.google.firebase.firestore.pipeline.ExprWithAlias
3637
import com.google.firebase.firestore.pipeline.Field
3738
import com.google.firebase.firestore.pipeline.FindNearestStage
3839
import com.google.firebase.firestore.pipeline.FunctionExpr
3940
import com.google.firebase.firestore.pipeline.GenericStage
41+
import com.google.firebase.firestore.pipeline.InternalOptions
4042
import com.google.firebase.firestore.pipeline.LimitStage
4143
import com.google.firebase.firestore.pipeline.OffsetStage
4244
import com.google.firebase.firestore.pipeline.Ordering
4345
import com.google.firebase.firestore.pipeline.PipelineOptions
46+
import com.google.firebase.firestore.pipeline.RealtimePipelineOptions
4447
import com.google.firebase.firestore.pipeline.RemoveFieldsStage
4548
import com.google.firebase.firestore.pipeline.ReplaceStage
4649
import com.google.firebase.firestore.pipeline.SampleStage
@@ -55,12 +58,81 @@ import com.google.firestore.v1.ExecutePipelineRequest
5558
import com.google.firestore.v1.StructuredPipeline
5659
import com.google.firestore.v1.Value
5760

58-
class Pipeline
61+
open class AbstractPipeline
5962
internal constructor(
6063
internal val firestore: FirebaseFirestore,
6164
internal val userDataReader: UserDataReader,
62-
private val stages: FluentIterable<Stage<*>>
65+
internal val stages: FluentIterable<Stage<*>>
6366
) {
67+
private fun toStructuredPipelineProto(options: InternalOptions?): StructuredPipeline {
68+
val builder = StructuredPipeline.newBuilder()
69+
builder.pipeline = toPipelineProto()
70+
options?.forEach(builder::putOptions)
71+
return builder.build()
72+
}
73+
74+
internal fun toPipelineProto(): com.google.firestore.v1.Pipeline =
75+
com.google.firestore.v1.Pipeline.newBuilder()
76+
.addAllStages(stages.map { it.toProtoStage(userDataReader) })
77+
.build()
78+
79+
private fun toExecutePipelineRequest(options: InternalOptions?): ExecutePipelineRequest {
80+
val database = firestore.databaseId
81+
val builder = ExecutePipelineRequest.newBuilder()
82+
builder.database = "projects/${database.projectId}/databases/${database.databaseId}"
83+
builder.structuredPipeline = toStructuredPipelineProto(options)
84+
return builder.build()
85+
}
86+
87+
protected fun execute(options: InternalOptions?): Task<PipelineSnapshot> {
88+
val request = toExecutePipelineRequest(options)
89+
val observerTask = ObserverSnapshotTask()
90+
firestore.callClient { call -> call!!.executePipeline(request, observerTask) }
91+
return observerTask.task
92+
}
93+
94+
private inner class ObserverSnapshotTask : PipelineResultObserver {
95+
private val userDataWriter =
96+
UserDataWriter(firestore, DocumentSnapshot.ServerTimestampBehavior.DEFAULT)
97+
private val taskCompletionSource = TaskCompletionSource<PipelineSnapshot>()
98+
private val results: ImmutableList.Builder<PipelineResult> = ImmutableList.builder()
99+
override fun onDocument(
100+
key: DocumentKey?,
101+
data: Map<String, Value>,
102+
createTime: Timestamp?,
103+
updateTime: Timestamp?
104+
) {
105+
results.add(
106+
PipelineResult(
107+
firestore,
108+
userDataWriter,
109+
if (key == null) null else DocumentReference(key, firestore),
110+
data,
111+
createTime,
112+
updateTime
113+
)
114+
)
115+
}
116+
117+
override fun onComplete(executionTime: Timestamp) {
118+
taskCompletionSource.setResult(PipelineSnapshot(executionTime, results.build()))
119+
}
120+
121+
override fun onError(exception: FirebaseFirestoreException) {
122+
taskCompletionSource.setException(exception)
123+
}
124+
125+
val task: Task<PipelineSnapshot>
126+
get() = taskCompletionSource.task
127+
}
128+
}
129+
130+
class Pipeline
131+
private constructor(
132+
firestore: FirebaseFirestore,
133+
userDataReader: UserDataReader,
134+
stages: FluentIterable<Stage<*>>
135+
) : AbstractPipeline(firestore, userDataReader, stages) {
64136
internal constructor(
65137
firestore: FirebaseFirestore,
66138
userDataReader: UserDataReader,
@@ -71,37 +143,14 @@ internal constructor(
71143
return Pipeline(firestore, userDataReader, stages.append(stage))
72144
}
73145

74-
fun execute(): Task<PipelineSnapshot> = execute(PipelineOptions.DEFAULT)
146+
fun execute(): Task<PipelineSnapshot> = execute(null)
75147

76-
fun execute(options: PipelineOptions): Task<PipelineSnapshot> {
77-
val observerTask = ObserverSnapshotTask()
78-
firestore.callClient { call -> call!!.executePipeline(toProto(options), observerTask) }
79-
return observerTask.task
80-
}
148+
fun execute(options: RealtimePipelineOptions): Task<PipelineSnapshot> = execute(options.options)
81149

82150
internal fun documentReference(key: DocumentKey): DocumentReference {
83151
return DocumentReference(key, firestore)
84152
}
85153

86-
private fun toProto(options: PipelineOptions): ExecutePipelineRequest {
87-
val database = firestore.databaseId
88-
val builder = ExecutePipelineRequest.newBuilder()
89-
builder.database = "projects/${database.projectId}/databases/${database.databaseId}"
90-
builder.structuredPipeline = toStructuredPipelineProto()
91-
return builder.build()
92-
}
93-
94-
private fun toStructuredPipelineProto(): StructuredPipeline {
95-
val builder = StructuredPipeline.newBuilder()
96-
builder.pipeline = toPipelineProto()
97-
return builder.build()
98-
}
99-
100-
internal fun toPipelineProto(): com.google.firestore.v1.Pipeline =
101-
com.google.firestore.v1.Pipeline.newBuilder()
102-
.addAllStages(stages.map { it.toProtoStage(userDataReader) })
103-
.build()
104-
105154
/**
106155
* Adds a stage to the pipeline by specifying the stage name as an argument. This does not offer
107156
* any type safety on the stage params and requires the caller to know the order (and optionally
@@ -153,9 +202,7 @@ internal constructor(
153202
*/
154203
fun removeFields(field: String, vararg additionalFields: String): Pipeline =
155204
append(
156-
RemoveFieldsStage(
157-
arrayOf(Expr.field(field), *additionalFields.map(Expr::field).toTypedArray())
158-
)
205+
RemoveFieldsStage(arrayOf(field(field), *additionalFields.map(Expr::field).toTypedArray()))
159206
)
160207

161208
/**
@@ -178,11 +225,7 @@ internal constructor(
178225
* @return A new [Pipeline] object with this stage appended to the stage list.
179226
*/
180227
fun select(selection: Selectable, vararg additionalSelections: Any): Pipeline =
181-
append(
182-
SelectStage(
183-
arrayOf(selection, *additionalSelections.map(Selectable::toSelectable).toTypedArray())
184-
)
185-
)
228+
append(SelectStage.of(selection, *additionalSelections))
186229

187230
/**
188231
* Selects or creates a set of fields from the outputs of previous stages.
@@ -204,14 +247,7 @@ internal constructor(
204247
* @return A new [Pipeline] object with this stage appended to the stage list.
205248
*/
206249
fun select(fieldName: String, vararg additionalSelections: Any): Pipeline =
207-
append(
208-
SelectStage(
209-
arrayOf(
210-
Expr.field(fieldName),
211-
*additionalSelections.map(Selectable::toSelectable).toTypedArray()
212-
)
213-
)
214-
)
250+
append(SelectStage.of(fieldName, *additionalSelections))
215251

216252
/**
217253
* Sorts the documents from previous stages based on one or more [Ordering] criteria.
@@ -320,10 +356,7 @@ internal constructor(
320356
fun distinct(groupField: String, vararg additionalGroups: Any): Pipeline =
321357
append(
322358
DistinctStage(
323-
arrayOf(
324-
Expr.field(groupField),
325-
*additionalGroups.map(Selectable::toSelectable).toTypedArray()
326-
)
359+
arrayOf(field(groupField), *additionalGroups.map(Selectable::toSelectable).toTypedArray())
327360
)
328361
)
329362

@@ -453,7 +486,7 @@ internal constructor(
453486
* @param field The [String] specifying the field name containing the nested map.
454487
* @return A new [Pipeline] object with this stage appended to the stage list.
455488
*/
456-
fun replace(field: String): Pipeline = replace(Expr.field(field))
489+
fun replace(field: String): Pipeline = replace(field(field))
457490

458491
/**
459492
* Fully overwrites all fields in a document with those coming from a nested map.
@@ -514,8 +547,7 @@ internal constructor(
514547
* @param alias The name of field to store emitted element of array.
515548
* @return A new [Pipeline] object with this stage appended to the stage list.
516549
*/
517-
fun unnest(arrayField: String, alias: String): Pipeline =
518-
unnest(Expr.field(arrayField).alias(alias))
550+
fun unnest(arrayField: String, alias: String): Pipeline = unnest(field(arrayField).alias(alias))
519551

520552
/**
521553
* Takes a specified array from the input documents and outputs a document for each element with
@@ -550,41 +582,6 @@ internal constructor(
550582
* @return A new [Pipeline] object with this stage appended to the stage list.
551583
*/
552584
fun unnest(unnestStage: UnnestStage): Pipeline = append(unnestStage)
553-
554-
private inner class ObserverSnapshotTask : PipelineResultObserver {
555-
private val userDataWriter =
556-
UserDataWriter(firestore, DocumentSnapshot.ServerTimestampBehavior.DEFAULT)
557-
private val taskCompletionSource = TaskCompletionSource<PipelineSnapshot>()
558-
private val results: ImmutableList.Builder<PipelineResult> = ImmutableList.builder()
559-
override fun onDocument(
560-
key: DocumentKey?,
561-
data: Map<String, Value>,
562-
createTime: Timestamp?,
563-
updateTime: Timestamp?
564-
) {
565-
results.add(
566-
PipelineResult(
567-
firestore,
568-
userDataWriter,
569-
if (key == null) null else DocumentReference(key, firestore),
570-
data,
571-
createTime,
572-
updateTime
573-
)
574-
)
575-
}
576-
577-
override fun onComplete(executionTime: Timestamp) {
578-
taskCompletionSource.setResult(PipelineSnapshot(executionTime, results.build()))
579-
}
580-
581-
override fun onError(exception: FirebaseFirestoreException) {
582-
taskCompletionSource.setException(exception)
583-
}
584-
585-
val task: Task<PipelineSnapshot>
586-
get() = taskCompletionSource.task
587-
}
588585
}
589586

590587
/** Start of a Firestore Pipeline */
@@ -644,7 +641,7 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
644641
* Set the pipeline's source to the collection specified by CollectionSource.
645642
*
646643
* @param stage A [CollectionSource] that will be the source of this pipeline.
647-
* @return Pipeline with documents from target collection.
644+
* @return A new [Pipeline] object with documents from target collection.
648645
* @throws [IllegalArgumentException] Thrown if the [stage] provided targets a different project
649646
* or database than the pipeline.
650647
*/
@@ -659,6 +656,7 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
659656
* Set the pipeline's source to the collection group with the given id.
660657
*
661658
* @param collectionId The id of a collection group that will be the source of this pipeline.
659+
* @return A new [Pipeline] object with documents from target collection group.
662660
*/
663661
fun collectionGroup(collectionId: String): Pipeline =
664662
pipeline(CollectionGroupSource.of((collectionId)))
@@ -710,6 +708,87 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
710708
}
711709
}
712710

711+
class RealtimePipelineSource internal constructor(private val firestore: FirebaseFirestore) {
712+
713+
/**
714+
* Set the pipeline's source to the collection specified by the given path.
715+
*
716+
* @param path A path to a collection that will be the source of this pipeline.
717+
* @return A new [RealtimePipeline] object with documents from target collection.
718+
*/
719+
fun collection(path: String): RealtimePipeline = collection(CollectionSource.of(path))
720+
721+
/**
722+
* Set the pipeline's source to the collection specified by the given [CollectionReference].
723+
*
724+
* @param ref A [CollectionReference] for a collection that will be the source of this pipeline.
725+
* @return A new [RealtimePipeline] object with documents from target collection.
726+
* @throws [IllegalArgumentException] Thrown if the [ref] provided targets a different project or
727+
* database than the pipeline.
728+
*/
729+
fun collection(ref: CollectionReference): RealtimePipeline = collection(CollectionSource.of(ref))
730+
731+
/**
732+
* Set the pipeline's source to the collection specified by CollectionSource.
733+
*
734+
* @param stage A [CollectionSource] that will be the source of this pipeline.
735+
* @return A new [RealtimePipeline] object with documents from target collection.
736+
* @throws [IllegalArgumentException] Thrown if the [stage] provided targets a different project
737+
* or database than the pipeline.
738+
*/
739+
fun collection(stage: CollectionSource): RealtimePipeline {
740+
if (stage.firestore != null && stage.firestore.databaseId != firestore.databaseId) {
741+
throw IllegalArgumentException("Provided collection is from a different Firestore instance.")
742+
}
743+
return RealtimePipeline(firestore, firestore.userDataReader, stage)
744+
}
745+
746+
/**
747+
* Set the pipeline's source to the collection group with the given id.
748+
*
749+
* @param collectionId The id of a collection group that will be the source of this pipeline.
750+
* @return A new [RealtimePipeline] object with documents from target collection group.
751+
*/
752+
fun collectionGroup(collectionId: String): RealtimePipeline =
753+
pipeline(CollectionGroupSource.of((collectionId)))
754+
755+
fun pipeline(stage: CollectionGroupSource): RealtimePipeline =
756+
RealtimePipeline(firestore, firestore.userDataReader, stage)
757+
}
758+
759+
class RealtimePipeline
760+
internal constructor(
761+
firestore: FirebaseFirestore,
762+
userDataReader: UserDataReader,
763+
stages: FluentIterable<Stage<*>>
764+
) : AbstractPipeline(firestore, userDataReader, stages) {
765+
internal constructor(
766+
firestore: FirebaseFirestore,
767+
userDataReader: UserDataReader,
768+
stage: Stage<*>
769+
) : this(firestore, userDataReader, FluentIterable.of(stage))
770+
771+
private fun append(stage: Stage<*>): RealtimePipeline {
772+
return RealtimePipeline(firestore, userDataReader, stages.append(stage))
773+
}
774+
775+
fun execute(): Task<PipelineSnapshot> = execute(null)
776+
777+
fun execute(options: PipelineOptions): Task<PipelineSnapshot> = execute(options.options)
778+
779+
fun limit(limit: Int): RealtimePipeline = append(LimitStage(limit))
780+
781+
fun offset(offset: Int): RealtimePipeline = append(OffsetStage(offset))
782+
783+
fun select(selection: Selectable, vararg additionalSelections: Any): RealtimePipeline =
784+
append(SelectStage.of(selection, *additionalSelections))
785+
786+
fun select(fieldName: String, vararg additionalSelections: Any): RealtimePipeline =
787+
append(SelectStage.of(fieldName, *additionalSelections))
788+
789+
fun where(condition: BooleanExpr): RealtimePipeline = append(WhereStage(condition))
790+
}
791+
713792
/**
714793
*/
715794
class PipelineSnapshot

firebase-firestore/src/main/java/com/google/firebase/firestore/core/CompositeFilter.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,16 @@ public String getCanonicalId() {
170170

171171
@Override
172172
BooleanExpr toPipelineExpr() {
173-
BooleanExpr[] booleanExprs =
174-
filters.stream().map(Filter::toPipelineExpr).toArray(BooleanExpr[]::new);
173+
BooleanExpr first = filters.get(0).toPipelineExpr();
174+
BooleanExpr[] additional = new BooleanExpr[filters.size() - 1];
175+
for (int i = 1, filtersSize = filters.size(); i < filtersSize; i++) {
176+
additional[i - 1] = filters.get(i).toPipelineExpr();
177+
}
175178
switch (operator) {
176179
case AND:
177-
return new BooleanExpr("and", booleanExprs);
180+
return BooleanExpr.and(first, additional);
178181
case OR:
179-
return new BooleanExpr("or", booleanExprs);
182+
return BooleanExpr.or(first, additional);
180183
}
181184
// Handle OPERATOR_UNSPECIFIED and UNRECOGNIZED cases as needed
182185
throw new IllegalArgumentException("Unsupported operator: " + operator);
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.google.firebase.firestore.core
2+
3+
import com.google.firebase.firestore.AbstractPipeline
4+
import com.google.firebase.firestore.model.MutableDocument
5+
import com.google.firebase.firestore.pipeline.EvaluationContext
6+
import kotlinx.coroutines.flow.Flow
7+
8+
internal fun runPipeline(
9+
pipeline: AbstractPipeline,
10+
input: Flow<MutableDocument>
11+
): Flow<MutableDocument> {
12+
val context = EvaluationContext(pipeline.userDataReader)
13+
return pipeline.stages.fold(input) { documentFlow, stage ->
14+
stage.evaluate(context, documentFlow)
15+
}
16+
}

0 commit comments

Comments
 (0)