Skip to content

Commit 593d39e

Browse files
committed
executions, group bys, etc.
1 parent 12d4f29 commit 593d39e

File tree

11 files changed

+234
-141
lines changed

11 files changed

+234
-141
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ public CollectionGroup collectionGroup(@Nonnull final String collectionId) {
390390
@Override
391391
@BetaApi
392392
public PipelineSource pipeline() {
393-
return new PipelineSource();
393+
return new PipelineSource(this);
394394
}
395395

396396
@Nonnull

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Pipeline.java

Lines changed: 35 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,7 @@
1010
import com.google.cloud.Timestamp;
1111
import com.google.cloud.firestore.pipeline.PaginatingPipeline;
1212
import com.google.cloud.firestore.pipeline.expressions.AccumulatorTarget;
13-
import com.google.cloud.firestore.pipeline.expressions.Expr;
14-
import com.google.cloud.firestore.pipeline.expressions.ExprWithAlias;
1513
import com.google.cloud.firestore.pipeline.expressions.Field;
16-
import com.google.cloud.firestore.pipeline.expressions.Fields;
1714
import com.google.cloud.firestore.pipeline.expressions.FilterCondition;
1815
import com.google.cloud.firestore.pipeline.expressions.Ordering;
1916
import com.google.cloud.firestore.pipeline.expressions.Selectable;
@@ -44,7 +41,6 @@
4441
import io.opencensus.trace.Tracing;
4542
import java.util.ArrayList;
4643
import java.util.Arrays;
47-
import java.util.HashMap;
4844
import java.util.List;
4945
import java.util.Map;
5046
import java.util.logging.Level;
@@ -86,106 +82,93 @@
8682
@BetaApi
8783
public final class Pipeline {
8884
private final ImmutableList<Stage> stages;
85+
private final Firestore db;
8986

90-
private Pipeline(List<Stage> stages) {
87+
private Pipeline(Firestore db, List<Stage> stages) {
88+
this.db = db;
9189
this.stages = ImmutableList.copyOf(stages);
9290
}
9391

94-
Pipeline(Collection collection) {
95-
this(Lists.newArrayList(collection));
92+
Pipeline(Firestore db, Collection collection) {
93+
this(db, Lists.newArrayList(collection));
9694
}
9795

98-
Pipeline(CollectionGroup group) {
99-
this(Lists.newArrayList(group));
96+
Pipeline(Firestore db, CollectionGroup group) {
97+
this(db, Lists.newArrayList(group));
10098
}
10199

102-
Pipeline(Database db) {
103-
this(Lists.newArrayList(db));
100+
Pipeline(Firestore firestore, Database db) {
101+
this(firestore, Lists.newArrayList(db));
104102
}
105103

106-
Pipeline(Documents docs) {
107-
this(Lists.newArrayList(docs));
108-
}
109-
110-
private Map<String, Expr> projectablesToMap(Selectable... selectables) {
111-
Map<String, Expr> projMap = new HashMap<>();
112-
for (Selectable proj : selectables) {
113-
if (proj instanceof Field) {
114-
Field fieldProj = (Field) proj;
115-
projMap.put(fieldProj.getPath().getEncodedPath(), fieldProj);
116-
} else if (proj instanceof AccumulatorTarget) {
117-
AccumulatorTarget aggregatorProj = (AccumulatorTarget) proj;
118-
projMap.put(aggregatorProj.getFieldName(), aggregatorProj.getAccumulator());
119-
} else if (proj instanceof Fields) {
120-
Fields fieldsProj = (Fields) proj;
121-
if (fieldsProj.getFields() != null) {
122-
fieldsProj.getFields().forEach(f -> projMap.put(f.getPath().getEncodedPath(), f));
123-
}
124-
} else if (proj instanceof ExprWithAlias) {
125-
ExprWithAlias exprWithAlias = (ExprWithAlias) proj;
126-
projMap.put(exprWithAlias.getAlias(), exprWithAlias.getExpr());
127-
}
128-
}
129-
return projMap;
130-
}
131-
132-
private Map<String, Expr> fieldNamesToMap(String... fields) {
133-
Map<String, Expr> projMap = new HashMap<>();
134-
for (String field : fields) {
135-
projMap.put(field, Field.of(field));
136-
}
137-
return projMap;
104+
Pipeline(Firestore db, Documents docs) {
105+
this(db, Lists.newArrayList(docs));
138106
}
139107

140108
@BetaApi
141109
public Pipeline addFields(Selectable... fields) {
142110
return new Pipeline(
111+
this.db,
143112
ImmutableList.<Stage>builder()
144113
.addAll(stages)
145-
.add(new AddFields(projectablesToMap(fields)))
114+
.add(new AddFields(PipelineUtils.selectablesToMap(fields)))
146115
.build());
147116
}
148117

149118
@BetaApi
150119
public Pipeline select(Selectable... projections) {
151120
return new Pipeline(
121+
this.db,
152122
ImmutableList.<Stage>builder()
153123
.addAll(stages)
154-
.add(new Select(projectablesToMap(projections)))
124+
.add(new Select(PipelineUtils.selectablesToMap(projections)))
155125
.build());
156126
}
157127

158128
@BetaApi
159129
public Pipeline select(String... fields) {
160130
return new Pipeline(
131+
this.db,
161132
ImmutableList.<Stage>builder()
162133
.addAll(stages)
163-
.add(new Select(fieldNamesToMap(fields)))
134+
.add(new Select(PipelineUtils.fieldNamesToMap(fields)))
164135
.build());
165136
}
166137

167138
@BetaApi
168139
public Pipeline where(FilterCondition condition) {
169140
return new Pipeline(
141+
this.db,
170142
ImmutableList.<Stage>builder().addAll(stages).add(new Where(condition)).build());
171143
}
172144

173145
@BetaApi
174146
public Pipeline offset(int offset) {
175147
return new Pipeline(
148+
this.db,
176149
ImmutableList.<Stage>builder().addAll(stages).add(new Offset(offset)).build());
177150
}
178151

179152
@BetaApi
180153
public Pipeline limit(int limit) {
181154
return new Pipeline(
155+
this.db,
182156
ImmutableList.<Stage>builder().addAll(stages).add(new Limit(limit)).build());
183157
}
184158

185159
@BetaApi
186160
public Pipeline aggregate(AccumulatorTarget... aggregators) {
187161
return new Pipeline(
188-
ImmutableList.<Stage>builder().addAll(stages).add(new Aggregate(aggregators)).build());
162+
this.db,
163+
ImmutableList.<Stage>builder().addAll(stages).add(Aggregate.newInstance().withAccumulators(aggregators)).build());
164+
}
165+
166+
@BetaApi
167+
public Pipeline aggregate(Aggregate aggregate) {
168+
return new Pipeline(
169+
this.db,
170+
ImmutableList.<Stage>builder().addAll(stages)
171+
.add(aggregate).build());
189172
}
190173

191174
@BetaApi
@@ -199,6 +182,7 @@ public Pipeline findNearest(
199182
Field property, double[] vector, FindNearest.FindNearestOptions options) {
200183
// Implementation for findNearest (add the FindNearest stage if needed)
201184
return new Pipeline(
185+
this.db,
202186
ImmutableList.<Stage>builder()
203187
.addAll(stages)
204188
.add(
@@ -210,6 +194,7 @@ public Pipeline findNearest(
210194
@BetaApi
211195
public Pipeline sort(List<Ordering> orders, Sort.Density density, Sort.Truncation truncation) {
212196
return new Pipeline(
197+
this.db,
213198
ImmutableList.<Stage>builder()
214199
.addAll(stages)
215200
.add(new Sort(orders, density, truncation))
@@ -231,6 +216,7 @@ public PaginatingPipeline paginate(int pageSize, Ordering... orders) {
231216
public Pipeline genericStage(String name, Map<String, Object> params) {
232217
// Implementation for genericStage (add the GenericStage if needed)
233218
return new Pipeline(
219+
this.db,
234220
ImmutableList.<Stage>builder()
235221
.addAll(stages)
236222
.add(
@@ -242,7 +228,7 @@ public Pipeline genericStage(String name, Map<String, Object> params) {
242228
}
243229

244230
@BetaApi
245-
public ApiFuture<List<PipelineResult>> execute(Firestore db) {
231+
public ApiFuture<List<PipelineResult>> execute() {
246232
if (db instanceof FirestoreImpl) {
247233
FirestoreImpl firestoreImpl = (FirestoreImpl) db;
248234
Value pipelineValue = toProto();
@@ -287,7 +273,7 @@ public void onError(Throwable t) {
287273
}
288274

289275
@BetaApi
290-
public void execute(Firestore db, ApiStreamObserver<PipelineResult> observer) {
276+
public void execute(ApiStreamObserver<PipelineResult> observer) {
291277
if (db instanceof FirestoreImpl) {
292278
FirestoreImpl firestoreImpl = (FirestoreImpl) db;
293279
Value pipelineValue = toProto();
Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.google.cloud.firestore;
22

33
import com.google.api.core.BetaApi;
4+
import com.google.api.core.InternalApi;
45
import com.google.cloud.firestore.pipeline.stages.Collection;
56
import com.google.cloud.firestore.pipeline.stages.CollectionGroup;
67
import com.google.cloud.firestore.pipeline.stages.Database;
@@ -10,11 +11,17 @@
1011

1112
@BetaApi
1213
public class PipelineSource {
14+
private final Firestore db;
15+
16+
@InternalApi
17+
PipelineSource(Firestore db){
18+
this.db = db;
19+
}
1320

1421
@Nonnull
1522
@BetaApi
1623
public Pipeline collection(@Nonnull String path) {
17-
return new Pipeline(new Collection(path));
24+
return new Pipeline(this.db,new Collection(path));
1825
}
1926

2027
@Nonnull
@@ -24,18 +31,18 @@ public Pipeline collectionGroup(@Nonnull String collectionId) {
2431
!collectionId.contains("/"),
2532
"Invalid collectionId '%s'. Collection IDs must not contain '/'.",
2633
collectionId);
27-
return new Pipeline(new CollectionGroup(collectionId));
34+
return new Pipeline(this.db, new CollectionGroup(collectionId));
2835
}
2936

3037
@Nonnull
3138
@BetaApi
3239
public Pipeline database() {
33-
return new Pipeline(new Database());
40+
return new Pipeline(this.db, new Database());
3441
}
3542

3643
@Nonnull
3744
@BetaApi
3845
public Pipeline documents(DocumentReference... docs) {
39-
return new Pipeline(Documents.of(docs));
46+
return new Pipeline(this.db, Documents.of(docs));
4047
}
4148
}

google-cloud-firestore/src/main/java/com/google/cloud/firestore/PipelineUtils.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,18 @@
1515
import com.google.cloud.firestore.Query.UnaryFilterInternal;
1616
import com.google.cloud.firestore.pipeline.PaginatingPipeline;
1717
import com.google.cloud.firestore.pipeline.expressions.AccumulatorTarget;
18+
import com.google.cloud.firestore.pipeline.expressions.Expr;
19+
import com.google.cloud.firestore.pipeline.expressions.ExprWithAlias;
1820
import com.google.cloud.firestore.pipeline.expressions.Field;
21+
import com.google.cloud.firestore.pipeline.expressions.Fields;
1922
import com.google.cloud.firestore.pipeline.expressions.FilterCondition;
23+
import com.google.cloud.firestore.pipeline.expressions.Selectable;
2024
import com.google.common.collect.Lists;
2125
import com.google.firestore.v1.Cursor;
2226
import com.google.firestore.v1.Value;
27+
import java.util.HashMap;
2328
import java.util.List;
29+
import java.util.Map;
2430
import java.util.stream.Collectors;
2531

2632
@InternalApi
@@ -158,15 +164,47 @@ static AccumulatorTarget toPipelineAggregatorTarget(AggregateField f) {
158164

159165
switch (operator) {
160166
case "sum":
161-
return Field.of(fieldPath).sum().toField(f.getAlias());
167+
return Field.of(fieldPath).sum().as(f.getAlias());
162168

163169
case "count":
164-
return countAll().toField(f.getAlias());
170+
return countAll().as(f.getAlias());
165171
case "average":
166-
return Field.of(fieldPath).avg().toField(f.getAlias());
172+
return Field.of(fieldPath).avg().as(f.getAlias());
167173
default:
168174
// Handle the 'else' case appropriately in your Java code
169175
throw new IllegalArgumentException("Unsupported operator: " + operator);
170176
}
171177
}
178+
179+
@InternalApi
180+
public static Map<String, Expr> selectablesToMap(Selectable... selectables) {
181+
Map<String, Expr> projMap = new HashMap<>();
182+
for (Selectable proj : selectables) {
183+
if (proj instanceof Field) {
184+
Field fieldProj = (Field) proj;
185+
projMap.put(fieldProj.getPath().getEncodedPath(), fieldProj);
186+
} else if (proj instanceof AccumulatorTarget) {
187+
AccumulatorTarget aggregatorProj = (AccumulatorTarget) proj;
188+
projMap.put(aggregatorProj.getFieldName(), aggregatorProj.getAccumulator());
189+
} else if (proj instanceof Fields) {
190+
Fields fieldsProj = (Fields) proj;
191+
if (fieldsProj.getFields() != null) {
192+
fieldsProj.getFields().forEach(f -> projMap.put(f.getPath().getEncodedPath(), f));
193+
}
194+
} else if (proj instanceof ExprWithAlias) {
195+
ExprWithAlias exprWithAlias = (ExprWithAlias) proj;
196+
projMap.put(exprWithAlias.getAlias(), exprWithAlias.getExpr());
197+
}
198+
}
199+
return projMap;
200+
}
201+
202+
@InternalApi
203+
public static Map<String, Expr> fieldNamesToMap(String... fields) {
204+
Map<String, Expr> projMap = new HashMap<>();
205+
for (String field : fields) {
206+
projMap.put(field, Field.of(field));
207+
}
208+
return projMap;
209+
}
172210
}

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2128,8 +2128,8 @@ public Pipeline pipeline() {
21282128
// From
21292129
Pipeline ppl =
21302130
this.options.getAllDescendants()
2131-
? new PipelineSource().collectionGroup(this.options.getCollectionId())
2132-
: new PipelineSource()
2131+
? new PipelineSource(this.getFirestore()).collectionGroup(this.options.getCollectionId())
2132+
: new PipelineSource(this.getFirestore())
21332133
.collection(
21342134
this.options.getParentPath().append(this.options.getCollectionId()).getPath());
21352135

google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/Accumulator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
@BetaApi
66
public interface Accumulator extends Expr {
77
@BetaApi
8-
default AccumulatorTarget toField(String fieldName) {
8+
@Override
9+
default AccumulatorTarget as(String fieldName) {
910
return new AccumulatorTarget(this, fieldName, false);
1011
}
1112
}

google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/Expr.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ default Ordering descending() {
300300
}
301301

302302
@BetaApi
303-
default Selectable asAlias(String alias) {
303+
default Selectable as(String alias) {
304304
return new ExprWithAlias(this, alias);
305305
}
306306
}

0 commit comments

Comments
 (0)