Skip to content

Commit 2f0e505

Browse files
committed
Merge branch 'tomandersen/ppSuggestion' into wuandy/JavaPplPP
# Conflicts: # google-cloud-firestore/src/main/java/com/google/cloud/firestore/Pipeline.java
2 parents 8d63fb6 + f8b9386 commit 2f0e505

File tree

4 files changed

+81
-178
lines changed

4 files changed

+81
-178
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Pipeline.java

Lines changed: 69 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,7 @@
3535
import com.google.cloud.firestore.pipeline.expressions.Selectable;
3636
import com.google.cloud.firestore.pipeline.stages.AddFields;
3737
import com.google.cloud.firestore.pipeline.stages.Aggregate;
38-
import com.google.cloud.firestore.pipeline.stages.Collection;
39-
import com.google.cloud.firestore.pipeline.stages.CollectionGroup;
40-
import com.google.cloud.firestore.pipeline.stages.Database;
4138
import com.google.cloud.firestore.pipeline.stages.Distinct;
42-
import com.google.cloud.firestore.pipeline.stages.Documents;
4339
import com.google.cloud.firestore.pipeline.stages.FindNearest;
4440
import com.google.cloud.firestore.pipeline.stages.FindNearestOptions;
4541
import com.google.cloud.firestore.pipeline.stages.GenericStage;
@@ -50,21 +46,18 @@
5046
import com.google.cloud.firestore.pipeline.stages.Stage;
5147
import com.google.cloud.firestore.pipeline.stages.StageUtils;
5248
import com.google.cloud.firestore.pipeline.stages.Where;
53-
import com.google.common.collect.ImmutableList;
49+
import com.google.common.collect.FluentIterable;
5450
import com.google.common.collect.ImmutableMap;
55-
import com.google.common.collect.Lists;
5651
import com.google.firestore.v1.Document;
5752
import com.google.firestore.v1.ExecutePipelineRequest;
5853
import com.google.firestore.v1.ExecutePipelineResponse;
5954
import com.google.firestore.v1.StructuredPipeline;
60-
import com.google.firestore.v1.Value;
6155
import io.opencensus.trace.AttributeValue;
6256
import io.opencensus.trace.Tracing;
6357
import java.util.ArrayList;
6458
import java.util.List;
6559
import java.util.logging.Level;
6660
import java.util.logging.Logger;
67-
import java.util.stream.Collectors;
6861

6962
/**
7063
* The Pipeline class provides a flexible and expressive framework for building complex data
@@ -119,32 +112,21 @@
119112
@BetaApi
120113
public final class Pipeline {
121114
private static Logger logger = Logger.getLogger(Pipeline.class.getName());
122-
private final ImmutableList<Stage> stages;
123-
private final Firestore db;
115+
private final FluentIterable<Stage> stages;
116+
private final FirestoreRpcContext<?> rpcContext;
124117

125-
private Pipeline(Firestore db, List<Stage> stages) {
126-
this.db = db;
127-
this.stages = ImmutableList.copyOf(stages);
118+
private Pipeline(FirestoreRpcContext<?> rpcContext, FluentIterable<Stage> stages) {
119+
this.rpcContext = rpcContext;
120+
this.stages = stages;
128121
}
129122

130123
@InternalApi
131-
Pipeline(Firestore db, Collection collection) {
132-
this(db, Lists.newArrayList(collection));
124+
Pipeline(FirestoreRpcContext<?> rpcContext, Stage stage) {
125+
this(rpcContext, FluentIterable.of(stage));
133126
}
134127

135-
@InternalApi
136-
Pipeline(Firestore db, CollectionGroup group) {
137-
this(db, Lists.newArrayList(group));
138-
}
139-
140-
@InternalApi
141-
Pipeline(Firestore firestore, Database db) {
142-
this(firestore, Lists.newArrayList(db));
143-
}
144-
145-
@InternalApi
146-
Pipeline(Firestore db, Documents docs) {
147-
this(db, Lists.newArrayList(docs));
128+
private Pipeline append(Stage stage) {
129+
return new Pipeline(this.rpcContext, stages.append(stage));
148130
}
149131

150132
/**
@@ -178,12 +160,7 @@ private Pipeline(Firestore db, List<Stage> stages) {
178160
*/
179161
@BetaApi
180162
public Pipeline addFields(Selectable... fields) {
181-
return new Pipeline(
182-
this.db,
183-
ImmutableList.<Stage>builder()
184-
.addAll(stages)
185-
.add(new AddFields(PipelineUtils.selectablesToMap(fields)))
186-
.build());
163+
return append(new AddFields(PipelineUtils.selectablesToMap(fields)));
187164
}
188165

