Skip to content

Commit d6bb0d2

Browse files
authored
Implement new stages. (#1908)
* Implement new stages. * Replace toList() with older languages feature. * Pretty * Pretty * Comment out under development API surface * Pretty
1 parent 382a5cd commit d6bb0d2

File tree

17 files changed

+819
-129
lines changed

17 files changed

+819
-129
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Pipeline.java

Lines changed: 321 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,16 @@
4242
import com.google.cloud.firestore.pipeline.stages.Limit;
4343
import com.google.cloud.firestore.pipeline.stages.Offset;
4444
import com.google.cloud.firestore.pipeline.stages.RemoveFields;
45+
import com.google.cloud.firestore.pipeline.stages.Replace;
46+
import com.google.cloud.firestore.pipeline.stages.Sample;
47+
import com.google.cloud.firestore.pipeline.stages.SampleOptions;
4548
import com.google.cloud.firestore.pipeline.stages.Select;
4649
import com.google.cloud.firestore.pipeline.stages.Sort;
4750
import com.google.cloud.firestore.pipeline.stages.Stage;
4851
import com.google.cloud.firestore.pipeline.stages.StageUtils;
52+
import com.google.cloud.firestore.pipeline.stages.Union;
53+
import com.google.cloud.firestore.pipeline.stages.Unnest;
54+
import com.google.cloud.firestore.pipeline.stages.UnnestOptions;
4955
import com.google.cloud.firestore.pipeline.stages.Where;
5056
import com.google.common.collect.FluentIterable;
5157
import com.google.common.collect.ImmutableList;
@@ -54,6 +60,7 @@
5460
import com.google.firestore.v1.ExecutePipelineRequest;
5561
import com.google.firestore.v1.ExecutePipelineResponse;
5662
import com.google.firestore.v1.StructuredPipeline;
63+
import com.google.firestore.v1.Value;
5764
import com.google.protobuf.ByteString;
5865
import io.opencensus.trace.AttributeValue;
5966
import io.opencensus.trace.Tracing;
@@ -586,6 +593,297 @@ public Pipeline sort(Ordering... orders) {
586593
return append(new Sort(orders));
587594
}
588595

596+
/**
597+
* Fully overwrites all fields in a document with those coming from a nested map.
598+
*
599+
* <p>This stage allows you to emit a map value as a document. Each key of the map becomes a field
600+
* on the document that contains the corresponding value.
601+
*
602+
* <p>Example:
603+
*
604+
* <pre>{@code
605+
* // Input.
606+
* // {
607+
* // "name": "John Doe Jr.",
608+
* // "parents": {
609+
* // "father": "John Doe Sr.",
610+
* // "mother": "Jane Doe"
611+
* // }
612+
*
613+
* // Emit parents as document.
614+
* firestore.pipeline().collection("people").replace("parents");
615+
*
616+
* // Output
617+
* // {
618+
* // "father": "John Doe Sr.",
619+
* // "mother": "Jane Doe"
620+
* // }
621+
* }</pre>
622+
*
623+
* @param fieldName The name of the field containing the nested map.
624+
* @return A new {@code Pipeline} object with this stage appended to the stage list.
625+
*/
626+
@BetaApi
627+
public Pipeline replace(String fieldName) {
628+
return replace(Field.of(fieldName));
629+
}
630+
631+
/**
632+
* Fully overwrites all fields in a document with those coming from a nested map.
633+
*
634+
* <p>This stage allows you to emit a map value as a document. Each key of the map becomes a field
635+
* on the document that contains the corresponding value.
636+
*
637+
* <p>Example:
638+
*
639+
* <pre>{@code
640+
* // Input.
641+
* // {
642+
* // "name": "John Doe Jr.",
643+
* // "parents": {
644+
* // "father": "John Doe Sr.",
645+
* // "mother": "Jane Doe"
646+
* // }
647+
*
648+
* // Emit parents as document.
649+
* firestore.pipeline().collection("people").replace(Field.of("parents"));
650+
*
651+
* // Output
652+
* // {
653+
* // "father": "John Doe Sr.",
654+
* // "mother": "Jane Doe"
655+
* // }
656+
* }</pre>
657+
*
658+
* @param field The {@link Selectable} field containing the nested map.
659+
* @return A new {@code Pipeline} object with this stage appended to the stage list.
660+
*/
661+
@BetaApi
662+
public Pipeline replace(Selectable field) {
663+
return append(new Replace(field));
664+
}
665+
666+
/**
667+
* Performs a pseudo-random sampling of the documents from the previous stage.
668+
*
669+
* <p>This stage will filter documents pseudo-randomly. The 'limit' parameter specifies the number
670+
* of documents to emit from this stage, but if there are fewer documents from previous stage than
671+
* the 'limit' parameter, then no filtering will occur and all documents will pass through.
672+
*
673+
* <p>Example:
674+
*
675+
* <pre>{@code
676+
* // Sample 10 books, if available.
677+
* firestore.pipeline().collection("books")
678+
* .sample(10);
679+
* }</pre>
680+
*
681+
* @param limit The number of documents to emit, if possible.
682+
* @return A new {@code Pipeline} object with this stage appended to the stage list.
683+
*/
684+
@BetaApi
685+
public Pipeline sample(int limit) {
686+
SampleOptions options = SampleOptions.docLimit(limit);
687+
return sample(options);
688+
}
689+
690+
/**
691+
* Performs a pseudo-random sampling of the documents from the previous stage.
692+
*
693+
* <p>This stage will filter documents pseudo-randomly. The 'options' parameter specifies how
694+
* sampling will be performed. See {@code SampleOptions} for more information.
695+
*
696+
* <p>Examples:
697+
*
698+
* <pre>{@code
699+
* // Sample 10 books, if available.
700+
* firestore.pipeline().collection("books")
701+
* .sample(SampleOptions.docLimit(10));
702+
*
703+
* // Sample 50% of books.
704+
* firestore.pipeline().collection("books")
705+
* .sample(SampleOptions.percentage(0.5));
706+
* }</pre>
707+
*
708+
* @param options The {@code SampleOptions} specifies how sampling is performed.
709+
* @return A new {@code Pipeline} object with this stage appended to the stage list.
710+
*/
711+
@BetaApi
712+
public Pipeline sample(SampleOptions options) {
713+
return append(new Sample(options));
714+
}
715+
716+
/**
717+
* Performs union of all documents from two pipelines, including duplicates.
718+
*
719+
* <p>This stage will pass through documents from previous stage, and also pass through documents
720+
* from previous stage of the `other` {@code Pipeline} given in parameter. The order of documents
721+
* emitted from this stage is undefined.
722+
*
723+
* <p>Example:
724+
*
725+
* <pre>{@code
726+
* // Emit documents from books collection and magazines collection.
727+
* firestore.pipeline().collection("books")
728+
* .union(firestore.pipeline().collection("magazines"));
729+
* }</pre>
730+
*
731+
* @param other The other {@code Pipeline} that is part of union.
732+
* @return A new {@code Pipeline} object with this stage appended to the stage list.
733+
*/
734+
@BetaApi
735+
public Pipeline union(Pipeline other) {
736+
return append(new Union(other));
737+
}
738+
739+
/**
740+
* Produces a document for each element in array found in previous stage document.
741+
*
742+
* <p>For each previous stage document, this stage will emit zero or more augmented documents. The
743+
* input array found in the previous stage document field specified by the `fieldName` parameter,
744+
* will for each input array element produce an augmented document. The input array element will
745+
* augment the previous stage document by replacing the field specified by `fieldName` parameter
746+
* with the element value.
747+
*
748+
* <p>In other words, the field containing the input array will be removed from the augmented
749+
* document and replaced by the corresponding array element.
750+
*
751+
* <p>Example:
752+
*
753+
* <pre>{@code
754+
* // Input:
755+
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": [ "comedy", "space", "adventure" ], ... }
756+
*
757+
* // Emit a book document for each tag of the book.
758+
* firestore.pipeline().collection("books")
759+
* .unnest("tags");
760+
*
761+
* // Output:
762+
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": "comedy", ... }
763+
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": "space", ... }
764+
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": "adventure", ... }
765+
* }</pre>
766+
*
767+
* @param fieldName The name of the field containing the array.
768+
* @return A new {@code Pipeline} object with this stage appended to the stage list.
769+
*/
770+
@BetaApi
771+
public Pipeline unnest(String fieldName) {
772+
// return unnest(Field.of(fieldName));
773+
return append(new Unnest(Field.of(fieldName)));
774+
}
775+
776+
// /**
777+
// * Produces a document for each element in array found in previous stage document.
778+
// *
779+
// * <p>For each previous stage document, this stage will emit zero or more augmented documents.
780+
// * The input array found in the specified by {@code Selectable} expression parameter, will for
781+
// * each input array element produce an augmented document. The input array element will augment
782+
// * the previous stage document by assigning the {@code Selectable} alias the element value.
783+
// *
784+
// * <p>Example:
785+
// *
786+
// * <pre>{@code
787+
// * // Input:
788+
// * // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": [ "comedy", "space",
789+
// "adventure" ], ... }
790+
// *
791+
// * // Emit a book document for each tag of the book.
792+
// * firestore.pipeline().collection("books")
793+
// * .unnest(Field.of("tags").as("tag"));
794+
// *
795+
// * // Output:
796+
// * // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "comedy", "tags": [ "comedy",
797+
// "space", "adventure" ], ... }
798+
// * // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "space", "tags": [ "comedy",
799+
// "space", "adventure" ], ... }
800+
// * // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "adventure", "tags": [
801+
// "comedy", "space", "adventure" ], ... }
802+
// * }</pre>
803+
// *
804+
// * @param field The expression that evaluates to the input array.
805+
// * @return A new {@code Pipeline} object with this stage appended to the stage list.
806+
// */
807+
// @BetaApi
808+
// public Pipeline unnest(Selectable field) {
809+
// return append(new Unnest(field));
810+
// }
811+
812+
/**
813+
* Produces a document for each element in array found in previous stage document.
814+
*
815+
* <p>For each previous stage document, this stage will emit zero or more augmented documents. The
816+
* input array found in the previous stage document field specified by the `fieldName` parameter,
817+
* will for each input array element produce an augmented document. The input array element will
818+
* augment the previous stage document by replacing the field specified by `fieldName` parameter
819+
* with the element value.
820+
*
821+
* <p>In other words, the field containing the input array will be removed from the augmented
822+
* document and replaced by the corresponding array element.
823+
*
824+
* <p>Example:
825+
*
826+
* <pre>{@code
827+
* // Input:
828+
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": [ "comedy", "space", "adventure" ], ... }
829+
*
830+
* // Emit a book document for each tag of the book.
831+
* firestore.pipeline().collection("books")
832+
* .unnest("tags", UnnestOptions.indexField("tagIndex"));
833+
*
834+
* // Output:
835+
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 0, "tags": "comedy", ... }
836+
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 1, "tags": "space", ... }
837+
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 2, "tags": "adventure", ... }
838+
* }</pre>
839+
*
840+
* @param fieldName The name of the field containing the array.
841+
* @param options The {@code UnnestOptions} options.
842+
* @return A new {@code Pipeline} object with this stage appended to the stage list.
843+
*/
844+
@BetaApi
845+
public Pipeline unnest(String fieldName, UnnestOptions options) {
846+
// return unnest(Field.of(fieldName), options);
847+
return append(new Unnest(Field.of(fieldName), options));
848+
}
849+
850+
// /**
851+
// * Produces a document for each element in array found in previous stage document.
852+
// *
853+
// * <p>For each previous stage document, this stage will emit zero or more augmented documents.
854+
// * The input array found in the specified by {@code Selectable} expression parameter, will for
855+
// * each input array element produce an augmented document. The input array element will augment
856+
// * the previous stage document by assigning the {@code Selectable} alias the element value.
857+
// *
858+
// * <p>Example:
859+
// *
860+
// * <pre>{@code
861+
// * // Input:
862+
// * // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": [ "comedy", "space",
863+
// "adventure" ], ... }
864+
// *
865+
// * // Emit a book document for each tag of the book.
866+
// * firestore.pipeline().collection("books")
867+
// * .unnest(Field.of("tags").as("tag"), UnnestOptions.indexField("tagIndex"));
868+
// *
869+
// * // Output:
870+
// * // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 0, "tag": "comedy",
871+
// "tags": [ "comedy", "space", "adventure" ], ... }
872+
// * // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 1, "tag": "space", "tags":
873+
// [ "comedy", "space", "adventure" ], ... }
874+
// * // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 2, "tag": "adventure",
875+
// "tags": [ "comedy", "space", "adventure" ], ... }
876+
// * }</pre>
877+
// *
878+
// * @param field The expression that evaluates to the input array.
879+
// * @param options The {@code UnnestOptions} options.
880+
// * @return A new {@code Pipeline} object with this stage appended to the stage list.
881+
// */
882+
// @BetaApi
883+
// public Pipeline unnest(Selectable field, UnnestOptions options) {
884+
// return append(new Unnest(field, options));
885+
// }
886+
589887
/**
590888
* Adds a generic stage to the pipeline.
591889
*
@@ -648,7 +946,7 @@ public Pipeline genericStage(String name, List<Object> params) {
648946
*/
649947
@BetaApi
650948
public ApiFuture<List<PipelineResult>> execute() {
651-
return execute(null, null);
949+
return execute((ByteString) null, (com.google.protobuf.Timestamp) null);
652950
}
653951

654952
/**
@@ -701,6 +999,22 @@ public void execute(ApiStreamObserver<PipelineResult> observer) {
701999
executeInternal(null, null, observer);
7021000
}
7031001

1002+
// @BetaApi
1003+
// public void execute(ApiStreamObserver<PipelineResult> observer, PipelineOptions options) {
1004+
// throw new RuntimeException("Not Implemented");
1005+
// }
1006+
//
1007+
// @BetaApi
1008+
// public ApiFuture<List<PipelineResult>> explain() {
1009+
// throw new RuntimeException("Not Implemented");
1010+
// }
1011+
//
1012+
// @BetaApi
1013+
// public void explain(ApiStreamObserver<PipelineResult> observer, PipelineExplainOptions options)
1014+
// {
1015+
// throw new RuntimeException("Not Implemented");
1016+
// }
1017+
7041018
ApiFuture<List<PipelineResult>> execute(
7051019
@Nullable final ByteString transactionId, @Nullable com.google.protobuf.Timestamp readTime) {
7061020
SettableApiFuture<List<PipelineResult>> futureResult = SettableApiFuture.create();
@@ -767,12 +1081,18 @@ public void onError(Throwable t) {
7671081
});
7681082
}
7691083

1084+
@InternalApi
7701085
private com.google.firestore.v1.Pipeline toProto() {
7711086
return com.google.firestore.v1.Pipeline.newBuilder()
7721087
.addAllStages(stages.transform(StageUtils::toStageProto))
7731088
.build();
7741089
}
7751090

1091+
@InternalApi
1092+
public com.google.firestore.v1.Value toProtoValue() {
1093+
return Value.newBuilder().setPipelineValue(toProto()).build();
1094+
}
1095+
7761096
private void pipelineInternalStream(
7771097
ExecutePipelineRequest request, PipelineResultObserver resultObserver) {
7781098
ResponseObserver<ExecutePipelineResponse> observer =

google-cloud-firestore/src/main/java/com/google/cloud/firestore/PipelineUtils.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import com.google.cloud.firestore.pipeline.expressions.Expr;
3535
import com.google.cloud.firestore.pipeline.expressions.ExprWithAlias;
3636
import com.google.cloud.firestore.pipeline.expressions.Field;
37-
import com.google.cloud.firestore.pipeline.expressions.Fields;
3837
import com.google.cloud.firestore.pipeline.expressions.FilterCondition;
3938
import com.google.cloud.firestore.pipeline.expressions.Selectable;
4039
import com.google.common.collect.Lists;
@@ -173,11 +172,6 @@ public static Map<String, Expr> selectablesToMap(Selectable... selectables) {
173172
} else if (proj instanceof AccumulatorTarget) {
174173
AccumulatorTarget aggregatorProj = (AccumulatorTarget) proj;
175174
projMap.put(aggregatorProj.getFieldName(), aggregatorProj.getAccumulator());
176-
} else if (proj instanceof Fields) {
177-
Fields fieldsProj = (Fields) proj;
178-
if (fieldsProj.getFields() != null) {
179-
fieldsProj.getFields().forEach(f -> projMap.put(f.getPath().getEncodedPath(), f));
180-
}
181175
} else if (proj instanceof ExprWithAlias) {
182176
ExprWithAlias exprWithAlias = (ExprWithAlias) proj;
183177
projMap.put(exprWithAlias.getAlias(), exprWithAlias.getExpr());

0 commit comments

Comments
 (0)