Skip to content

Commit 93fdb23

Browse files
committed
Prepre for serializaion/deserialization
1 parent acbf24f commit 93fdb23

File tree

12 files changed

+116
-28
lines changed

12 files changed

+116
-28
lines changed

packages/firestore/src/core/target.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import {
5252
orderByEquals,
5353
stringifyOrderBy
5454
} from './order_by';
55+
import { Pipeline } from '../pipelines/api/pipeline';
5556

5657
/**
5758
* A Target represents the WatchTarget representation of a Query, which is used
@@ -215,6 +216,12 @@ export function targetEquals(left: Target, right: Target): boolean {
215216
return boundEquals(left.endAt, right.endAt);
216217
}
217218

219+
export function targetIsPipelineTarget(
220+
target: Target | Pipeline
221+
): target is Pipeline {
222+
return target instanceof Pipeline;
223+
}
224+
218225
export function targetIsDocumentTarget(target: Target): boolean {
219226
return (
220227
DocumentKey.isDocumentKey(target.path) &&

packages/firestore/src/local/indexeddb_schema.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
Document as ProtoDocument,
2323
DocumentsTarget as ProtoDocumentsTarget,
2424
QueryTarget as ProtoQueryTarget,
25+
PipelineQueryTarget as ProtoPipelineQueryTarget,
2526
Write as ProtoWrite
2627
} from '../protos/firestore_proto_api';
2728

@@ -253,7 +254,10 @@ export interface DbRemoteDocumentGlobal {
253254
* IndexedDb. We use the proto definitions for these two kinds of queries in
254255
* order to avoid writing extra serialization logic.
255256
*/
256-
export type DbQuery = ProtoQueryTarget | ProtoDocumentsTarget;
257+
export type DbQuery =
258+
| ProtoQueryTarget
259+
| ProtoDocumentsTarget
260+
| ProtoPipelineQueryTarget;
257261

