Skip to content

Commit d5440a1

Browse files
committed
[5/4] Listen Options and Server Timestamp
1 parent 6d1b9e9 commit d5440a1

File tree

10 files changed

+544
-32
lines changed

10 files changed

+544
-32
lines changed

packages/firestore/src/api/pipeline_impl.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,11 @@ import { ensureFirestoreConfigured, Firestore } from './database';
5151
import { Pipeline } from './pipeline'; // Keep this specific Pipeline import if needed alongside LitePipeline
5252
import { RealtimePipeline } from './realtime_pipeline';
5353
import { DocumentReference } from './reference';
54-
import { SnapshotListenOptions, Unsubscribe } from './reference_impl';
54+
import {
55+
PipelineListenOptions,
56+
SnapshotListenOptions,
57+
Unsubscribe
58+
} from './reference_impl';
5559
import { RealtimePipelineSnapshot } from './snapshot';
5660
import { ExpUserDataWriter } from './user_data_writer';
5761

@@ -215,7 +219,7 @@ export function _onRealtimePipelineSnapshot(
215219
*/
216220
export function _onRealtimePipelineSnapshot(
217221
pipeline: RealtimePipeline,
218-
options: SnapshotListenOptions,
222+
options: PipelineListenOptions,
219223
observer: {
220224
next?: (snapshot: RealtimePipelineSnapshot) => void;
221225
error?: (error: FirestoreError) => void;
@@ -238,7 +242,7 @@ export function _onRealtimePipelineSnapshot(
238242
*/
239243
export function _onRealtimePipelineSnapshot(
240244
pipeline: RealtimePipeline,
241-
options: SnapshotListenOptions,
245+
options: PipelineListenOptions,
242246
onNext: (snapshot: RealtimePipelineSnapshot) => void,
243247
onError?: (error: FirestoreError) => void,
244248
onComplete?: () => void
@@ -247,9 +251,10 @@ export function _onRealtimePipelineSnapshot(
247251
pipeline: RealtimePipeline,
248252
...args: unknown[]
249253
): Unsubscribe {
250-
let options: SnapshotListenOptions = {
254+
let options: PipelineListenOptions = {
251255
includeMetadataChanges: false,
252-
source: 'default'
256+
source: 'default',
257+
serverTimestampBehavior: 'none'
253258
};
254259
let currArg = 0;
255260
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
@@ -259,7 +264,8 @@ export function _onRealtimePipelineSnapshot(
259264

260265
const internalOptions = {
261266
includeMetadataChanges: options.includeMetadataChanges,
262-
source: options.source as ListenerDataSource
267+
source: options.source as ListenerDataSource,
268+
serverTimestampBehavior: options.serverTimestampBehavior
263269
};
264270

265271
let userObserver: PartialObserver<RealtimePipelineSnapshot>;
@@ -277,7 +283,9 @@ export function _onRealtimePipelineSnapshot(
277283
const observer = {
278284
next: (snapshot: ViewSnapshot) => {
279285
if (userObserver.next) {
280-
userObserver.next(new RealtimePipelineSnapshot(pipeline, snapshot));
286+
userObserver.next(
287+
new RealtimePipelineSnapshot(pipeline, snapshot, internalOptions)
288+
);
281289
}
282290
},
283291
error: userObserver.error,
@@ -286,7 +294,7 @@ export function _onRealtimePipelineSnapshot(
286294

287295
return firestoreClientListen(
288296
client,
289-
toCorePipeline(pipeline),
297+
toCorePipeline(pipeline, internalOptions),
290298
internalOptions, // Pass parsed options here
291299
observer
292300
);

packages/firestore/src/api/reference_impl.ts

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,6 +1070,21 @@ export function onSnapshotResume<
10701070
}
10711071
}
10721072

1073+
export interface PipelineListenOptions {
1074+
/**
1075+
* Include a change even if only the metadata of the query or of a document
1076+
* changed. Default is false.
1077+
*/
1078+
readonly includeMetadataChanges?: boolean;
1079+
1080+
/**
1081+
* Set the source the query listens to. Default to "default", which
1082+
* listens to both cache and server.
1083+
*/
1084+
readonly source?: ListenSource;
1085+
readonly serverTimestampBehavior?: 'estimate' | 'previous' | 'none';
1086+
}
1087+
10731088
export function onPipelineSnapshot(
10741089
query: RealtimePipeline,
10751090
observer: {
@@ -1080,7 +1095,7 @@ export function onPipelineSnapshot(
10801095
): Unsubscribe;
10811096
export function onPipelineSnapshot(
10821097
query: RealtimePipeline,
1083-
options: SnapshotListenOptions,
1098+
options: PipelineListenOptions,
10841099
observer: {
10851100
next?: (snapshot: RealtimePipelineSnapshot) => void;
10861101
error?: (error: FirestoreError) => void;
@@ -1095,7 +1110,7 @@ export function onPipelineSnapshot(
10951110
): Unsubscribe;
10961111
export function onPipelineSnapshot(
10971112
query: RealtimePipeline,
1098-
options: SnapshotListenOptions,
1113+
options: PipelineListenOptions,
10991114
onNext: (snapshot: RealtimePipelineSnapshot) => void,
11001115
onError?: (error: FirestoreError) => void,
11011116
onCompletion?: () => void
@@ -1106,9 +1121,10 @@ export function onPipelineSnapshot(
11061121
): Unsubscribe {
11071122
reference = getModularInstance(reference);
11081123

1109-
let options: SnapshotListenOptions = {
1124+
let options: PipelineListenOptions = {
11101125
includeMetadataChanges: false,
1111-
source: 'default'
1126+
source: 'default',
1127+
serverTimestampBehavior: 'none'
11121128
};
11131129
let currArg = 0;
11141130
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
@@ -1118,7 +1134,8 @@ export function onPipelineSnapshot(
11181134

11191135
const internalOptions = {
11201136
includeMetadataChanges: options.includeMetadataChanges,
1121-
source: options.source as ListenerDataSource
1137+
source: options.source as ListenerDataSource,
1138+
serverTimestampBehavior: options.serverTimestampBehavior
11221139
};
11231140

11241141
if (isPartialObserver(args[currArg])) {
@@ -1136,12 +1153,16 @@ export function onPipelineSnapshot(
11361153

11371154
// RealtimePipeline
11381155
firestore = cast(reference._db, Firestore);
1139-
internalQuery = toCorePipeline(reference);
1156+
internalQuery = toCorePipeline(reference, internalOptions);
11401157
observer = {
11411158
next: snapshot => {
11421159
if (args[currArg]) {
11431160
(args[currArg] as NextFn<RealtimePipelineSnapshot>)(
1144-
new RealtimePipelineSnapshot(reference as RealtimePipeline, snapshot)
1161+
new RealtimePipelineSnapshot(
1162+
reference as RealtimePipeline,
1163+
snapshot,
1164+
internalOptions
1165+
)
11451166
);
11461167
}
11471168
},

packages/firestore/src/api/snapshot.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import { BundleLoader } from '../core/bundle_impl';
1919
import { createBundleReaderSync } from '../core/firestore_client';
20+
import { ListenOptions } from '../core/event_manager';
2021
import { CorePipeline } from '../core/pipeline';
2122
import { isPipeline } from '../core/pipeline-util';
2223
import { newPipelineComparator } from '../core/pipeline_run';
@@ -1175,7 +1176,8 @@ export function resultChangesFromSnapshot(
11751176
new SnapshotMetadata(
11761177
querySnapshot._snapshot.mutatedKeys.has(change.doc.key),
11771178
querySnapshot._snapshot.fromCache
1178-
)
1179+
),
1180+
querySnapshot._listenOptions
11791181
);
11801182
lastDoc = change.doc;
11811183
return {
@@ -1205,7 +1207,8 @@ export function resultChangesFromSnapshot(
12051207
new SnapshotMetadata(
12061208
querySnapshot._snapshot.mutatedKeys.has(change.doc.key),
12071209
querySnapshot._snapshot.fromCache
1208-
)
1210+
),
1211+
querySnapshot._listenOptions
12091212
);
12101213
let oldIndex = -1;
12111214
let newIndex = -1;
@@ -1245,7 +1248,11 @@ export class RealtimePipelineSnapshot {
12451248
private _cachedChangesIncludeMetadataChanges?: boolean;
12461249

12471250
/** @hideconstructor */
1248-
constructor(pipeline: RealtimePipeline, readonly _snapshot: ViewSnapshot) {
1251+
constructor(
1252+
pipeline: RealtimePipeline,
1253+
readonly _snapshot: ViewSnapshot,
1254+
readonly _listenOptions: ListenOptions
1255+
) {
12491256
this.metadata = new SnapshotMetadata(
12501257
_snapshot.hasPendingWrites,
12511258
_snapshot.fromCache
@@ -1257,7 +1264,7 @@ export class RealtimePipelineSnapshot {
12571264
get results(): PipelineResult[] {
12581265
const result: PipelineResult[] = [];
12591266
this._snapshot.docs.forEach(doc =>
1260-
result.push(toPipelineResult(doc, this.pipeline))
1267+
result.push(toPipelineResult(doc, this.pipeline, this._listenOptions))
12611268
);
12621269
return result;
12631270
}

packages/firestore/src/core/event_manager.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,9 @@ export interface ListenOptions {
408408

409409
/** Set the source events raised from. */
410410
readonly source?: ListenerDataSource;
411+
412+
/** Realtime Pipeline Only: Set server timestamp behavior. */
413+
readonly serverTimestampBehavior?: 'estimate' | 'previous' | 'none';
411414
}
412415

413416
/**

packages/firestore/src/core/expressions.ts

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ import { objectSize } from '../util/obj';
5858
import { isNegativeZero } from '../util/types';
5959

6060
import { EvaluationContext, PipelineInputOutput } from './pipeline_run';
61+
import {
62+
getLocalWriteTime,
63+
getPreviousValue,
64+
isServerTimestamp
65+
} from '../model/server_timestamps';
66+
import { SnapshotVersion } from './snapshot_version';
6167

6268
export type EvaluateResultType =
6369
| 'ERROR'
@@ -311,8 +317,35 @@ export class CoreField implements EvaluableExpr {
311317
});
312318
}
313319
// Return 'UNSET' if the field doesn't exist, otherwise the Value.
314-
const result = input.data.field(this.expr.fieldPath);
320+
const result = input.data.field(this.expr._fieldPath);
321+
322+
function getServerTimestampValue(
323+
context: EvaluationContext,
324+
value: Value
325+
): Value {
326+
if (context.serverTimestampBehavior === 'estimate') {
327+
return {
328+
timestampValue: toVersion(
329+
context.serializer,
330+
SnapshotVersion.fromTimestamp(getLocalWriteTime(value))
331+
)
332+
};
333+
} else if (context.serverTimestampBehavior === 'previous') {
334+
const previousValue = getPreviousValue(value);
335+
if (previousValue) {
336+
return previousValue;
337+
}
338+
}
339+
return { nullValue: 'NULL_VALUE' };
340+
}
341+
315342
if (!!result) {
343+
if (isServerTimestamp(result)) {
344+
return EvaluateResult.newValue(
345+
getServerTimestampValue(context, result)
346+
);
347+
}
348+
316349
return EvaluateResult.newValue(result);
317350
} else {
318351
return EvaluateResult.newUnset();

packages/firestore/src/core/pipeline-util.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ import {
9393
targetIsPipelineTarget
9494
} from './target';
9595
import { VectorValue } from '../api';
96+
import { ListenOptions } from './event_manager';
9697

9798
/* eslint @typescript-eslint/no-explicit-any: 0 */
9899

@@ -573,7 +574,12 @@ function addSystemFields(
573574
}
574575

575576
export function toCorePipeline(
576-
p: ApiPipeline | RealtimePipeline
577+
p: ApiPipeline | RealtimePipeline,
578+
listenOptions?: ListenOptions
577579
): CorePipeline {
578-
return new CorePipeline(p.userDataReader.serializer, rewriteStages(p.stages));
580+
return new CorePipeline(
581+
p.userDataReader.serializer,
582+
rewriteStages(p.stages),
583+
listenOptions
584+
);
579585
}

packages/firestore/src/core/pipeline.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,14 @@ import { JsonProtoSerializer } from '../remote/serializer';
2828
import { debugAssert } from '../util/assert';
2929

3030
import { PipelineFlavor, PipelineSourceType } from './pipeline-util';
31+
import { ListenOptions } from './event_manager';
3132

3233
export class CorePipeline {
3334
isCorePipeline = true;
3435
constructor(
3536
readonly serializer: JsonProtoSerializer,
36-
readonly stages: Stage[]
37+
readonly stages: Stage[],
38+
readonly listenOptions?: ListenOptions
3739
) {}
3840
getPipelineCollection(): string | undefined {
3941
return getPipelineCollection(this);

packages/firestore/src/core/pipeline_run.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export type PipelineInputOutput = MutableDocument;
4545

4646
export interface EvaluationContext {
4747
serializer: JsonProtoSerializer;
48+
serverTimestampBehavior?: 'estimate' | 'previous' | 'none';
4849
}
4950

5051
export function runPipeline(
@@ -53,7 +54,14 @@ export function runPipeline(
5354
): PipelineInputOutput[] {
5455
let current = input;
5556
for (const stage of pipeline.stages) {
56-
current = evaluate({ serializer: pipeline.serializer }, stage, current);
57+
current = evaluate(
58+
{
59+
serializer: pipeline.serializer,
60+
serverTimestampBehavior: pipeline.listenOptions?.serverTimestampBehavior
61+
},
62+
stage,
63+
current
64+
);
5765
}
5866

5967
return current;

0 commit comments

Comments
 (0)