189166
/**
@@ -217,12 +194,7 @@ public Pipeline addFields(Selectable... fields) {
217194
*/
218195
@BetaApi
219196
public Pipeline select(Selectable... selections) {
220-
return new Pipeline(
221-
this.db,
222-
ImmutableList.<Stage>builder()
223-
.addAll(stages)
224-
.add(new Select(PipelineUtils.selectablesToMap(selections)))
225-
.build());
197+
return append(new Select(PipelineUtils.selectablesToMap(selections)));
226198
}
227199

228200
/**
@@ -248,12 +220,7 @@ public Pipeline select(Selectable... selections) {
248220
*/
249221
@BetaApi
250222
public Pipeline select(String... fields) {
251-
return new Pipeline(
252-
this.db,
253-
ImmutableList.<Stage>builder()
254-
.addAll(stages)
255-
.add(new Select(PipelineUtils.fieldNamesToMap(fields)))
256-
.build());
223+
return append(new Select(PipelineUtils.fieldNamesToMap(fields)));
257224
}
258225

259226
/**
@@ -289,8 +256,7 @@ public Pipeline select(String... fields) {
289256
*/
290257
@BetaApi
291258
public Pipeline where(FilterCondition condition) {
292-
return new Pipeline(
293-
this.db, ImmutableList.<Stage>builder().addAll(stages).add(new Where(condition)).build());
259+
return append(new Where(condition));
294260
}
295261

296262
/**
@@ -315,8 +281,7 @@ public Pipeline where(FilterCondition condition) {
315281
*/
316282
@BetaApi
317283
public Pipeline offset(int offset) {
318-
return new Pipeline(
319-
this.db, ImmutableList.<Stage>builder().addAll(stages).add(new Offset(offset)).build());
284+
return append(new Offset(offset));
320285
}
321286

322287
/**
@@ -346,8 +311,7 @@ public Pipeline offset(int offset) {
346311
*/
347312
@BetaApi
348313
public Pipeline limit(int limit) {
349-
return new Pipeline(
350-
this.db, ImmutableList.<Stage>builder().addAll(stages).add(new Limit(limit)).build());
314+
return append(new Limit(limit));
351315
}
352316

353317
/**
@@ -374,12 +338,7 @@ public Pipeline limit(int limit) {
374338
*/
375339
@BetaApi
376340
public Pipeline aggregate(ExprWithAlias<Accumulator>... accumulators) {
377-
return new Pipeline(
378-
this.db,
379-
ImmutableList.<Stage>builder()
380-
.addAll(stages)
381-
.add(Aggregate.withAccumulators(accumulators))
382-
.build());
341+
return append(Aggregate.withAccumulators(accumulators));
383342
}
384343

385344
/**
@@ -416,8 +375,7 @@ public Pipeline aggregate(ExprWithAlias<Accumulator>... accumulators) {
416375
*/
417376
@BetaApi
418377
public Pipeline aggregate(Aggregate aggregate) {
419-
return new Pipeline(
420-
this.db, ImmutableList.<Stage>builder().addAll(stages).add(aggregate).build());
378+
return append(aggregate);
421379
}
422380

