Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 27 additions & 99 deletions packages/loader/container-loader/src/serializedStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type {
ITelemetryBaseLogger,
} from "@fluidframework/core-interfaces";
import type { IDisposable } from "@fluidframework/core-interfaces/internal";
import { Timer, assert } from "@fluidframework/core-utils/internal";
import { assert } from "@fluidframework/core-utils/internal";
import {
FetchSource,
type IDocumentStorageService,
Expand All @@ -35,6 +35,7 @@ import {
type ContainerStorageAdapter,
type ISerializableBlobContents,
} from "./containerStorageAdapter.js";
import { SnapshotRefresher } from "./snapshotRefresher.js";
import {
convertISnapshotToSnapshotWithBlobs,
convertSnapshotToSnapshotInfo,
Expand Down Expand Up @@ -127,7 +128,7 @@ export interface SerializedSnapshotInfo extends SnapshotWithBlobs {
snapshotSequenceNumber: number;
}

interface ISnapshotInfo {
export interface ISnapshotInfo {
snapshotSequenceNumber: number;
snapshotFetchedTime?: number | undefined;
snapshot: ISnapshot | ISnapshotTree;
Expand All @@ -144,27 +145,6 @@ interface ISerializerEvent extends IEvent {
(event: "saved", listener: (dirty: boolean) => void): void;
}

class RefreshPromiseTracker {
public get hasPromise(): boolean {
return this.#promise !== undefined;
}
public get Promise(): Promise<number> | undefined {
return this.#promise;
}
constructor(private readonly catchHandler: (error: Error) => void) {}

#promise: Promise<number> | undefined;
setPromise(p: Promise<number>): void {
if (this.hasPromise) {
throw new Error("Cannot set promise while promise exists");
}
this.#promise = p.finally(() => {
this.#promise = undefined;
});
p.catch(this.catchHandler);
}
}

/**
* Helper class to manage the state of the container needed for proper serialization.
*
Expand All @@ -177,20 +157,8 @@ export class SerializedStateManager implements IDisposable {
private readonly mc: MonitoringContext;
private snapshotInfo: ISnapshotInfo | undefined;
private latestSnapshot: ISnapshotInfo | undefined;
private readonly refreshTracker = new RefreshPromiseTracker(
// eslint-disable-next-line unicorn/consistent-function-scoping
(error) =>
this.mc.logger.sendErrorEvent(
{
eventName: "RefreshLatestSnapshotFailed",
},
error,
),
);
private lastSavedOpSequenceNumber: number = 0;
private readonly refreshTimer: Timer | undefined;
private readonly snapshotRefreshTimeoutMs: number = 60 * 60 * 24 * 1000;
readonly #snapshotRefreshEnabled: boolean;
private readonly snapshotRefresher: SnapshotRefresher | undefined;
#disposed: boolean = false;

/**
Expand All @@ -214,24 +182,25 @@ export class SerializedStateManager implements IDisposable {
namespace: "serializedStateManager",
});

this.snapshotRefreshTimeoutMs = snapshotRefreshTimeoutMs ?? this.snapshotRefreshTimeoutMs;

this.#snapshotRefreshEnabled =
this.offlineLoadEnabled &&
(this.mc.config.getBoolean("Fluid.Container.enableOfflineSnapshotRefresh") ??
this.mc.config.getBoolean("Fluid.Container.enableOfflineFull")) === true;

this.refreshTimer = this.#snapshotRefreshEnabled
? new Timer(this.snapshotRefreshTimeoutMs, () => this.tryRefreshSnapshot())
this.snapshotRefresher = this.offlineLoadEnabled
? new SnapshotRefresher(
subLogger,
this.storageAdapter,
this.offlineLoadEnabled,
this.supportGetSnapshotApi,
(snapshot: ISnapshotInfo) => this.handleSnapshotRefreshed(snapshot),
snapshotRefreshTimeoutMs,
)
: undefined;

containerEvent.on("saved", () => this.updateSnapshotAndProcessedOpsMaybe());
}
public get disposed(): boolean {
return this.#disposed;
}
dispose(): void {
this.#disposed = true;
this.refreshTimer?.clear();
this.snapshotRefresher?.dispose();
}

private verifyNotDisposed(): void {
Expand All @@ -245,8 +214,8 @@ export class SerializedStateManager implements IDisposable {
* only intended to be used for testing purposes.
* @returns The snapshot sequence number associated with the latest fetched snapshot
*/
public get refreshSnapshotP(): Promise<number | undefined> | undefined {
return this.refreshTracker.Promise;
public get refreshSnapshotP(): Promise<number> | undefined {
return this.snapshotRefresher?.refreshSnapshotP;
}

/**
Expand Down Expand Up @@ -288,7 +257,7 @@ export class SerializedStateManager implements IDisposable {
const baseSnapshotTree: ISnapshotTree | undefined = getSnapshotTree(snapshot);
const attributes = await getDocumentAttributes(this.storageAdapter, baseSnapshotTree);
if (this.offlineLoadEnabled) {
this.refreshTimer?.start();
this.snapshotRefresher?.startTimer();
this.snapshotInfo = {
snapshot,
snapshotSequenceNumber: attributes.sequenceNumber,
Expand Down Expand Up @@ -326,60 +295,19 @@ export class SerializedStateManager implements IDisposable {
snapshot,
snapshotSequenceNumber: attributes.sequenceNumber,
};
this.tryRefreshSnapshot();
this.snapshotRefresher?.tryRefreshSnapshot();
}
return { snapshot, version: undefined, attributes };
}
}

private tryRefreshSnapshot(): void {
if (
this.#snapshotRefreshEnabled &&
!this.#disposed &&
!this.refreshTracker.hasPromise &&
this.latestSnapshot === undefined
) {
// Don't block on the refresh snapshot call - it is for the next time we serialize, not booting this incarnation
this.refreshTracker.setPromise(this.refreshLatestSnapshot(this.supportGetSnapshotApi()));
}
}

/**
* Fetch the latest snapshot for the container, including delay-loaded groupIds if pendingLocalState was provided and contained any groupIds.
* Note that this will update the StorageAdapter's cached snapshots for the groupIds (if present)
*
* @param supportGetSnapshotApi - a boolean indicating whether to use the fetchISnapshot or fetchISnapshotTree (must be true to fetch by groupIds)
* Handles the snapshotRefreshed event from SnapshotRefresher.
* Decides whether to accept the new snapshot based on processed ops.
* @returns The snapshot sequence number if updated, -1 otherwise
*/
private async refreshLatestSnapshot(supportGetSnapshotApi: boolean): Promise<number> {
this.latestSnapshot = await getLatestSnapshotInfo(
this.mc,
this.storageAdapter,
supportGetSnapshotApi,
);

if (this.#disposed) {
return -1;
}

// These are loading groupIds that the containerRuntime has requested over its lifetime.
// We will fetch the latest snapshot for the groupIds, which will update storageAdapter.loadedGroupIdSnapshots's cache
const downloadedGroupIds = Object.keys(this.storageAdapter.loadedGroupIdSnapshots);
if (supportGetSnapshotApi && downloadedGroupIds.length > 0) {
assert(
this.storageAdapter.getSnapshot !== undefined,
0x972 /* getSnapshot should exist */,
);
// (This is a separate network call from above because it requires work for storage to add a special base groupId)
const snapshot = await this.storageAdapter.getSnapshot({
versionId: undefined,
scenarioName: "getLatestSnapshotInfo",
cacheSnapshot: false,
loadingGroupIds: downloadedGroupIds,
fetchSource: FetchSource.noCache,
});
assert(snapshot !== undefined, 0x973 /* Snapshot should exist */);
}

private handleSnapshotRefreshed(latestSnapshot: ISnapshotInfo): number {
this.latestSnapshot = latestSnapshot;
return this.updateSnapshotAndProcessedOpsMaybe();
}

Expand Down Expand Up @@ -415,14 +343,14 @@ export class SerializedStateManager implements IDisposable {
stashedSnapshotSequenceNumber: this.snapshotInfo?.snapshotSequenceNumber,
});
this.latestSnapshot = undefined;
this.refreshTimer?.restart();
this.snapshotRefresher?.clearLatestSnapshot();
} else if (snapshotSequenceNumber <= lastProcessedOpSequenceNumber) {
// Snapshot seq num is between the first and last processed op.
// Remove the ops that are already part of the snapshot
this.processedOps.splice(0, snapshotSequenceNumber - firstProcessedOpSequenceNumber + 1);
this.snapshotInfo = this.latestSnapshot;
this.latestSnapshot = undefined;
this.refreshTimer?.restart();
this.snapshotRefresher?.clearLatestSnapshot();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these don't seem equivalent, and its not clear why it is being changed. generally, we should avoid mixing refactoring with behavior changes, as it makes it hard to review and detect un-intended behavior changes

Copy link
Contributor Author

@dannimad dannimad Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clearLatestSnapshot is the equivalent of this.latestSnapshot = undefined, we're just clearing snapshotRefresher copy as well. I remove refreshTimer.restart() given that we will restart that timer right after onSnapshotRefreshed inside the refresher. I agree it is not clear to see but this is not a behavior change.

this.mc.logger.sendTelemetryEvent({
eventName: "SnapshotRefreshed",
snapshotSequenceNumber,
Expand All @@ -448,7 +376,7 @@ export class SerializedStateManager implements IDisposable {
snapshotSequenceNumber: snapshot.sequenceNumber ?? 0,
snapshotFetchedTime: Date.now(),
};
this.refreshTimer?.start();
this.snapshotRefresher?.startTimer();
}
}

Expand Down
Loading
Loading