Skip to content

Commit 3ba4d1e

Browse files
tom-andersenwu-hui
andauthored
Prototype of PipelineOptions (#1959)
* Incomplete prototyping of pipeline options * Incomplete prototyping of pipeline options * Incomplete prototyping of pipeline options * Incomplete prototyping of pipeline options * Changes from feedback * Expand example and fix. * Comments --------- Co-authored-by: wu-hui <[email protected]>
1 parent bdcca71 commit 3ba4d1e

40 files changed

+864
-341
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Pipeline.java

Lines changed: 61 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import com.google.api.gax.rpc.ResponseObserver;
2626
import com.google.api.gax.rpc.StreamController;
2727
import com.google.cloud.Timestamp;
28+
import com.google.cloud.firestore.pipeline.stages.AggregateOptions;
29+
import com.google.cloud.firestore.pipeline.stages.PipelineOptions;
30+
import com.google.cloud.firestore.pipeline.stages.GenericOptions;
2831
import com.google.cloud.firestore.pipeline.expressions.Accumulator;
2932
import com.google.cloud.firestore.pipeline.expressions.Expr;
3033
import com.google.cloud.firestore.pipeline.expressions.ExprWithAlias;
@@ -44,7 +47,6 @@
4447
import com.google.cloud.firestore.pipeline.stages.RemoveFields;
4548
import com.google.cloud.firestore.pipeline.stages.Replace;
4649
import com.google.cloud.firestore.pipeline.stages.Sample;
47-
import com.google.cloud.firestore.pipeline.stages.SampleOptions;
4850
import com.google.cloud.firestore.pipeline.stages.Select;
4951
import com.google.cloud.firestore.pipeline.stages.Sort;
5052
import com.google.cloud.firestore.pipeline.stages.Stage;
@@ -69,6 +71,7 @@
6971
import java.util.List;
7072
import java.util.logging.Level;
7173
import java.util.logging.Logger;
74+
import javax.annotation.Nonnull;
7275
import javax.annotation.Nullable;
7376

7477
/**
@@ -502,12 +505,10 @@ public Pipeline distinct(Selectable... selectables) {
502505
* <pre>{@code
503506
* // Find books with similar "topicVectors" to the given targetVector
504507
* firestore.pipeline().collection("books")
505-
* .findNearest("topicVectors", targetVector, FindNearest.DistanceMeasure.cosine(),
506-
* FindNearestOptions
507-
* .builder()
508-
* .limit(10)
509-
* .distanceField("distance")
510-
* .build());
508+
* .findNearest("topicVectors", targetVector, FindNearest.DistanceMeasure.COSINE,
509+
* FindNearestOptions.DEFAULT
510+
* .withLimit(10)
511+
* .withDistanceField("distance"));
511512
* }</pre>
512513
*
513514
* @param fieldName The name of the field containing the vector data. This field should store
@@ -540,12 +541,11 @@ public Pipeline findNearest(
540541
* <pre>{@code
541542
* // Find books with similar "topicVectors" to the given targetVector
542543
* firestore.pipeline().collection("books")
543-
* .findNearest(Field.of("topicVectors"), targetVector, FindNearest.DistanceMeasure.cosine(),
544-
* FindNearestOptions
545-
* .builder()
546-
* .limit(10)
547-
* .distanceField("distance")
548-
* .build());
544+
* .findNearest(
545+
* FindNearest.of(Field.of("topicVectors"), targetVector, FindNearest.DistanceMeasure.COSINE),
546+
* FindNearestOptions.DEFAULT
547+
* .withLimit(10)
548+
* .withDistanceField("distance"));
549549
* }</pre>
550550
*
551551
* @param property The expression that evaluates to a vector value using the stage inputs.
@@ -590,7 +590,7 @@ public Pipeline findNearest(
590590
*/
591591
@BetaApi
592592
public Pipeline sort(Ordering... orders) {
593-
return append(new Sort(orders));
593+
return append(new Sort(ImmutableList.copyOf(orders)));
594594
}
595595

596596
/**
@@ -683,8 +683,7 @@ public Pipeline replace(Selectable field) {
683683
*/
684684
@BetaApi
685685
public Pipeline sample(int limit) {
686-
SampleOptions options = SampleOptions.docLimit(limit);
687-
return sample(options);
686+
return sample(Sample.withDocLimit(limit));
688687
}
689688

690689
/**
@@ -698,19 +697,19 @@ public Pipeline sample(int limit) {
698697
* <pre>{@code
699698
* // Sample 10 books, if available.
700699
* firestore.pipeline().collection("books")
701-
* .sample(SampleOptions.docLimit(10));
700+
* .sample(Sample.withDocLimit(10));
702701
*
703702
* // Sample 50% of books.
704703
* firestore.pipeline().collection("books")
705-
* .sample(SampleOptions.percentage(0.5));
704+
* .sample(Sample.withPercentage(0.5));
706705
* }</pre>
707706
*
708-
* @param options The {@code SampleOptions} specifies how sampling is performed.
707+
* @param sample The {@code Sample} specifies how sampling is performed.
709708
* @return A new {@code Pipeline} object with this stage appended to the stage list.
710709
*/
711710
@BetaApi
712-
public Pipeline sample(SampleOptions options) {
713-
return append(new Sample(options));
711+
public Pipeline sample(Sample sample) {
712+
return append(sample);
714713
}
715714

716715
/**
@@ -756,21 +755,21 @@ public Pipeline union(Pipeline other) {
756755
*
757756
* // Emit a book document for each tag of the book.
758757
* firestore.pipeline().collection("books")
759-
* .unnest("tags");
758+
* .unnest("tags", "tag");
760759
*
761760
* // Output:
762-
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": "comedy", ... }
763-
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": "space", ... }
764-
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": "adventure", ... }
761+
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "comedy", ... }
762+
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "space", ... }
763+
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "adventure", ... }
765764
* }</pre>
766765
*
767766
* @param fieldName The name of the field containing the array.
768767
* @return A new {@code Pipeline} object with this stage appended to the stage list.
769768
*/
770769
@BetaApi
771-
public Pipeline unnest(String fieldName) {
770+
public Pipeline unnest(String fieldName, String alias) {
772771
// return unnest(Field.of(fieldName));
773-
return append(new Unnest(Field.of(fieldName)));
772+
return append(new Unnest(Field.of(fieldName), alias));
774773
}
775774

776775
// /**
@@ -829,22 +828,22 @@ public Pipeline unnest(String fieldName) {
829828
*
830829
* // Emit a book document for each tag of the book.
831830
* firestore.pipeline().collection("books")
832-
* .unnest("tags", UnnestOptions.indexField("tagIndex"));
831+
* .unnest("tags", "tag", Unnest.Options.DEFAULT.withIndexField("tagIndex"));
833832
*
834833
* // Output:
835-
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 0, "tags": "comedy", ... }
836-
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 1, "tags": "space", ... }
837-
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 2, "tags": "adventure", ... }
834+
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 0, "tag": "comedy", ... }
835+
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 1, "tag": "space", ... }
836+
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 2, "tag": "adventure", ... }
838837
* }</pre>
839838
*
840839
* @param fieldName The name of the field containing the array.
841840
* @param options The {@code UnnestOptions} options.
842841
* @return A new {@code Pipeline} object with this stage appended to the stage list.
843842
*/
844843
@BetaApi
845-
public Pipeline unnest(String fieldName, UnnestOptions options) {
844+
public Pipeline unnest(String fieldName, String alias, UnnestOptions options) {
846845
// return unnest(Field.of(fieldName), options);
847-
return append(new Unnest(Field.of(fieldName), options));
846+
return append(new Unnest(Field.of(fieldName), alias, options));
848847
}
849848

850849
// /**
@@ -905,12 +904,13 @@ public Pipeline unnest(String fieldName, UnnestOptions options) {
905904
*
906905
* @param name The unique name of the generic stage to add.
907906
* @param params A map of parameters to configure the generic stage's behavior.
907+
* @param optionalParams Named optional parameters to configure the generic stage's behavior.
908908
* @return A new {@code Pipeline} object with this stage appended to the stage list.
909909
*/
910910
@BetaApi
911-
public Pipeline genericStage(String name, List<Object> params) {
911+
public Pipeline genericStage(String name, List<Object> params, GenericOptions optionalParams) {
912912
// Implementation for genericStage (add the GenericStage if needed)
913-
return append(new GenericStage(name, params)); // Assuming GenericStage takes a list of params
913+
return append(new GenericStage(name, params, optionalParams)); // Assuming GenericStage takes a list of params
914914
}
915915

916916
/**
@@ -946,7 +946,12 @@ public Pipeline genericStage(String name, List<Object> params) {
946946
*/
947947
@BetaApi
948948
public ApiFuture<List<PipelineResult>> execute() {
949-
return execute((ByteString) null, (com.google.protobuf.Timestamp) null);
949+
return execute(PipelineOptions.DEFAULT, (ByteString) null, (com.google.protobuf.Timestamp) null);
950+
}
951+
952+
@BetaApi
953+
public ApiFuture<List<PipelineResult>> execute(PipelineOptions options) {
954+
return execute(options, (ByteString) null, (com.google.protobuf.Timestamp) null);
950955
}
951956

952957
/**
@@ -996,7 +1001,7 @@ public ApiFuture<List<PipelineResult>> execute() {
9961001
*/
9971002
@BetaApi
9981003
public void execute(ApiStreamObserver<PipelineResult> observer) {
999-
executeInternal(null, null, observer);
1004+
executeInternal(PipelineOptions.DEFAULT, null, null, observer);
10001005
}
10011006

10021007
// @BetaApi
@@ -1016,10 +1021,13 @@ public void execute(ApiStreamObserver<PipelineResult> observer) {
10161021
// }
10171022

10181023
ApiFuture<List<PipelineResult>> execute(
1019-
@Nullable final ByteString transactionId, @Nullable com.google.protobuf.Timestamp readTime) {
1024+
@Nonnull PipelineOptions options,
1025+
@Nullable final ByteString transactionId,
1026+
@Nullable com.google.protobuf.Timestamp readTime) {
10201027
SettableApiFuture<List<PipelineResult>> futureResult = SettableApiFuture.create();
10211028

10221029
executeInternal(
1030+
options,
10231031
transactionId,
10241032
readTime,
10251033
new PipelineResultObserver() {
@@ -1045,13 +1053,17 @@ public void onError(Throwable t) {
10451053
}
10461054

10471055
void executeInternal(
1056+
@Nonnull PipelineOptions options,
10481057
@Nullable final ByteString transactionId,
10491058
@Nullable com.google.protobuf.Timestamp readTime,
10501059
ApiStreamObserver<PipelineResult> observer) {
10511060
ExecutePipelineRequest.Builder request =
10521061
ExecutePipelineRequest.newBuilder()
10531062
.setDatabase(rpcContext.getDatabaseName())
1054-
.setStructuredPipeline(StructuredPipeline.newBuilder().setPipeline(toProto()).build());
1063+
.setStructuredPipeline(StructuredPipeline.newBuilder()
1064+
.setPipeline(toProto())
1065+
.putAllOptions(StageUtils.toMap(options))
1066+
.build());
10551067

10561068
if (transactionId != null) {
10571069
request.setTransaction(transactionId);
@@ -1164,18 +1176,18 @@ public void onComplete() {
11641176

11651177
rpcContext.streamRequest(request, observer, rpcContext.getClient().executePipelineCallable());
11661178
}
1167-
}
11681179

1169-
@InternalExtensionOnly
1170-
abstract class PipelineResultObserver implements ApiStreamObserver<PipelineResult> {
1171-
private Timestamp executionTime; // Remove optional since Java doesn't have it
1180+
@InternalExtensionOnly
1181+
static abstract class PipelineResultObserver implements ApiStreamObserver<PipelineResult> {
1182+
private Timestamp executionTime; // Remove optional since Java doesn't have it
11721183

1173-
public void onCompleted(Timestamp executionTime) {
1174-
this.executionTime = executionTime;
1175-
this.onCompleted();
1176-
}
1184+
public void onCompleted(Timestamp executionTime) {
1185+
this.executionTime = executionTime;
1186+
this.onCompleted();
1187+
}
11771188

1178-
public Timestamp getExecutionTime() { // Add getter for executionTime
1179-
return executionTime;
1189+
public Timestamp getExecutionTime() { // Add getter for executionTime
1190+
return executionTime;
1191+
}
11801192
}
11811193
}

google-cloud-firestore/src/main/java/com/google/cloud/firestore/PipelineSource.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import com.google.api.core.InternalApi;
2121
import com.google.cloud.firestore.pipeline.stages.Collection;
2222
import com.google.cloud.firestore.pipeline.stages.CollectionGroup;
23+
import com.google.cloud.firestore.pipeline.stages.CollectionGroupOptions;
24+
import com.google.cloud.firestore.pipeline.stages.CollectionHints;
25+
import com.google.cloud.firestore.pipeline.stages.CollectionOptions;
2326
import com.google.cloud.firestore.pipeline.stages.Database;
2427
import com.google.cloud.firestore.pipeline.stages.Documents;
2528
import com.google.common.base.Preconditions;
@@ -45,7 +48,7 @@
4548
* }</pre>
4649
*/
4750
@BetaApi
48-
public class PipelineSource {
51+
public final class PipelineSource {
4952
private final FirestoreRpcContext<?> rpcContext;
5053

5154
@InternalApi
@@ -62,7 +65,13 @@ public class PipelineSource {
6265
@Nonnull
6366
@BetaApi
6467
public Pipeline collection(@Nonnull String path) {
65-
return new Pipeline(this.rpcContext, new Collection(path));
68+
return collection(path, CollectionOptions.DEFAULT);
69+
}
70+
71+
@Nonnull
72+
@BetaApi
73+
public Pipeline collection(@Nonnull String path, CollectionOptions options) {
74+
return new Pipeline(this.rpcContext, new Collection(path, options));
6675
}
6776

6877
/**
@@ -78,11 +87,17 @@ public Pipeline collection(@Nonnull String path) {
7887
@Nonnull
7988
@BetaApi
8089
public Pipeline collectionGroup(@Nonnull String collectionId) {
90+
return collectionGroup(collectionId, CollectionGroupOptions.DEFAULT);
91+
}
92+
93+
@Nonnull
94+
@BetaApi
95+
public Pipeline collectionGroup(@Nonnull String collectionId, CollectionGroupOptions options) {
8196
Preconditions.checkArgument(
8297
!collectionId.contains("/"),
8398
"Invalid collectionId '%s'. Collection IDs must not contain '/'.",
8499
collectionId);
85-
return new Pipeline(this.rpcContext, new CollectionGroup(collectionId));
100+
return new Pipeline(this.rpcContext, new CollectionGroup(collectionId, options));
86101
}
87102

88103
/**

google-cloud-firestore/src/main/java/com/google/cloud/firestore/PipelineUtils.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static com.google.cloud.firestore.pipeline.expressions.Function.inAny;
2323
import static com.google.cloud.firestore.pipeline.expressions.Function.not;
2424
import static com.google.cloud.firestore.pipeline.expressions.Function.or;
25+
import static com.google.cloud.firestore.pipeline.expressions.FunctionUtils.exprToValue;
2526

2627
import com.google.api.core.InternalApi;
2728
import com.google.cloud.firestore.Query.ComparisonFilterInternal;
@@ -38,6 +39,7 @@
3839
import com.google.cloud.firestore.pipeline.expressions.Selectable;
3940
import com.google.common.collect.Lists;
4041
import com.google.firestore.v1.Cursor;
42+
import com.google.firestore.v1.MapValue;
4143
import com.google.firestore.v1.Value;
4244
import java.util.HashMap;
4345
import java.util.List;
@@ -51,6 +53,36 @@ public static Value encodeValue(Object value) {
5153
return UserDataConverter.encodeValue(FieldPath.empty(), value, UserDataConverter.ARGUMENT);
5254
}
5355

56+
@InternalApi
57+
public static Value encodeValue(Expr value) {
58+
return exprToValue(value);
59+
}
60+
61+
@InternalApi
62+
public static Value encodeValue(String value) {
63+
return Value.newBuilder().setStringValue(value).build();
64+
}
65+
66+
@InternalApi
67+
public static Value encodeValue(boolean value) {
68+
return Value.newBuilder().setBooleanValue(value).build();
69+
}
70+
71+
@InternalApi
72+
public static Value encodeValue(long value) {
73+
return Value.newBuilder().setIntegerValue(value).build();
74+
}
75+
76+
@InternalApi
77+
public static Value encodeValue(double value) {
78+
return Value.newBuilder().setDoubleValue(value).build();
79+
}
80+
81+
@InternalApi
82+
public static Value encodeValue(Map<String, Value> options) {
83+
return Value.newBuilder().setMapValue(MapValue.newBuilder().putAllFields(options).build()).build();
84+
}
85+
5486
@InternalApi
5587
static FilterCondition toPipelineFilterCondition(FilterInternal f) {
5688
if (f instanceof ComparisonFilterInternal) {

google-cloud-firestore/src/main/java/com/google/cloud/firestore/ReadTimeTransaction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.api.core.ApiFuture;
2020
import com.google.api.core.ApiFutures;
21+
import com.google.cloud.firestore.pipeline.stages.PipelineOptions;
2122
import com.google.cloud.firestore.telemetry.TraceUtil;
2223
import com.google.common.base.Preconditions;
2324
import com.google.common.util.concurrent.MoreExecutors;
@@ -129,7 +130,7 @@ public ApiFuture<AggregateQuerySnapshot> get(@Nonnull AggregateQuery query) {
129130
@Override
130131
public ApiFuture<List<PipelineResult>> execute(@Nonnull Pipeline pipeline) {
131132
try (TraceUtil.Scope ignored = transactionTraceContext.makeCurrent()) {
132-
return pipeline.execute(null, readTime);
133+
return pipeline.execute(PipelineOptions.DEFAULT, null, readTime);
133134
}
134135
}
135136

0 commit comments

Comments
 (0)