423381
/**
@@ -439,12 +397,7 @@ public Pipeline aggregate(Aggregate aggregate) {
439397
*/
440398
@BetaApi
441399
public Pipeline distinct(String... fields) {
442-
return new Pipeline(
443-
this.db,
444-
ImmutableList.<Stage>builder()
445-
.addAll(stages)
446-
.add(new Distinct(PipelineUtils.fieldNamesToMap(fields)))
447-
.build());
400+
return append(new Distinct(PipelineUtils.fieldNamesToMap(fields)));
448401
}
449402

450403
/**
@@ -476,12 +429,7 @@ public Pipeline distinct(String... fields) {
476429
*/
477430
@BetaApi
478431
public Pipeline distinct(Selectable... selectables) {
479-
return new Pipeline(
480-
this.db,
481-
ImmutableList.<Stage>builder()
482-
.addAll(stages)
483-
.add(new Distinct(PipelineUtils.selectablesToMap(selectables)))
484-
.build());
432+
return append(new Distinct(PipelineUtils.selectablesToMap(selectables)));
485433
}
486434

487435
/**
@@ -624,12 +572,7 @@ public Pipeline sort(Ordering... orders) {
624572
@BetaApi
625573
public Pipeline genericStage(String name, List<Object> params) {
626574
// Implementation for genericStage (add the GenericStage if needed)
627-
return new Pipeline(
628-
this.db,
629-
ImmutableList.<Stage>builder()
630-
.addAll(stages)
631-
.add(new GenericStage(name, params)) // Assuming GenericStage takes a list of params
632-
.build());
575+
return append(new GenericStage(name, params)); // Assuming GenericStage takes a list of params
633576
}
634577

635578
/**
@@ -665,47 +608,29 @@ public Pipeline genericStage(String name, List<Object> params) {
665608
*/
666609
@BetaApi
667610
public ApiFuture<List<PipelineResult>> execute() {
668-
if (db instanceof FirestoreImpl) {
669-
FirestoreImpl firestoreImpl = (FirestoreImpl) db;
670-
Value pipelineValue = toProto();
671-
ExecutePipelineRequest request =
672-
ExecutePipelineRequest.newBuilder()
673-
.setDatabase(firestoreImpl.getResourcePath().getDatabaseName().toString())
674-
.setStructuredPipeline(
675-
StructuredPipeline.newBuilder()
676-
.setPipeline(pipelineValue.getPipelineValue())
677-
.build())
678-
.build();
679-
680-
SettableApiFuture<List<PipelineResult>> futureResult = SettableApiFuture.create();
681-
682-
pipelineInternalStream( // Assuming you have this method
683-
firestoreImpl,
684-
request,
685-
new PipelineResultObserver() {
686-
final List<PipelineResult> results = new ArrayList<>();
687-
688-
@Override
689-
public void onCompleted() {
690-
futureResult.set(results);
691-
}
611+
SettableApiFuture<List<PipelineResult>> futureResult = SettableApiFuture.create();
692612

693-
@Override
694-
public void onNext(PipelineResult result) {
695-
results.add(result);
696-
}
613+
execute( // Assuming you have this method
614+
new PipelineResultObserver() {
615+
final List<PipelineResult> results = new ArrayList<>();
697616

698-
@Override
699-
public void onError(Throwable t) {
700-
futureResult.setException(t);
701-
}
702-
});
617+
@Override
618+
public void onCompleted() {
619+
futureResult.set(results);
620+
}
621+
622+
@Override
623+
public void onNext(PipelineResult result) {
624+
results.add(result);
625+
}
703626

704-
return futureResult;
705-
} else {
706-
// Handle unsupported Firestore types
707-
throw new IllegalArgumentException("Unsupported Firestore type");
708-
}
627+
@Override
628+
public void onError(Throwable t) {
629+
futureResult.setException(t);
630+
}
631+
});
632+
633+
return futureResult;
709634
}
710635

