Skip to content
1 change: 1 addition & 0 deletions packages/dds/tree/src/simple-tree/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export {
treeNodeFromAnchor,
getSimpleNodeSchemaFromInnerNode,
SimpleContextSlot,
pauseTreeEvents,
} from "./treeNodeKernel.js";
export { type WithType, typeNameSymbol, typeSchemaSymbol } from "./withType.js";
export {
Expand Down
299 changes: 278 additions & 21 deletions packages/dds/tree/src/simple-tree/core/treeNodeKernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,27 @@
*/

import { createEmitter } from "@fluid-internal/client-utils";
import type { Listenable, Off } from "@fluidframework/core-interfaces";
import { assert, Lazy, fail, debugAssert } from "@fluidframework/core-utils/internal";
import type {
HasListeners,
IEmitter,
Listenable,
Off,
} from "@fluidframework/core-interfaces/internal";
import {
assert,
Lazy,
fail,
debugAssert,
unreachableCase,
} from "@fluidframework/core-utils/internal";
import { UsageError } from "@fluidframework/telemetry-utils/internal";

import {
anchorSlot,
type AnchorEvents,
type AnchorNode,
type AnchorSet,
type FieldKey,
type TreeValue,
type UpPath,
} from "../../core/index.js";
Expand Down Expand Up @@ -126,7 +138,7 @@ export class TreeNodeKernel {
* This means optimizations like skipping processing data in subtrees where no subtreeChanged events are subscribed to would be able to work,
* since the kernel does not unconditionally subscribe to those events (like a design which simply forwards all events would).
*/
readonly #unhydratedEvents = new Lazy(createEmitter<KernelEvents>);
readonly #events = new Lazy(() => new TreeNodeEventBuffer());

/**
* Create a TreeNodeKernel which can be looked up with {@link getKernel}.
Expand Down Expand Up @@ -156,7 +168,7 @@ export class TreeNodeKernel {
this.#hydrationState = {
innerNode,
off: innerNode.events.on("childrenChangedAfterBatch", ({ changedFields }) => {
this.#unhydratedEvents.value.emit("childrenChangedAfterBatch", {
this.#events.value.emit("childrenChangedAfterBatch", {
changedFields,
});

Expand All @@ -165,7 +177,7 @@ export class TreeNodeKernel {
const treeNode = unhydratedNode.treeNode;
if (treeNode !== undefined) {
const kernel = getKernel(treeNode);
kernel.#unhydratedEvents.value.emit("subtreeChangedAfterBatch");
kernel.#events.value.emit("subtreeChangedAfterBatch");
}
const parentNode: FlexTreeNode | undefined =
unhydratedNode.parentField.parent.parent;
Expand All @@ -181,6 +193,15 @@ export class TreeNodeKernel {
// Hydrated case
this.#hydrationState = this.createHydratedState(innerNode.anchorNode);
this.#hydrationState.innerNode = innerNode;

// Forward relevant anchorNode events to our event handler
for (const eventName of kernelEvents) {
this.#hydrationState.offAnchorNode.add(
innerNode.anchorNode.events.on(eventName, (arg) =>
this.#events.value.emit(eventName, arg),
),
);
}
}
}

Expand Down Expand Up @@ -213,18 +234,11 @@ export class TreeNodeKernel {
this.#hydrationState = this.createHydratedState(anchorNode);
this.#hydrationState.offAnchorNode.add(() => anchors.forget(anchor));

// If needed, register forwarding emitters for events from before hydration
if (this.#unhydratedEvents.evaluated) {
const events = this.#unhydratedEvents.value;
for (const eventName of kernelEvents) {
if (events.hasListeners(eventName)) {
this.#hydrationState.offAnchorNode.add(
// Argument is forwarded between matching events, so the type should be correct.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
anchorNode.events.on(eventName, (arg: any) => events.emit(eventName, arg)),
);
}
}
// Forward relevant anchorNode events to our event handler
for (const eventName of kernelEvents) {
this.#hydrationState.offAnchorNode.add(
anchorNode.events.on(eventName, (arg) => this.#events.value.emit(eventName, arg)),
);
}
}

Expand Down Expand Up @@ -267,10 +281,7 @@ export class TreeNodeKernel {
}

public get events(): Listenable<KernelEvents> {
// Retrieve the correct events object based on whether this node is pre or post hydration.
return isHydrated(this.#hydrationState)
? this.#hydrationState.anchorNode.events
: this.#unhydratedEvents.value;
return this.#events.value;
}

public dispose(): void {
Expand All @@ -281,6 +292,9 @@ export class TreeNodeKernel {
off();
}
}
if (this.#events.evaluated) {
this.#events.value.dispose();
}
// TODO: go to the context and remove myself from withAnchors
}

Expand Down Expand Up @@ -354,6 +368,249 @@ const kernelEvents = ["childrenChangedAfterBatch", "subtreeChangedAfterBatch"] a

type KernelEvents = Pick<AnchorEvents, (typeof kernelEvents)[number]>;

// #region Document this thoroughly

/**
* Tracks the number of times {@link pauseTreeEvents} has been called without
* being matched by a corresponding resume call.
* Events will be paused while this counter is \> 0.
*/
let pauseTreeEventsStack: number = 0;

/**
* Pause events emitted by {@link TreeNode}s.
*
* @remarks
* Events that would otherwise have been emitted are buffered until the returned function is called.
*
* Note: this should be used with caution. User application behaviors are implicitly coupled to event timing.
* Disrupting this timing can lead to unexpected behavior.
*
* It is also vitally important that the returned callback be invoked to ensure events are resumed.
* Failing to do so will result in events never being emitted again.
*
* @privateRemarks
* If we had access to `Symbol.dispose`, that would probably be a better pattern than returning a callback.
* Users could then use this API with `using` to ensure proper cleanup.
*
* @returns A function that, when called, resumes event emission and flushes any buffered events.
*/
export function pauseTreeEvents(): () => void {
pauseTreeEventsStack++;

return () => {
pauseTreeEventsStack--;
assert(pauseTreeEventsStack >= 0, "pauseEvents count should never be negative");
if (pauseTreeEventsStack === 0) {
flushEventsEmitter.emit("flush");
}
};
}

/**
* Event emitter to notify subscribers when tree events buffered due to {@link pauseTreeEvents} should be flushed.
*/
const flushEventsEmitter = createEmitter<{
flush: () => void;
}>();

/**
* Event emitter for {@link TreeNodeKernel}, which optionally buffers events based on {@link pauseTreeEvents}.
* @remarks Listens to {@link flushEventsEmitter} to know when to flush any buffered events.
*/
class TreeNodeEventBuffer
implements
Listenable<KernelEvents>,
Pick<IEmitter<KernelEvents>, "emit">,
HasListeners<KernelEvents>
{
private disposed: boolean = false;

/**
* Listen to {@link flushEventsEmitter} to know when to flush buffered events.
*/
private readonly disposeOnFlushListener = flushEventsEmitter.on("flush", () => {
this.flush();
});

/**
* {@link AnchorEvents.childrenChangedAfterBatch} listeners.
*/
private readonly childrenChangedListeners: Set<
(arg: {
changedFields: ReadonlySet<FieldKey>;
}) => void
> = new Set();

/**
* {@link AnchorEvents.subTreeChanged} listeners.
*/
private readonly subTreeChangedListeners: Set<() => void> = new Set();

/**
* Buffer of fields that have changed since events were paused.
* When events are flushed, a single {@link AnchorEvents.childrenChangedAfterBatch} event will be emitted
* containing the accumulated set of changed fields.
*/
private readonly childrenChangedBuffer: Set<FieldKey> = new Set();

/**
* Whether or not the subtree has changed since events were paused.
* When events are flushed, a single {@link AnchorEvents.subTreeChanged} event will be emitted if and only
* if the subtree has changed.
*/
private subTreeChangedBuffer: boolean = false;

public hasListeners(eventName: keyof KernelEvents): boolean {
switch (eventName) {
case "childrenChangedAfterBatch":
return this.childrenChangedListeners.size > 0;
case "subtreeChangedAfterBatch":
return this.subTreeChangedListeners.size > 0;
default: {
unreachableCase(eventName);
}
}
}

public emit(
eventName: keyof KernelEvents,
arg?: {
changedFields: ReadonlySet<FieldKey>;
},
): void {
this.assertNotDisposed();
switch (eventName) {
case "childrenChangedAfterBatch":
assert(arg !== undefined, "childrenChangedAfterBatch should have arg");
return this.handleChildrenChangedAfterBatch(arg.changedFields);
case "subtreeChangedAfterBatch":
return this.handleSubtreeChangedAfterBatch();
default:
unreachableCase(eventName);
}
}

private handleChildrenChangedAfterBatch(changedFields: ReadonlySet<FieldKey>): void {
if (pauseTreeEventsStack) {
for (const fieldKey of changedFields) {
this.childrenChangedBuffer.add(fieldKey);
}
} else {
for (const listener of this.childrenChangedListeners) {
listener({ changedFields });
}
}
}

private handleSubtreeChangedAfterBatch(): void {
if (pauseTreeEventsStack) {
this.subTreeChangedBuffer = true;
} else {
for (const listener of this.subTreeChangedListeners) {
listener();
}
}
}

public on(
eventName: "childrenChangedAfterBatch",
listener: (arg: {
changedFields: ReadonlySet<FieldKey>;
}) => void,
): () => void;
public on(eventName: "subtreeChangedAfterBatch", listener: () => void): () => void;
public on(
eventName: keyof KernelEvents,
listener: (arg: {
changedFields: ReadonlySet<FieldKey>;
}) => void | (() => void),
): () => void {
this.assertNotDisposed();
switch (eventName) {
case "childrenChangedAfterBatch":
this.childrenChangedListeners.add(
listener as (arg: {
changedFields: ReadonlySet<FieldKey>;
}) => void,
);
return () => this.off("childrenChangedAfterBatch", listener);
case "subtreeChangedAfterBatch":
this.subTreeChangedListeners.add(listener as () => void);
return () => this.off("subtreeChangedAfterBatch", listener as () => void);
default:
unreachableCase(eventName);
}
}

public off(
eventName: "childrenChangedAfterBatch",
listener: (arg: {
changedFields: ReadonlySet<FieldKey>;
}) => void,
): void;
public off(eventName: "subtreeChangedAfterBatch", listener: () => void): void;
public off(
eventName: keyof KernelEvents,
listener: (arg: {
changedFields: ReadonlySet<FieldKey>;
}) => void | (() => void),
): void {
this.assertNotDisposed();
switch (eventName) {
case "childrenChangedAfterBatch":
this.childrenChangedListeners.delete(
listener as (arg: {
changedFields: ReadonlySet<FieldKey>;
}) => void,
);
break;
case "subtreeChangedAfterBatch":
this.subTreeChangedListeners.delete(listener as () => void);
break;
default:
unreachableCase(eventName);
}
}

/**
* Flushes any events buffered due to {@link pauseTreeEvents}.
*/
public flush(): void {
this.assertNotDisposed();

if (this.childrenChangedBuffer.size > 0) {
for (const listener of this.childrenChangedListeners) {
listener({ changedFields: this.childrenChangedBuffer });
}
this.childrenChangedBuffer.clear();
}

if (this.subTreeChangedBuffer) {
for (const listener of this.subTreeChangedListeners) {
listener();
}
this.subTreeChangedBuffer = false;
}
}

private assertNotDisposed(): void {
assert(!this.disposed, "Event handler disposed.");
}

public dispose(): void {
if (this.disposed) {
return;
}

this.disposeOnFlushListener();

this.disposed = true;
}
}

// #endregion

/**
* For "cooked" nodes this is a HydratedFlexTreeNode thats a projection of forest content.
* For {@link Unhydrated} nodes this is a UnhydratedFlexTreeNode.
Expand Down
1 change: 1 addition & 0 deletions packages/dds/tree/src/simple-tree/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export {
walkAllowedTypes,
type SchemaVisitor,
type SimpleNodeSchemaBase,
pauseTreeEvents,
} from "./core/index.js";
export { walkFieldSchema } from "./walkFieldSchema.js";
export type { UnsafeUnknownSchema, Insertable } from "./unsafeUnknownSchema.js";
Expand Down
Loading
Loading