258262
/**
259263
* An object to be stored in the 'targets' store in IndexedDb.

packages/firestore/src/local/indexeddb_target_cache.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,8 @@ export class IndexedDbTargetCache implements TargetCache {
268268
const found = fromDbTarget(value);
269269
// After finding a potential match, check that the target is
270270
// actually equal to the requested target.
271-
if (targetEquals(target, found.target)) {
271+
// TODO(pipeline): This needs to handle pipeline properly.
272+
if (targetEquals(target, found.target as Target)) {
272273
result = found;
273274
control.done();
274275
}

packages/firestore/src/local/local_serializer.ts

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@ import { Timestamp } from '../api/timestamp';
1919
import { BundleMetadata, NamedQuery } from '../core/bundle';
2020
import { LimitType, Query, queryWithLimit } from '../core/query';
2121
import { SnapshotVersion } from '../core/snapshot_version';
22-
import { canonifyTarget, Target, targetIsDocumentTarget } from '../core/target';
22+
import {
23+
canonifyTarget,
24+
Target,
25+
targetIsDocumentTarget,
26+
targetIsPipelineTarget
27+
} from '../core/target';
2328
import { MutableDocument } from '../model/document';
2429
import { DocumentKey } from '../model/document_key';
2530
import {
@@ -36,18 +41,23 @@ import {
3641
BundleMetadata as ProtoBundleMetadata,
3742
NamedQuery as ProtoNamedQuery
3843
} from '../protos/firestore_bundle_proto';
39-
import { DocumentsTarget as PublicDocumentsTarget } from '../protos/firestore_proto_api';
44+
import {
45+
DocumentsTarget as PublicDocumentsTarget,
46+
PipelineQueryTarget as PublicPipelineQueryTarget
47+
} from '../protos/firestore_proto_api';
4048
import {
4149
convertQueryTargetToQuery,
4250
fromDocument,
4351
fromDocumentsTarget,
4452
fromMutation,
53+
fromPipelineTarget,
4554
fromQueryTarget,
4655
fromVersion,
4756
JsonProtoSerializer,
4857
toDocument,
4958
toDocumentsTarget,
5059
toMutation,
60+
toPipelineTarget,
5161
toQueryTarget
5262
} from '../remote/serializer';
5363
import { debugAssert, fail } from '../util/assert';
@@ -71,6 +81,7 @@ import {
7181
} from './indexeddb_schema';
7282
import { DbDocumentOverlayKey, DbTimestampKey } from './indexeddb_sentinels';
7383
import { TargetData, TargetPurpose } from './target_data';
84+
import { Pipeline } from '../pipelines/api/pipeline';
7485

7586
/** Serializer for values stored in the LocalStore. */
7687
export class LocalSerializer {
@@ -241,8 +252,10 @@ export function fromDbTarget(dbTarget: DbTarget): TargetData {
241252
? fromDbTimestamp(dbTarget.lastLimboFreeSnapshotVersion)
242253
: SnapshotVersion.min();
243254

244-
let target: Target;
245-
if (isDocumentQuery(dbTarget.query)) {
255+
let target: Target | Pipeline;
256+
if (isPipelineQueryTarget(dbTarget.query)) {
257+
target = fromPipelineTarget(dbTarget.query);
258+
} else if (isDocumentQuery(dbTarget.query)) {
246259
target = fromDocumentsTarget(dbTarget.query);
247260
} else {
248261
target = fromQueryTarget(dbTarget.query);
@@ -275,7 +288,21 @@ export function toDbTarget(
275288
targetData.lastLimboFreeSnapshotVersion
276289
);
277290
let queryProto: DbQuery;
278-
if (targetIsDocumentTarget(targetData.target)) {
291+
if (targetIsPipelineTarget(targetData.target)) {
292+
queryProto = toPipelineTarget(
293+
localSerializer.remoteSerializer,
294+
targetData.target
295+
);
296+
return {
297+
targetId: targetData.targetId,
298+
canonicalId: '',
299+
readTime: dbTimestamp,
300+
resumeToken: '',
301+
lastListenSequenceNumber: targetData.sequenceNumber,
302+
lastLimboFreeSnapshotVersion: dbLastLimboFreeTimestamp,
303+
query: queryProto
304+
};
305+
} else if (targetIsDocumentTarget(targetData.target)) {
279306
queryProto = toDocumentsTarget(
280307
localSerializer.remoteSerializer,
281308
targetData.target
@@ -303,6 +330,12 @@ export function toDbTarget(
303330
};
304331
}
305332

333+
function isPipelineQueryTarget(
334+
dbQuery: DbQuery
335+
): dbQuery is PublicPipelineQueryTarget {
336+
return (dbQuery as PublicPipelineQueryTarget).pipeline !== undefined;
337+
}
338+
306339
/**
307340
* A helper function for figuring out what kind of query has been stored.
308341
*/

packages/firestore/src/local/local_store_impl.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1063,7 +1063,8 @@ export async function localStoreReleaseTarget(
10631063

10641064
localStoreImpl.targetDataByTarget =
10651065
localStoreImpl.targetDataByTarget.remove(targetId);
1066-
localStoreImpl.targetIdByTarget.delete(targetData!.target);
1066+
// TODO(pipeline): This needs to handle pipeline properly.
1067+
localStoreImpl.targetIdByTarget.delete(targetData!.target as Target);
10671068
}
10681069

10691070
/**
@@ -1220,15 +1221,21 @@ export function localStoreGetCachedTarget(
12201221
);
12211222
const cachedTargetData = localStoreImpl.targetDataByTarget.get(targetId);
12221223
if (cachedTargetData) {
1223-
return Promise.resolve(cachedTargetData.target);
1224+
// TODO(pipeline): This needs to handle pipeline properly.
1225+
return Promise.resolve(cachedTargetData.target as Target);
12241226
} else {
12251227
return localStoreImpl.persistence.runTransaction(
12261228
'Get target data',
12271229
'readonly',
12281230
txn => {
1229-
return targetCacheImpl
1230-
.getTargetDataForTarget(txn, targetId)
1231-
.next(targetData => (targetData ? targetData.target : null));
1231+
return (
1232+
targetCacheImpl
1233+
.getTargetDataForTarget(txn, targetId)
1234+
// TODO(pipeline): This needs to handle pipeline properly.
1235+
.next(targetData =>
1236+
targetData ? (targetData.target as Target) : null
1237+
)
1238+
);
12321239
}
12331240
);
12341241
}

packages/firestore/src/local/memory_target_cache.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ export class MemoryTargetCache implements TargetCache {
101101
}
102102

103103
private saveTargetData(targetData: TargetData): void {
104-
this.targets.set(targetData.target, targetData);
104+
// TODO(pipeline): This needs to handle pipeline properly.
105+
this.targets.set(targetData.target as Target, targetData);
105106
const targetId = targetData.targetId;
106107
if (targetId > this.highestTargetId) {
107108
this.targetIdGenerator = new TargetIdGenerator(targetId);
@@ -117,7 +118,8 @@ export class MemoryTargetCache implements TargetCache {
117118
targetData: TargetData
118119
): PersistencePromise<void> {
119120
debugAssert(
120-
!this.targets.has(targetData.target),
121+
// TODO(pipeline): This needs to handle pipeline properly.
122+
!this.targets.has(targetData.target as Target),
121123
'Adding a target that already exists'
122124
);
123125
this.saveTargetData(targetData);
@@ -130,7 +132,8 @@ export class MemoryTargetCache implements TargetCache {
130132
targetData: TargetData
131133
): PersistencePromise<void> {
132134
debugAssert(
133-
this.targets.has(targetData.target),
135+
// TODO(pipeline): This needs to handle pipeline properly.
136+
this.targets.has(targetData.target as Target),
134137
'Updating a nonexistent target'
135138
);
136139
this.saveTargetData(targetData);
@@ -143,10 +146,11 @@ export class MemoryTargetCache implements TargetCache {
143146
): PersistencePromise<void> {
144147
debugAssert(this.targetCount > 0, 'Removing a target from an empty cache');
145148
debugAssert(
146-
this.targets.has(targetData.target),
149+
// TODO(pipeline): This needs to handle pipeline properly.
150+
this.targets.has(targetData.target as Target),
147151
'Removing a nonexistent target from the cache'
148152
);
149-
this.targets.delete(targetData.target);
153+
this.targets.delete(targetData.target as Target);
150154
this.references.removeReferencesForId(targetData.targetId);
151155
this.targetCount -= 1;
152156
return PersistencePromise.resolve();

packages/firestore/src/local/target_data.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { SnapshotVersion } from '../core/snapshot_version';
1919
import { Target } from '../core/target';
2020
import { ListenSequenceNumber, TargetId } from '../core/types';
2121
import { ByteString } from '../util/byte_string';
22+
import { Pipeline } from '../pipelines/api/pipeline';
2223

2324
/** An enumeration of the different purposes we have for targets. */
2425
export const enum TargetPurpose {
@@ -47,7 +48,7 @@ export const enum TargetPurpose {
4748
export class TargetData {
4849
constructor(
4950
/** The target being listened to. */
50-
readonly target: Target,
51+
readonly target: Target | Pipeline,
5152
/**
5253
* The target ID to which the target corresponds; Assigned by the
5354
* LocalStore for user listens and by the SyncEngine for limbo watches.

packages/firestore/src/protos/firestore_proto_api.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,8 @@ export declare type Pipeline = firestoreV1ApiClientInterfaces.Pipeline;
559559
export declare type Precondition = firestoreV1ApiClientInterfaces.Precondition;
560560
export declare type Projection = firestoreV1ApiClientInterfaces.Projection;
561561
export declare type QueryTarget = firestoreV1ApiClientInterfaces.QueryTarget;
562+
export declare type PipelineQueryTarget =
563+
firestoreV1ApiClientInterfaces.PipelineQueryTarget;
562564
export declare type ReadOnly = firestoreV1ApiClientInterfaces.ReadOnly;
563565
export declare type ReadWrite = firestoreV1ApiClientInterfaces.ReadWrite;
564566
export declare type RollbackRequest =

packages/firestore/src/remote/serializer.ts

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,11 @@ import {
3535
queryToTarget
3636
} from '../core/query';
3737
import { SnapshotVersion } from '../core/snapshot_version';
38-
import { targetIsDocumentTarget, Target } from '../core/target';
38+
import {
39+
targetIsDocumentTarget,
40+
Target,
41+
targetIsPipelineTarget
42+
} from '../core/target';
3943
import { TargetId } from '../core/types';
4044
import { Bytes } from '../lite-api/bytes';
4145
import { GeoPoint } from '../lite-api/geo_point';
@@ -84,6 +88,7 @@ import {
8488
OrderDirection as ProtoOrderDirection,
8589
Precondition as ProtoPrecondition,
8690
QueryTarget as ProtoQueryTarget,
91+
PipelineQueryTarget as ProtoPipelineQueryTarget,
8792
RunAggregationQueryRequest as ProtoRunAggregationQueryRequest,
8893
Aggregation as ProtoAggregation,
8994
Status as ProtoStatus,
@@ -111,6 +116,7 @@ import {
111116
WatchTargetChange,
112117
WatchTargetChangeState
113118
} from './watch_change';
119+
import { Pipeline } from '../api';
114120

115121
const DIRECTIONS = (() => {
116122
const dirs: { [dir: string]: ProtoOrderDirection } = {};
@@ -1087,14 +1093,28 @@ export function toLabel(purpose: TargetPurpose): string | null {
10871093
}
10881094
}
10891095

1096+
export function fromPipelineTarget(target: ProtoPipelineQueryTarget): Pipeline {
1097+
return {} as Pipeline;
1098+
}
1099+
1100+
export function toPipelineTarget(
1101+
serializer: JsonProtoSerializer,
1102+
target: Pipeline
1103+
): ProtoPipelineQueryTarget {
1104+
return {
1105+
pipeline: {}
1106+
};
1107+
}
1108+
10901109
export function toTarget(
10911110
serializer: JsonProtoSerializer,
10921111
targetData: TargetData
10931112
): ProtoTarget {
10941113
let result: ProtoTarget;
10951114
const target = targetData.target;
1096-
1097-
if (targetIsDocumentTarget(target)) {
1115+
if (targetIsPipelineTarget(target)) {
1116+
result = { pipelineQuery: toPipelineTarget(serializer, target) };
1117+
} else if (targetIsDocumentTarget(target)) {
10981118
result = { documents: toDocumentsTarget(serializer, target) };
10991119
} else {
11001120
result = { query: toQueryTarget(serializer, target).queryTarget };

packages/firestore/src/remote/watch_change.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import { DatabaseId } from '../core/database_info';
1919
import { SnapshotVersion } from '../core/snapshot_version';
20-
import { targetIsDocumentTarget } from '../core/target';
20+
import { targetIsDocumentTarget, targetIsPipelineTarget } from '../core/target';
2121
import { TargetId } from '../core/types';
2222
import { ChangeType } from '../core/view_snapshot';
2323
import { TargetData, TargetPurpose } from '../local/target_data';
@@ -414,7 +414,9 @@ export class WatchChangeAggregator {
414414
const targetData = this.targetDataForActiveTarget(targetId);
415415
if (targetData) {
416416
const target = targetData.target;
417-
if (targetIsDocumentTarget(target)) {
417+
if (targetIsPipelineTarget(target)) {
418+
//TODO(pipeline): handle existence filter correctly for pipelines
419+
} else if (targetIsDocumentTarget(target)) {
418420
if (expectedCount === 0) {
419421
// The existence filter told us the document does not exist. We deduce
420422
// that this document does not exist and apply a deleted document to
@@ -584,7 +586,11 @@ export class WatchChangeAggregator {
584586
this.targetStates.forEach((targetState, targetId) => {
585587
const targetData = this.targetDataForActiveTarget(targetId);
586588
if (targetData) {
587-
if (targetState.current && targetIsDocumentTarget(targetData.target)) {
589+
if (
590+
targetState.current &&
591+
!targetIsPipelineTarget(targetData.target) &&
592+
targetIsDocumentTarget(targetData.target)
593+
) {
588594
// Document queries for document that don't exist can produce an empty
589595
// result set. To update our local cache, we synthesize a document
590596
// delete if we have not previously received the document. This

0 commit comments

Comments
 (0)