711636
/**
@@ -755,59 +680,40 @@ public void onError(Throwable t) {
755680
*/
756681
@BetaApi
757682
public void execute(ApiStreamObserver<PipelineResult> observer) {
758-
if (db instanceof FirestoreImpl) {
759-
FirestoreImpl firestoreImpl = (FirestoreImpl) db;
760-
Value pipelineValue = toProto();
761-
ExecutePipelineRequest request =
762-
ExecutePipelineRequest.newBuilder()
763-
.setDatabase(firestoreImpl.getResourcePath().getDatabaseName().toString())
764-
.setStructuredPipeline(
765-
StructuredPipeline.newBuilder()
766-
.setPipeline(pipelineValue.getPipelineValue())
767-
.build())
768-
.build();
769-
770-
pipelineInternalStream(
771-
firestoreImpl,
772-
request,
773-
new PipelineResultObserver() {
774-
@Override
775-
public void onCompleted() {
776-
observer.onCompleted();
777-
}
683+
ExecutePipelineRequest request =
684+
ExecutePipelineRequest.newBuilder()
685+
.setDatabase(rpcContext.getDatabaseName())
686+
.setStructuredPipeline(StructuredPipeline.newBuilder().setPipeline(toProto()).build())
687+
.build();
688+
689+
pipelineInternalStream(
690+
request,
691+
new PipelineResultObserver() {
692+
@Override
693+
public void onCompleted() {
694+
observer.onCompleted();
695+
}
778696

779-
@Override
780-
public void onNext(PipelineResult result) {
781-
observer.onNext(result);
782-
}
697+
@Override
698+
public void onNext(PipelineResult result) {
699+
observer.onNext(result);
700+
}
783701

784-
@Override
785-
public void onError(Throwable t) {
786-
observer.onError(t);
787-
}
788-
});
789-
} else {
790-
// Handle unsupported Firestore types
791-
throw new IllegalArgumentException("Unsupported Firestore type");
792-
}
702+
@Override
703+
public void onError(Throwable t) {
704+
observer.onError(t);
705+
}
706+
});
793707
}
794708

795-
private Value toProto() {
796-
return Value.newBuilder()
797-
.setPipelineValue(
798-
com.google.firestore.v1.Pipeline.newBuilder()
799-
.addAllStages(
800-
stages.stream()
801-
.map(StageUtils::toStageProto)
802-
.collect(Collectors.toList())) // Use the static method
803-
)
709+
private com.google.firestore.v1.Pipeline toProto() {
710+
return com.google.firestore.v1.Pipeline.newBuilder()
711+
.addAllStages(stages.transform(StageUtils::toStageProto))
804712
.build();
805713
}
806714

807715
private void pipelineInternalStream(
808-
FirestoreImpl rpcContext,
809-
ExecutePipelineRequest request,
810-
PipelineResultObserver resultObserver) {
716+
ExecutePipelineRequest request, PipelineResultObserver resultObserver) {
811717
ResponseObserver<ExecutePipelineResponse> observer =
812718
new ResponseObserver<ExecutePipelineResponse>() {
813719
Timestamp executionTime = null;
@@ -841,9 +747,7 @@ public void onResponse(ExecutePipelineResponse response) {
841747
}
842748

843749
for (Document doc : response.getResultsList()) {
844-
resultObserver.onNext(
845-
PipelineResult.fromDocument(
846-
rpcContext, Timestamp.fromProto(response.getExecutionTime()), doc));
750+
resultObserver.onNext(PipelineResult.fromDocument(rpcContext, executionTime, doc));
847751
}
848752
}
849753

@@ -870,7 +774,7 @@ public void onComplete() {
870774
.addAnnotation(
871775
"Firestore.ExecutePipeline: Completed",
872776
ImmutableMap.of(
873-
"numDocuments", AttributeValue.longAttributeValue((long) numDocuments)));
777+
"numDocuments", AttributeValue.longAttributeValue((long) totalNumDocuments)));
874778
resultObserver.onCompleted(executionTime);
875779
}
876780
};

0 commit comments

Comments
 (0)