Skip to content

Commit 0b3e2d1

Browse files
committed
Handling local mutations, acknowledgement and rejections
1 parent 8667fa5 commit 0b3e2d1

File tree

8 files changed

+376
-58
lines changed

8 files changed

+376
-58
lines changed

packages/firestore/src/core/sync_engine_impl.ts

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -400,8 +400,17 @@ export function toLocalChangesToApplyToView(
400400
results: DocumentMap
401401
): LocalChangesToApplyToView {
402402
return isPipeline(query) && getPipelineFlavor(query) !== 'exact'
403-
? { changedDocs: documentMap(), augmentedResults: results }
404-
: { changedDocs: results, augmentedResults: undefined };
403+
? {
404+
changedDocs: documentMap(),
405+
augmentedResults: { results, mutatedKeys: documentKeySet() }
406+
}
407+
: {
408+
changedDocs: results,
409+
augmentedResults: {
410+
results: documentMap(),
411+
mutatedKeys: documentKeySet()
412+
}
413+
};
405414
}
406415

407416
/**
@@ -566,10 +575,10 @@ export async function syncEngineWrite(
566575
);
567576
syncEngineImpl.sharedClientState.addPendingMutation(result.batchId);
568577
addMutationCallback(syncEngineImpl, result.batchId, userCallback);
569-
await syncEngineEmitNewSnapsAndNotifyLocalStore(syncEngineImpl, {
570-
changedDocs: result.changes,
571-
newAugmentedResults: new Map()
572-
});
578+
await syncEngineEmitNewSnapsAndNotifyLocalStore(
579+
syncEngineImpl,
580+
result.changes
581+
);
573582
await fillWritePipeline(syncEngineImpl.remoteStore);
574583
} catch (e) {
575584
// If we can't persist the mutation, we reject the user callback and
@@ -798,10 +807,7 @@ export async function syncEngineApplySuccessfulWrite(
798807
batchId,
799808
'acknowledged'
800809
);
801-
await syncEngineEmitNewSnapsAndNotifyLocalStore(syncEngineImpl, {
802-
changedDocs: changes,
803-
newAugmentedResults: new Map()
804-
});
810+
await syncEngineEmitNewSnapsAndNotifyLocalStore(syncEngineImpl, changes);
805811
} catch (error) {
806812
await ignoreIfPrimaryLeaseLoss(error as FirestoreError);
807813
}
@@ -832,10 +838,7 @@ export async function syncEngineRejectFailedWrite(
832838
'rejected',
833839
error
834840
);
835-
await syncEngineEmitNewSnapsAndNotifyLocalStore(syncEngineImpl, {
836-
changedDocs: changes,
837-
newAugmentedResults: new Map()
838-
});
841+
await syncEngineEmitNewSnapsAndNotifyLocalStore(syncEngineImpl, changes);
839842
} catch (error) {
840843
await ignoreIfPrimaryLeaseLoss(error as FirestoreError);
841844
}

packages/firestore/src/core/view.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import {
5050
newPipelineComparator,
5151
queryOrPipelineMatches
5252
} from './pipeline_run';
53+
import { MergedPipelineResults } from '../local/local_documents_view';
5354

5455
export type LimboDocumentChange = AddedLimboDocument | RemovedLimboDocument;
5556
export class AddedLimboDocument {
@@ -82,7 +83,7 @@ export interface ViewChange {
8283

8384
export interface LocalChangesToApplyToView {
8485
changedDocs: DocumentMap;
85-
augmentedResults: DocumentMap | undefined;
86+
augmentedResults: MergedPipelineResults | undefined;
8687
}
8788

8889
/**
@@ -152,14 +153,14 @@ export class View {
152153
}
153154

154155
computeAugmentedResultChanges(
155-
augmentedResults: DocumentMap | undefined
156+
augmentedResults: MergedPipelineResults | undefined
156157
): ViewDocumentChanges {
157158
const changeSet = new DocumentChangeSet();
158159
const oldDocumentSet = this.documentSet;
159-
let newMutatedKeys = documentKeySet();
160+
let newMutatedKeys = augmentedResults?.mutatedKeys ?? documentKeySet();
160161
let newDocumentSet = new DocumentSet(this.docComparator);
161162
let needsRefill = false;
162-
augmentedResults?.inorderTraversal((key, entry) => {
163+
augmentedResults?.results.inorderTraversal((key, entry) => {
163164
const oldDoc = oldDocumentSet.get(key);
164165
const newDoc = entry;
165166
newDocumentSet = newDocumentSet.add(newDoc);

packages/firestore/src/local/index_backfiller.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import { Persistence, Scheduler } from './persistence';
3131
import { PersistencePromise } from './persistence_promise';
3232
import { PersistenceTransaction } from './persistence_transaction';
3333
import { isIndexedDbTransactionError } from './simple_db';
34+
import { NextDocuments } from './local_documents_view';
3435

3536
const LOG_TAG = 'IndexBackfiller';
3637

@@ -203,7 +204,7 @@ export class IndexBackfiller {
203204
/** Returns the next offset based on the provided documents. */
204205
private getNewOffset(
205206
existingOffset: IndexOffset,
206-
lookupResult: LocalWriteResult
207+
lookupResult: NextDocuments
207208
): IndexOffset {
208209
let maxOffset: IndexOffset = existingOffset;
209210
lookupResult.changes.forEach((key, document) => {

packages/firestore/src/local/local_documents_view.ts

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ import { SortedMap } from '../util/sorted_map';
5656

5757
import { DocumentOverlayCache } from './document_overlay_cache';
5858
import { IndexManager } from './index_manager';
59-
import { LocalWriteResult } from './local_store_impl';
6059
import { MutationQueue } from './mutation_queue';
6160
import { OverlayedDocument } from './overlayed_document';
6261
import { PersistencePromise } from './persistence_promise';
@@ -73,22 +72,30 @@ import {
7372
isPipeline,
7473
QueryOrPipeline
7574
} from '../core/pipeline-util';
76-
import { Pipeline } from '../lite-api/pipeline';
7775
import { FirestoreError } from '../util/error';
7876
import {
7977
CorePipeline,
8078
pipelineEvaluate,
8179
runPipeline
8280
} from '../core/pipeline_run';
83-
import { SortedSet } from '../util/sorted_set';
8481
import {
8582
PipelineCachedResults,
8683
PipelineResultsCache
8784
} from './pipeline_results_cache';
88-
import { TargetId } from '../core/types';
85+
import { BatchId, TargetId } from '../core/types';
8986
import { TargetData } from './target_data';
9087
import { targetIsPipelineTarget } from '../core/target';
9188

89+
export interface NextDocuments {
90+
batchId: BatchId;
91+
changes: DocumentMap;
92+
}
93+
94+
export interface MergedPipelineResults {
95+
results: DocumentMap;
96+
mutatedKeys: DocumentKeySet;
97+
}
98+
9299
/**
93100
* A readonly view of the local state of all documents we're tracking (i.e. we
94101
* have a cached version in remoteDocumentCache or local mutations for the
@@ -433,14 +440,14 @@ export class LocalDocumentsView {
433440
* @param collectionGroup The collection group for the documents.
434441
* @param offset The offset to index into.
435442
* @param count The number of documents to return
436-
* @return A LocalWriteResult with the documents that follow the provided offset and the last processed batch id.
443+
* @return A NextDocuments with the documents that follow the provided offset and the last processed batch id.
437444
*/
438445
getNextDocuments(
439446
transaction: PersistenceTransaction,
440447
collectionGroup: string,
441448
offset: IndexOffset,
442449
count: number
443-
): PersistencePromise<LocalWriteResult> {
450+
): PersistencePromise<NextDocuments> {
444451
return this.remoteDocumentCache
445452
.getAllFromCollectionGroup(transaction, collectionGroup, offset, count)
446453
.next((originalDocs: MutableDocumentMap) => {
@@ -788,9 +795,12 @@ export class LocalDocumentsView {
788795

789796
calculateMergedAugmentPipelineResults(
790797
targetMap: SortedMap<TargetId, TargetData>,
791-
currentAugmentPipelineResults: Map<TargetId, PipelineCachedResults>,
792-
changedDocs: SortedMap<DocumentKey, Document>
793-
): Map<TargetId, MutableDocumentMap> {
798+
currentAugmentPipelineResults: Map<
799+
TargetId,
800+
PipelineCachedResults | undefined
801+
>,
802+
changedDocs: DocumentMap
803+
): Map<TargetId, MergedPipelineResults> {
794804
// We only care about documents with pending writes because changedDocs are results
795805
// of two scenarios:
796806
// 1. Global snapshot, which means currentAugmentPipelineResults should include all
@@ -803,7 +813,7 @@ export class LocalDocumentsView {
803813
}
804814
});
805815

806-
const mergedResults = new Map<TargetId, MutableDocumentMap>();
816+
const mergedResults = new Map<TargetId, MergedPipelineResults>();
807817
currentAugmentPipelineResults.forEach((results, targetId) => {
808818
const pipeline = targetMap.get(targetId)?.target;
809819
debugAssert(!!pipeline, `Target Id ${targetId} not found`);
@@ -812,13 +822,21 @@ export class LocalDocumentsView {
812822
`Target Id ${targetId} is not a pipeline target`
813823
);
814824

815-
const merged = results.results;
825+
let merged = results?.results ?? mutableDocumentMap();
826+
let mutatedKeys = documentKeySet();
816827
// TODO(pipeline): We need to handle limit pipelines here!!
817828
const pipelineResult = runPipeline(pipeline, docsWithMutations);
818829
for (const result of pipelineResult) {
819-
merged.insert(result.key, result);
830+
mutatedKeys = mutatedKeys.add(result.key);
831+
merged = merged.insert(result.key, result);
832+
}
833+
for (const doc of docsWithMutations) {
834+
if (!mutatedKeys.has(doc.key) && !!merged.get(doc.key)) {
835+
merged = merged.remove(doc.key);
836+
mutatedKeys = mutatedKeys.add(doc.key);
837+
}
820838
}
821-
mergedResults.set(targetId, merged);
839+
mergedResults.set(targetId, { results: merged, mutatedKeys });
822840
});
823841

824842
return mergedResults;

0 commit comments

Comments
 (0)