Skip to content

Commit 0f63a54

Browse files
committed
limit to last, cursors and multitab for documents and database stages
1 parent 58124c4 commit 0f63a54

19 files changed

+236
-126
lines changed

packages/firestore/src/core/pipeline-util.ts

Lines changed: 70 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,17 @@
1313
// limitations under the License.
1414

1515
import {
16+
And,
1617
and,
1718
Constant,
1819
Expr,
1920
Field,
2021
FilterCondition,
2122
FirestoreFunction,
23+
gt,
24+
gte,
25+
lt,
26+
lte,
2227
not,
2328
or,
2429
Ordering
@@ -85,6 +90,7 @@ import { Firestore } from '../api/database';
8590
import { doc } from '../lite-api/reference';
8691
import { Direction } from './order_by';
8792
import { CorePipeline } from './pipeline_run';
93+
import { Bound } from './bound';
8894

8995
/* eslint @typescript-eslint/no-explicit-any: 0 */
9096

@@ -295,6 +301,16 @@ export function toPipelineFilterCondition(
295301
throw new Error(`Failed to convert filter to pipeline conditions: ${f}`);
296302
}
297303

304+
function reverseOrderings(orderings: Ordering[]): Ordering[] {
305+
return orderings.map(
306+
o =>
307+
new Ordering(
308+
o.expr,
309+
o.direction === 'ascending' ? 'descending' : 'ascending'
310+
)
311+
);
312+
}
313+
298314
export function toPipeline(query: Query, db: Firestore): Pipeline {
299315
let pipeline: Pipeline;
300316
if (isCollectionGroupQuery(query)) {
@@ -323,28 +339,68 @@ export function toPipeline(query: Query, db: Firestore): Pipeline {
323339
pipeline = pipeline.where(existsConditions[0]);
324340
}
325341

326-
pipeline = pipeline.sort(
327-
...orders.map(order =>
328-
order.dir === Direction.ASCENDING
329-
? Field.of(order.field.canonicalString()).ascending()
330-
: Field.of(order.field.canonicalString()).descending()
331-
)
342+
const orderings = orders.map(order =>
343+
order.dir === Direction.ASCENDING
344+
? Field.of(order.field.canonicalString()).ascending()
345+
: Field.of(order.field.canonicalString()).descending()
332346
);
333347

334-
// cursors and limits
335-
if (query.startAt !== null || query.endAt !== null) {
336-
throw new Error('Cursors are not supported yet.');
337-
}
338348
if (query.limitType === LimitType.Last) {
339-
throw new Error('Limit to last are not supported yet.');
340-
}
341-
if (query.limit !== null) {
342-
pipeline = pipeline.limit(query.limit);
349+
pipeline = pipeline.sort(...reverseOrderings(orderings));
350+
// cursors
351+
if (query.startAt !== null) {
352+
pipeline = pipeline.where(
353+
whereConditionsFromCursor(query.startAt, orderings, 'before')
354+
);
355+
}
356+
357+
if (query.endAt !== null) {
358+
pipeline = pipeline.where(
359+
whereConditionsFromCursor(query.endAt, orderings, 'after')
360+
);
361+
}
362+
363+
pipeline = pipeline._limit(query.limit!, true);
364+
pipeline = pipeline.sort(...orderings);
365+
} else {
366+
pipeline = pipeline.sort(...orderings);
367+
if (query.startAt !== null) {
368+
pipeline = pipeline.where(
369+
whereConditionsFromCursor(query.startAt, orderings, 'after')
370+
);
371+
}
372+
if (query.endAt !== null) {
373+
pipeline = pipeline.where(
374+
whereConditionsFromCursor(query.endAt, orderings, 'before')
375+
);
376+
}
377+
378+
if (query.limit !== null) {
379+
pipeline = pipeline.limit(query.limit);
380+
}
343381
}
344382

345383
return pipeline;
346384
}
347385

386+
function whereConditionsFromCursor(
387+
bound: Bound,
388+
orderings: Ordering[],
389+
position: 'before' | 'after'
390+
): And {
391+
const cursors = bound.position.map(value => Constant._fromProto(value));
392+
const filterFunc = position === 'before' ? lt : gt;
393+
const filterInclusiveFunc = position === 'before' ? lte : gte;
394+
const conditions = cursors.map((cursor, index) => {
395+
if (!!bound.inclusive && index === cursors.length - 1) {
396+
return filterInclusiveFunc(orderings[index].expr as Field, cursor);
397+
} else {
398+
return filterFunc(orderings[index].expr as Field, cursor);
399+
}
400+
});
401+
return new And(conditions);
402+
}
403+
348404
function canonifyExpr(expr: Expr): string {
349405
if (expr instanceof Field) {
350406
return `fld(${expr.fieldName()})`;

packages/firestore/src/core/pipeline_run.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -283,15 +283,16 @@ function lastEffectiveSort(pipeline: CorePipeline): Ordering[] {
283283

284284
export function getLastEffectiveLimit(
285285
pipeline: CorePipeline
286-
): number | undefined {
287-
// return the last sort stage, throws exception if it doesn't exist
288-
// TODO(pipeline): this implementation is wrong, there are stages that can invalidate
289-
// the orderings later. The proper way to manipulate the pipeline so that last Sort
290-
// always has effects.
286+
): { limit: number; convertedFromLimitToLast: boolean } | undefined {
287+
// TODO(pipeline): this implementation is wrong, there are stages that can change
288+
// the limit later (findNearest).
291289
for (let i = pipeline.stages.length - 1; i >= 0; i--) {
292290
const stage = pipeline.stages[i];
293291
if (stage instanceof Limit) {
294-
return stage.limit;
292+
return {
293+
limit: stage.limit,
294+
convertedFromLimitToLast: stage.convertedFromLimitTolast
295+
};
295296
}
296297
}
297298
return undefined;

packages/firestore/src/core/sync_engine_impl.ts

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525
localStoreExecuteQuery,
2626
localStoreGetActiveClients,
2727
localStoreGetCachedTarget,
28+
localStoreGetDocuments,
2829
localStoreGetHighestUnacknowledgedBatchId,
2930
localStoreGetNewDocumentChanges,
3031
localStoreHandleUserChange,
@@ -45,6 +46,7 @@ import { TargetData, TargetPurpose } from '../local/target_data';
4546
import {
4647
DocumentKeySet,
4748
documentKeySet,
49+
documentMap,
4850
DocumentMap,
4951
mutableDocumentMap
5052
} from '../model/collections';
@@ -120,6 +122,7 @@ import {
120122
canonifyQueryOrPipeline,
121123
getPipelineCollection,
122124
getPipelineCollectionId,
125+
getPipelineSourceType,
123126
isPipeline,
124127
QueryOrPipeline,
125128
queryOrPipelineEqual,
@@ -1096,7 +1099,6 @@ export async function syncEngineEmitNewSnapsAndNotifyLocalStore(
10961099
return;
10971100
}
10981101

1099-
// TODO(pipeline): will this work for pipelines?
11001102
syncEngineImpl.queryViewsByQuery.forEach((_, queryView) => {
11011103
debugAssert(
11021104
!!syncEngineImpl.applyDocChanges,
@@ -1548,13 +1550,35 @@ export async function syncEngineApplyTargetState(
15481550
switch (state) {
15491551
case 'current':
15501552
case 'not-current': {
1551-
const changes = await localStoreGetNewDocumentChanges(
1552-
syncEngineImpl.localStore,
1553-
// TODO(pipeline): handle database/documents pipeline
1554-
isPipeline(query[0])
1555-
? getPipelineCollectionId(query[0])!
1556-
: queryCollectionGroup(query[0])
1557-
);
1553+
let changes: DocumentMap;
1554+
if (isPipeline(query[0])) {
1555+
switch (getPipelineSourceType(query[0])) {
1556+
case 'collection_group':
1557+
case 'collection':
1558+
changes = await localStoreGetNewDocumentChanges(
1559+
syncEngineImpl.localStore,
1560+
getPipelineCollectionId(query[0])!
1561+
);
1562+
break;
1563+
case 'documents':
1564+
changes = await localStoreGetDocuments(
1565+
syncEngineImpl.localStore,
1566+
query[0]!
1567+
);
1568+
break;
1569+
case 'database':
1570+
case 'unknown':
1571+
logWarn('');
1572+
changes = documentMap();
1573+
break;
1574+
}
1575+
} else {
1576+
changes = await localStoreGetNewDocumentChanges(
1577+
syncEngineImpl.localStore,
1578+
queryCollectionGroup(query[0])
1579+
);
1580+
}
1581+
15581582
const synthesizedRemoteEvent =
15591583
RemoteEvent.createSynthesizedRemoteEventForCurrentChange(
15601584
targetId,

packages/firestore/src/core/view.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,19 +252,25 @@ export class View {
252252

253253
private getLimit(query: QueryOrPipeline): number | undefined {
254254
return isPipeline(query)
255-
? getLastEffectiveLimit(query)
255+
? getLastEffectiveLimit(query)?.limit
256256
: query.limit || undefined;
257257
}
258+
258259
private getLimitType(query: QueryOrPipeline): LimitType {
259-
return isPipeline(query) ? LimitType.First : query.limitType;
260+
return isPipeline(query)
261+
? getLastEffectiveLimit(query)?.convertedFromLimitToLast
262+
? LimitType.Last
263+
: LimitType.First
264+
: query.limitType;
265+
// return isPipeline(query) ? LimitType.First : query.limitType;
260266
}
261267

262268
private getLimitEdges(
263269
query: QueryOrPipeline,
264270
oldDocumentSet: DocumentSet
265271
): [Document | null, Document | null] {
266272
if (isPipeline(query)) {
267-
const limit = getLastEffectiveLimit(query);
273+
const limit = getLastEffectiveLimit(query)?.limit;
268274
return [
269275
oldDocumentSet.size === limit ? oldDocumentSet.last() : null,
270276
null

packages/firestore/src/lite-api/pipeline.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,22 @@ export class Pipeline<AppModelType = DocumentData>
405405
);
406406
}
407407

408+
_limit(
409+
limit: number,
410+
convertedFromLimitTolast: boolean
411+
): Pipeline<AppModelType> {
412+
const copy = this.stages.map(s => s);
413+
copy.push(new Limit(limit, convertedFromLimitTolast));
414+
return new Pipeline(
415+
this.liteDb,
416+
this.userDataReader,
417+
this.userDataWriter,
418+
this.documentReferenceFactory,
419+
copy,
420+
this.converter
421+
);
422+
}
423+
408424
/**
409425
* Returns a set of distinct {@link Expr} values from the inputs to this stage.
410426
*

packages/firestore/src/lite-api/stage.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,10 @@ export class FindNearest implements Stage {
288288
export class Limit implements Stage {
289289
name = 'limit';
290290

291-
constructor(readonly limit: number) {
291+
constructor(
292+
readonly limit: number,
293+
readonly convertedFromLimitTolast: boolean = false
294+
) {
292295
hardAssert(
293296
!isNaN(limit) && limit !== Infinity && limit !== -Infinity,
294297
'Invalid limit value'

packages/firestore/src/local/indexeddb_target_cache.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,6 @@ export class IndexedDbTargetCache implements TargetCache {
273273
const found = fromDbTarget(this.serializer, value);
274274
// After finding a potential match, check that the target is
275275
// actually equal to the requested target.
276-
// TODO(pipeline): This needs to handle pipeline properly.
277276
if (targetOrPipelineEqual(target, found.target)) {
278277
result = found;
279278
control.done();

packages/firestore/src/local/local_documents_view.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -568,8 +568,6 @@ export class LocalDocumentsView {
568568
context?: QueryContext
569569
): PersistencePromise<DocumentMap> {
570570
if (getPipelineSourceType(pipeline) === 'collection_group') {
571-
// TODO(pipeline): rewrite the pipeline as collection pipeline and recurse into this function
572-
// return this.getDocumentsMatchingPipeline(txn, pipeline, offset, context);
573571
const collectionId = getPipelineCollectionGroup(pipeline)!;
574572
let results = documentMap();
575573
return this.indexManager

packages/firestore/src/local/local_store_impl.ts

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,13 @@ import { Pipeline } from '../lite-api/pipeline';
9999

100100
import {
101101
canonifyTargetOrPipeline,
102+
getPipelineDocuments,
102103
isPipeline,
103104
QueryOrPipeline,
104105
TargetOrPipeline,
105106
targetOrPipelineEqual
106107
} from '../core/pipeline-util';
108+
import { CorePipeline } from '../core/pipeline_run';
107109

108110
export const LOG_TAG = 'LocalStore';
109111

@@ -1041,12 +1043,6 @@ export async function localStoreReleaseTarget(
10411043
const localStoreImpl = debugCast(localStore, LocalStoreImpl);
10421044
const targetData = localStoreImpl.targetDataByTarget.get(targetId);
10431045

1044-
// TODO(pipeline): this is a hack that only works because pipelines are the only ones returning nulls here.
1045-
// REMOVE ASAP.
1046-
if (targetData === null) {
1047-
return;
1048-
}
1049-
10501046
debugAssert(
10511047
targetData !== null,
10521048
`Tried to release nonexistent target: ${targetId}`
@@ -1086,7 +1082,7 @@ export async function localStoreReleaseTarget(
10861082
localStoreImpl.targetDataByTarget =
10871083
localStoreImpl.targetDataByTarget.remove(targetId);
10881084
// TODO(pipeline): This needs to handle pipeline properly.
1089-
localStoreImpl.targetIdByTarget.delete(targetData!.target as Target);
1085+
localStoreImpl.targetIdByTarget.delete(targetData!.target);
10901086
}
10911087

10921088
/**
@@ -1264,6 +1260,24 @@ export function localStoreGetCachedTarget(
12641260
}
12651261
}
12661262

1263+
// PORTING NOTE: Multi-Tab only.
1264+
export function localStoreGetDocuments(
1265+
localStore: LocalStore,
1266+
pipeline: CorePipeline
1267+
): Promise<DocumentMap> {
1268+
const localStoreImpl = debugCast(localStore, LocalStoreImpl);
1269+
1270+
const keys = getPipelineDocuments(pipeline)!;
1271+
const keySet = documentKeySet(...keys.map(k => DocumentKey.fromPath(k)));
1272+
return localStoreImpl.persistence
1273+
.runTransaction('Get documents for pipeline', 'readonly', txn =>
1274+
localStoreImpl.remoteDocuments.getEntries(txn, keySet)
1275+
)
1276+
.then(changedDocs => {
1277+
return changedDocs;
1278+
});
1279+
}
1280+
12671281
/**
12681282
* Returns the set of documents that have been updated since the last call.
12691283
* If this is the first call, returns the set of changes since client

0 commit comments

Comments
 (0)