Skip to content

Commit e7b8314

Browse files
committed
[5/4] Listen Options and Server Timestamp
1 parent 0eb1e66 commit e7b8314

File tree

10 files changed

+543
-31
lines changed

10 files changed

+543
-31
lines changed

packages/firestore/src/api/pipeline_impl.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ import { ensureFirestoreConfigured, Firestore } from './database';
4242
import { Pipeline } from './pipeline'; // Keep this specific Pipeline import if needed alongside LitePipeline
4343
import { RealtimePipeline } from './realtime_pipeline';
4444
import { DocumentReference } from './reference';
45-
import { SnapshotListenOptions, Unsubscribe } from './reference_impl';
45+
import {
46+
PipelineListenOptions,
47+
SnapshotListenOptions,
48+
Unsubscribe
49+
} from './reference_impl';
4650
import { RealtimePipelineSnapshot } from './snapshot';
4751
import { ExpUserDataWriter } from './user_data_writer';
4852

@@ -165,7 +169,7 @@ export function _onRealtimePipelineSnapshot(
165169
*/
166170
export function _onRealtimePipelineSnapshot(
167171
pipeline: RealtimePipeline,
168-
options: SnapshotListenOptions,
172+
options: PipelineListenOptions,
169173
observer: {
170174
next?: (snapshot: RealtimePipelineSnapshot) => void;
171175
error?: (error: FirestoreError) => void;
@@ -188,7 +192,7 @@ export function _onRealtimePipelineSnapshot(
188192
*/
189193
export function _onRealtimePipelineSnapshot(
190194
pipeline: RealtimePipeline,
191-
options: SnapshotListenOptions,
195+
options: PipelineListenOptions,
192196
onNext: (snapshot: RealtimePipelineSnapshot) => void,
193197
onError?: (error: FirestoreError) => void,
194198
onComplete?: () => void
@@ -197,9 +201,10 @@ export function _onRealtimePipelineSnapshot(
197201
pipeline: RealtimePipeline,
198202
...args: unknown[]
199203
): Unsubscribe {
200-
let options: SnapshotListenOptions = {
204+
let options: PipelineListenOptions = {
201205
includeMetadataChanges: false,
202-
source: 'default'
206+
source: 'default',
207+
serverTimestampBehavior: 'none'
203208
};
204209
let currArg = 0;
205210
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
@@ -209,7 +214,8 @@ export function _onRealtimePipelineSnapshot(
209214

210215
const internalOptions = {
211216
includeMetadataChanges: options.includeMetadataChanges,
212-
source: options.source as ListenerDataSource
217+
source: options.source as ListenerDataSource,
218+
serverTimestampBehavior: options.serverTimestampBehavior
213219
};
214220

215221
let userObserver: PartialObserver<RealtimePipelineSnapshot>;
@@ -227,7 +233,9 @@ export function _onRealtimePipelineSnapshot(
227233
const observer = {
228234
next: (snapshot: ViewSnapshot) => {
229235
if (userObserver.next) {
230-
userObserver.next(new RealtimePipelineSnapshot(pipeline, snapshot));
236+
userObserver.next(
237+
new RealtimePipelineSnapshot(pipeline, snapshot, internalOptions)
238+
);
231239
}
232240
},
233241
error: userObserver.error,
@@ -236,7 +244,7 @@ export function _onRealtimePipelineSnapshot(
236244

237245
return firestoreClientListen(
238246
client,
239-
toCorePipeline(pipeline),
247+
toCorePipeline(pipeline, internalOptions),
240248
internalOptions, // Pass parsed options here
241249
observer
242250
);

packages/firestore/src/api/reference_impl.ts

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,21 @@ export function onSnapshot<AppModelType, DbModelType extends DocumentData>(
757757
);
758758
}
759759

760+
export interface PipelineListenOptions {
761+
/**
762+
* Include a change even if only the metadata of the query or of a document
763+
* changed. Default is false.
764+
*/
765+
readonly includeMetadataChanges?: boolean;
766+
767+
/**
768+
* Set the source the query listens to. Default to "default", which
769+
* listens to both cache and server.
770+
*/
771+
readonly source?: ListenSource;
772+
readonly serverTimestampBehavior?: 'estimate' | 'previous' | 'none';
773+
}
774+
760775
export function onPipelineSnapshot(
761776
query: RealtimePipeline,
762777
observer: {
@@ -767,7 +782,7 @@ export function onPipelineSnapshot(
767782
): Unsubscribe;
768783
export function onPipelineSnapshot(
769784
query: RealtimePipeline,
770-
options: SnapshotListenOptions,
785+
options: PipelineListenOptions,
771786
observer: {
772787
next?: (snapshot: RealtimePipelineSnapshot) => void;
773788
error?: (error: FirestoreError) => void;
@@ -782,7 +797,7 @@ export function onPipelineSnapshot(
782797
): Unsubscribe;
783798
export function onPipelineSnapshot(
784799
query: RealtimePipeline,
785-
options: SnapshotListenOptions,
800+
options: PipelineListenOptions,
786801
onNext: (snapshot: RealtimePipelineSnapshot) => void,
787802
onError?: (error: FirestoreError) => void,
788803
onCompletion?: () => void
@@ -793,9 +808,10 @@ export function onPipelineSnapshot(
793808
): Unsubscribe {
794809
reference = getModularInstance(reference);
795810

796-
let options: SnapshotListenOptions = {
811+
let options: PipelineListenOptions = {
797812
includeMetadataChanges: false,
798-
source: 'default'
813+
source: 'default',
814+
serverTimestampBehavior: 'none'
799815
};
800816
let currArg = 0;
801817
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
@@ -805,7 +821,8 @@ export function onPipelineSnapshot(
805821

806822
const internalOptions = {
807823
includeMetadataChanges: options.includeMetadataChanges,
808-
source: options.source as ListenerDataSource
824+
source: options.source as ListenerDataSource,
825+
serverTimestampBehavior: options.serverTimestampBehavior
809826
};
810827

811828
if (isPartialObserver(args[currArg])) {
@@ -823,12 +840,16 @@ export function onPipelineSnapshot(
823840

824841
// RealtimePipeline
825842
firestore = cast(reference._db, Firestore);
826-
internalQuery = toCorePipeline(reference);
843+
internalQuery = toCorePipeline(reference, internalOptions);
827844
observer = {
828845
next: snapshot => {
829846
if (args[currArg]) {
830847
(args[currArg] as NextFn<RealtimePipelineSnapshot>)(
831-
new RealtimePipelineSnapshot(reference as RealtimePipeline, snapshot)
848+
new RealtimePipelineSnapshot(
849+
reference as RealtimePipeline,
850+
snapshot,
851+
internalOptions
852+
)
832853
);
833854
}
834855
},

packages/firestore/src/api/snapshot.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18+
import { ListenOptions } from '../core/event_manager';
1819
import { CorePipeline } from '../core/pipeline';
1920
import { isPipeline } from '../core/pipeline-util';
2021
import { newPipelineComparator } from '../core/pipeline_run';
@@ -847,7 +848,8 @@ export function resultChangesFromSnapshot(
847848
new SnapshotMetadata(
848849
querySnapshot._snapshot.mutatedKeys.has(change.doc.key),
849850
querySnapshot._snapshot.fromCache
850-
)
851+
),
852+
querySnapshot._listenOptions
851853
);
852854
lastDoc = change.doc;
853855
return {
@@ -877,7 +879,8 @@ export function resultChangesFromSnapshot(
877879
new SnapshotMetadata(
878880
querySnapshot._snapshot.mutatedKeys.has(change.doc.key),
879881
querySnapshot._snapshot.fromCache
880-
)
882+
),
883+
querySnapshot._listenOptions
881884
);
882885
let oldIndex = -1;
883886
let newIndex = -1;
@@ -917,7 +920,11 @@ export class RealtimePipelineSnapshot {
917920
private _cachedChangesIncludeMetadataChanges?: boolean;
918921

919922
/** @hideconstructor */
920-
constructor(pipeline: RealtimePipeline, readonly _snapshot: ViewSnapshot) {
923+
constructor(
924+
pipeline: RealtimePipeline,
925+
readonly _snapshot: ViewSnapshot,
926+
readonly _listenOptions: ListenOptions
927+
) {
921928
this.metadata = new SnapshotMetadata(
922929
_snapshot.hasPendingWrites,
923930
_snapshot.fromCache
@@ -929,7 +936,7 @@ export class RealtimePipelineSnapshot {
929936
get results(): PipelineResult[] {
930937
const result: PipelineResult[] = [];
931938
this._snapshot.docs.forEach(doc =>
932-
result.push(toPipelineResult(doc, this.pipeline))
939+
result.push(toPipelineResult(doc, this.pipeline, this._listenOptions))
933940
);
934941
return result;
935942
}

packages/firestore/src/core/event_manager.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,9 @@ export interface ListenOptions {
408408

409409
/** Set the source events raised from. */
410410
readonly source?: ListenerDataSource;
411+
412+
/** Realtime Pipeline Only: Set server timestamp behavior. */
413+
readonly serverTimestampBehavior?: 'estimate' | 'previous' | 'none';
411414
}
412415

413416
/**

packages/firestore/src/core/expressions.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ import { objectSize } from '../util/obj';
5757
import { isNegativeZero } from '../util/types';
5858

5959
import { EvaluationContext, PipelineInputOutput } from './pipeline_run';
60+
import {
61+
getLocalWriteTime,
62+
getPreviousValue,
63+
isServerTimestamp
64+
} from '../model/server_timestamps';
65+
import { SnapshotVersion } from './snapshot_version';
6066

6167
export type EvaluateResultType =
6268
| 'ERROR'
@@ -309,7 +315,34 @@ export class CoreField implements EvaluableExpr {
309315
}
310316
// Return 'UNSET' if the field doesn't exist, otherwise the Value.
311317
const result = input.data.field(this.expr._fieldPath);
318+
319+
function getServerTimestampValue(
320+
context: EvaluationContext,
321+
value: Value
322+
): Value {
323+
if (context.serverTimestampBehavior === 'estimate') {
324+
return {
325+
timestampValue: toVersion(
326+
context.serializer,
327+
SnapshotVersion.fromTimestamp(getLocalWriteTime(value))
328+
)
329+
};
330+
} else if (context.serverTimestampBehavior === 'previous') {
331+
const previousValue = getPreviousValue(value);
332+
if (previousValue) {
333+
return previousValue;
334+
}
335+
}
336+
return { nullValue: 'NULL_VALUE' };
337+
}
338+
312339
if (!!result) {
340+
if (isServerTimestamp(result)) {
341+
return EvaluateResult.newValue(
342+
getServerTimestampValue(context, result)
343+
);
344+
}
345+
313346
return EvaluateResult.newValue(result);
314347
} else {
315348
return EvaluateResult.newUnset();

packages/firestore/src/core/pipeline-util.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ import {
9393
targetIsPipelineTarget
9494
} from './target';
9595
import { VectorValue } from '../api';
96+
import { ListenOptions } from './event_manager';
9697

9798
/* eslint @typescript-eslint/no-explicit-any: 0 */
9899

@@ -596,7 +597,12 @@ function addSystemFields(fields: Map<string, Expr>): Map<string, Expr> {
596597
}
597598

598599
export function toCorePipeline(
599-
p: ApiPipeline | RealtimePipeline
600+
p: ApiPipeline | RealtimePipeline,
601+
listenOptions?: ListenOptions
600602
): CorePipeline {
601-
return new CorePipeline(p.userDataReader.serializer, rewriteStages(p.stages));
603+
return new CorePipeline(
604+
p.userDataReader.serializer,
605+
rewriteStages(p.stages),
606+
listenOptions
607+
);
602608
}

packages/firestore/src/core/pipeline.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,14 @@ import { JsonProtoSerializer } from '../remote/serializer';
2828
import { debugAssert } from '../util/assert';
2929

3030
import { PipelineFlavor, PipelineSourceType } from './pipeline-util';
31+
import { ListenOptions } from './event_manager';
3132

3233
export class CorePipeline {
3334
isCorePipeline = true;
3435
constructor(
3536
readonly serializer: JsonProtoSerializer,
36-
readonly stages: Stage[]
37+
readonly stages: Stage[],
38+
readonly listenOptions?: ListenOptions
3739
) {}
3840
getPipelineCollection(): string | undefined {
3941
return getPipelineCollection(this);

packages/firestore/src/core/pipeline_run.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export type PipelineInputOutput = MutableDocument;
4545

4646
export interface EvaluationContext {
4747
serializer: JsonProtoSerializer;
48+
serverTimestampBehavior?: 'estimate' | 'previous' | 'none';
4849
}
4950

5051
export function runPipeline(
@@ -53,7 +54,14 @@ export function runPipeline(
5354
): PipelineInputOutput[] {
5455
let current = input;
5556
for (const stage of pipeline.stages) {
56-
current = evaluate({ serializer: pipeline.serializer }, stage, current);
57+
current = evaluate(
58+
{
59+
serializer: pipeline.serializer,
60+
serverTimestampBehavior: pipeline.listenOptions?.serverTimestampBehavior
61+
},
62+
stage,
63+
current
64+
);
5765
}
5866

5967
return current;

0 commit comments

Comments
 (0)