diff --git a/.changeset/olive-crews-love.md b/.changeset/olive-crews-love.md new file mode 100644 index 00000000..00494cc5 --- /dev/null +++ b/.changeset/olive-crews-love.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +Add a scheduler that ensures that if a transaction touches multiple collections that feed into a single live query, the liver query only emits a single batch of updates. This fixes an issue where multiple renders could be triggered from a live query under this situation. diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index ec631cd8..30de83aa 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -1,5 +1,10 @@ import { createCollection } from "../collection/index.js" import { CollectionConfigBuilder } from "./live/collection-config-builder.js" +import { + getBuilderFromConfig, + registerCollectionBuilder, +} from "./live/collection-registry.js" +import type { RunCountUtils } from "./live/collection-config-builder.js" import type { LiveQueryCollectionConfig } from "./live/types.js" import type { InitialQueryBuilder, QueryBuilder } from "./builder/index.js" import type { Collection } from "../collection/index.js" @@ -35,7 +40,7 @@ export function liveQueryCollectionOptions< TResult extends object = GetResult, >( config: LiveQueryCollectionConfig -): CollectionConfig { +): CollectionConfig & { utils: RunCountUtils } { const collectionConfigBuilder = new CollectionConfigBuilder< TContext, TResult @@ -83,7 +88,7 @@ export function createLiveQueryCollection< TResult extends object = GetResult, >( query: (q: InitialQueryBuilder) => QueryBuilder -): Collection +): Collection // Overload 2: Accept full config object with optional utilities export function createLiveQueryCollection< @@ -92,7 +97,7 @@ export function createLiveQueryCollection< TUtils extends UtilsRecord = {}, >( config: LiveQueryCollectionConfig & { utils?: TUtils } -): Collection +): Collection // Implementation export function createLiveQueryCollection< @@ -103,7 +108,7 @@ export function createLiveQueryCollection< configOrQuery: | (LiveQueryCollectionConfig & { utils?: TUtils }) | ((q: InitialQueryBuilder) => QueryBuilder) -): Collection { +): Collection { // Determine if the argument is a function (query) or a config object if (typeof configOrQuery === `function`) { // Simple query function case @@ -113,7 +118,11 @@ export function createLiveQueryCollection< ) => QueryBuilder, } const options = liveQueryCollectionOptions(config) - return bridgeToCreateCollection(options) + return bridgeToCreateCollection(options) as Collection< + TResult, + string | number, + RunCountUtils & TUtils + > } else { // Config object case const config = configOrQuery as LiveQueryCollectionConfig< @@ -121,10 +130,18 @@ export function createLiveQueryCollection< TResult > & { utils?: TUtils } const options = liveQueryCollectionOptions(config) - return bridgeToCreateCollection({ - ...options, - utils: config.utils, - }) + + const collection = bridgeToCreateCollection(options) + + if (config.utils) { + Object.assign(collection.utils, config.utils) + } + + return collection as Collection< + TResult, + string | number, + RunCountUtils & TUtils + > } } @@ -132,16 +149,19 @@ export function createLiveQueryCollection< * Bridge function that handles the type compatibility between query2's TResult * and core collection's output type without exposing ugly type assertions to users */ -function bridgeToCreateCollection< - TResult extends object, - TUtils extends UtilsRecord = {}, ->( - options: CollectionConfig & { utils?: TUtils } -): Collection { - // This is the only place we need a type assertion, hidden from user API - return createCollection(options as any) as unknown as Collection< +function bridgeToCreateCollection( + options: CollectionConfig & { utils: RunCountUtils } +): Collection { + const collection = createCollection(options as any) as unknown as Collection< TResult, string | number, - TUtils + RunCountUtils > + + const builder = getBuilderFromConfig(options) + if (builder) { + registerCollectionBuilder(collection, builder) + } + + return collection } diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index a81d1df2..86d37b7a 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1,7 +1,14 @@ import { D2, output } from "@tanstack/db-ivm" import { compileQuery } from "../compiler/index.js" import { buildQuery, getQueryIR } from "../builder/index.js" +import { transactionScopedScheduler } from "../../scheduler.js" +import { getActiveTransaction } from "../../transactions.js" import { CollectionSubscriber } from "./collection-subscriber.js" +import { + attachBuilderToConfig, + getCollectionBuilder, +} from "./collection-registry.js" +import type { SchedulerContextId } from "../../scheduler.js" import type { CollectionSubscription } from "../../collection/subscription.js" import type { RootStreamBuilder } from "@tanstack/db-ivm" import type { OrderByOptimizationInfo } from "../compiler/order-by.js" @@ -11,6 +18,7 @@ import type { KeyedStream, ResultStream, SyncConfig, + UtilsRecord, } from "../../types.js" import type { Context, GetResult } from "../builder/types.js" import type { BasicExpression, QueryIR } from "../ir.js" @@ -22,6 +30,16 @@ import type { SyncState, } from "./types.js" +export type RunCountUtils = UtilsRecord & { + getRunCount: () => number +} + +type PendingGraphRun = { + config: Parameters[`sync`]>[0] + syncState: FullSyncState + loadCallbacks: Set<() => boolean> +} + // Global counter for auto-generated collection IDs let liveQueryCollectionCounter = 0 @@ -46,6 +64,7 @@ export class CollectionConfigBuilder< private readonly compare?: (val1: TResult, val2: TResult) => number private isGraphRunning = false + private runCount = 0 private graphCache: D2 | undefined private inputsCache: Record> | undefined @@ -54,6 +73,15 @@ export class CollectionConfigBuilder< | Map> | undefined + private readonly aliasDependencies: Record< + string, + Array> + > = Object.create(null) + + private readonly builderDependencies = new Set< + CollectionConfigBuilder + >() + // Map of source alias to subscription readonly subscriptions: Record = {} // Map of source aliases to functions that load keys for that lazy source @@ -92,8 +120,8 @@ export class CollectionConfigBuilder< this.compileBasePipeline() } - getConfig(): CollectionConfig { - return { + getConfig(): CollectionConfig & { utils: RunCountUtils } { + const config = { id: this.id, getKey: this.config.getKey || @@ -106,7 +134,12 @@ export class CollectionConfigBuilder< onUpdate: this.config.onUpdate, onDelete: this.config.onDelete, startSync: this.config.startSync, - } + utils: { + getRunCount: this.getRunCount.bind(this), + }, + } as CollectionConfig & { utils: RunCountUtils } + attachBuilderToConfig(config, this) + return config } getCollectionIdForAlias(alias: string): string { @@ -176,6 +209,134 @@ export class CollectionConfigBuilder< } } + /** + * Schedules a graph run with the transaction-scoped scheduler. + * Ensures each builder runs at most once per transaction, with automatic dependency tracking + * to run parent queries before child queries. Outside a transaction, runs immediately. + * + * Multiple calls during a transaction are coalesced into a single execution. + * Dependencies are auto-discovered from subscribed live queries, or can be overridden. + * Load callbacks are combined when entries merge. + * + * @param config - Collection sync configuration with begin/commit/markReady callbacks + * @param syncState - The full sync state containing the D2 graph, inputs, and pipeline + * @param callback - Optional callback to load more data if needed (returns true when done) + * @param options - Optional scheduling configuration + * @param options.contextId - Transaction ID to group work; defaults to active transaction + * @param options.jobId - Unique identifier for this job; defaults to this builder instance + * @param options.alias - Source alias that triggered this schedule; adds alias-specific dependencies + * @param options.dependencies - Explicit dependency list; overrides auto-discovered dependencies + */ + scheduleGraphRun( + config: Parameters[`sync`]>[0], + syncState: FullSyncState, + callback?: () => boolean, + options?: { + contextId?: SchedulerContextId + jobId?: unknown + alias?: string + dependencies?: Array> + } + ) { + const contextId = options?.contextId ?? getActiveTransaction()?.id + // Use the builder instance as the job ID for deduplication. This is memory-safe + // because the scheduler's context Map is deleted after flushing (no long-term retention). + const jobId = options?.jobId ?? this + const dependencyBuilders = (() => { + if (options?.dependencies) { + return options.dependencies + } + + const deps = new Set(this.builderDependencies) + if (options?.alias) { + const aliasDeps = this.aliasDependencies[options.alias] + if (aliasDeps) { + for (const dep of aliasDeps) { + deps.add(dep) + } + } + } + + deps.delete(this) + + return Array.from(deps) + })() + // We intentionally scope deduplication to the builder instance. Each instance + // owns caches and compiled pipelines, so sharing an entry across instances that + // merely reuse the same string id would bind the wrong `this` in the run closure. + + const createEntry = () => { + const state: PendingGraphRun = { + config, + syncState, + loadCallbacks: new Set(), + } + + if (callback) { + state.loadCallbacks.add(callback) + } + + return { + state, + run: () => { + this.incrementRunCount() + const combinedLoader = + state.loadCallbacks.size > 0 + ? () => { + let allDone = true + let firstError: unknown + + for (const loader of state.loadCallbacks) { + try { + const result = loader() + if (result === false) { + allDone = false + } + } catch (error) { + allDone = false + if (firstError === undefined) { + firstError = error + } + } + } + + if (firstError !== undefined) { + throw firstError + } + + // Returning false signals that callers should schedule another pass. + return allDone + } + : undefined + + try { + this.maybeRunGraph(state.config, state.syncState, combinedLoader) + } finally { + // Clear callbacks after run to avoid carrying stale closures across transactions + state.loadCallbacks.clear() + } + }, + } + } + + const updateEntry = (entry: { state: PendingGraphRun }) => { + entry.state.config = config + entry.state.syncState = syncState + + if (callback) { + entry.state.loadCallbacks.add(callback) + } + } + + transactionScopedScheduler.schedule({ + contextId, + jobId, + dependencies: dependencyBuilders, + createEntry, + updateEntry, + }) + } + private getSyncConfig(): SyncConfig { return { rowUpdateMode: `full`, @@ -183,6 +344,14 @@ export class CollectionConfigBuilder< } } + incrementRunCount() { + this.runCount++ + } + + getRunCount() { + return this.runCount + } + private syncFn(config: Parameters[`sync`]>[0]) { const syncState: SyncState = { messagesCount: 0, @@ -202,7 +371,7 @@ export class CollectionConfigBuilder< ) // Initial run with callback to load more data if needed - this.maybeRunGraph(config, fullSyncState, loadMoreDataCallbacks) + this.scheduleGraphRun(config, fullSyncState, loadMoreDataCallbacks) // Return the unsubscribe function return () => { @@ -412,6 +581,14 @@ export class CollectionConfigBuilder< const collection = this.collectionByAlias[alias] ?? this.collections[collectionId]! + const dependencyBuilder = getCollectionBuilder(collection) + if (dependencyBuilder && dependencyBuilder !== this) { + this.aliasDependencies[alias] = [dependencyBuilder] + this.builderDependencies.add(dependencyBuilder) + } else { + this.aliasDependencies[alias] = [] + } + const collectionSubscriber = new CollectionSubscriber( alias, collectionId, diff --git a/packages/db/src/query/live/collection-registry.ts b/packages/db/src/query/live/collection-registry.ts new file mode 100644 index 00000000..98546535 --- /dev/null +++ b/packages/db/src/query/live/collection-registry.ts @@ -0,0 +1,68 @@ +import type { Collection } from "../../collection/index.js" +import type { CollectionConfigBuilder } from "./collection-config-builder.js" + +const BUILDER_SYMBOL = Symbol.for(`@tanstack/db.collection-config-builder`) + +const collectionBuilderRegistry = new WeakMap< + Collection, + CollectionConfigBuilder +>() + +/** + * Attaches a builder to a config object via a non-enumerable symbol property. + * Used for dependency tracking between live queries. + * + * @param config - The collection config object to attach the builder to + * @param builder - The builder instance to attach + */ +export function attachBuilderToConfig( + config: object, + builder: CollectionConfigBuilder +): void { + Object.defineProperty(config, BUILDER_SYMBOL, { + value: builder, + configurable: false, + enumerable: false, + writable: false, + }) +} + +/** + * Retrieves the builder attached to a config object. + * + * @param config - The collection config object + * @returns The attached builder, or `undefined` if none exists + */ +export function getBuilderFromConfig( + config: object +): CollectionConfigBuilder | undefined { + return (config as any)[BUILDER_SYMBOL] +} + +/** + * Registers a builder for a collection in the global registry. + * Used to detect when a live query depends on another live query, + * enabling the scheduler to ensure parent queries run first. + * + * @param collection - The collection to register the builder for + * @param builder - The builder that produces this collection + */ +export function registerCollectionBuilder( + collection: Collection, + builder: CollectionConfigBuilder +): void { + collectionBuilderRegistry.set(collection, builder) +} + +/** + * Retrieves the builder registered for a collection. + * Used to discover dependencies when a live query subscribes to another live query. + * + * @param collection - The collection to look up + * @returns The registered builder, or `undefined` if none exists + */ +export function getCollectionBuilder( + collection: Collection +): CollectionConfigBuilder | undefined { + return collectionBuilderRegistry.get(collection) +} diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index d196ecd7..b4989878 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -11,6 +11,8 @@ import type { OrderByOptimizationInfo } from "../compiler/order-by.js" import type { CollectionConfigBuilder } from "./collection-config-builder.js" import type { CollectionSubscription } from "../../collection/subscription.js" +const loadMoreCallbackSymbol = Symbol(`tanstack.db.loadMore`) + export class CollectionSubscriber< TContext extends Context, TResult extends object = GetResult, @@ -80,18 +82,13 @@ export class CollectionSubscriber< this.collection.config.getKey ) - // Do not provide the callback that loads more data - // if there's no more data to load - // otherwise we end up in an infinite loop trying to load more data const dataLoader = sentChanges > 0 ? callback : undefined - // We need to call `maybeRunGraph` even if there's no data to load - // because we need to mark the collection as ready if it's not already - // and that's only done in `maybeRunGraph` - this.collectionConfigBuilder.maybeRunGraph( + this.collectionConfigBuilder.scheduleGraphRun( this.config, this.syncState, - dataLoader + dataLoader, + { alias: this.alias } ) } @@ -201,10 +198,22 @@ export class CollectionSubscriber< } const trackedChanges = this.trackSentValues(changes, orderByInfo.comparator) - this.sendChangesToPipeline( - trackedChanges, - this.loadMoreIfNeeded.bind(this, subscription) - ) + + // Cache the bound loader on the subscription using a symbol property. + // This ensures we pass the same function instance to the scheduler each time, + // allowing it to deduplicate callbacks when multiple changes arrive during a transaction. + type SubscriptionWithLoader = CollectionSubscription & { + [loadMoreCallbackSymbol]?: () => boolean + } + + const subscriptionWithLoader = subscription as SubscriptionWithLoader + + const boundLoader = + subscriptionWithLoader[loadMoreCallbackSymbol] ?? + (subscriptionWithLoader[loadMoreCallbackSymbol] = + this.loadMoreIfNeeded.bind(this, subscription)) + + this.sendChangesToPipeline(trackedChanges, boundLoader) } // Loads the next `n` items from the collection diff --git a/packages/db/src/scheduler.ts b/packages/db/src/scheduler.ts new file mode 100644 index 00000000..bb3f3e68 --- /dev/null +++ b/packages/db/src/scheduler.ts @@ -0,0 +1,204 @@ +/** + * Identifier used to scope scheduled work. Maps to a transaction id for live queries. + */ +export type SchedulerContextId = string | symbol + +/** + * Internal representation of a job queued by the scheduler. + * Mutable `state` accumulates information before `run` executes the work. + */ +interface SchedulerEntry { + state: TState + run: () => void +} + +/** + * Options for {@link Scheduler.schedule}. Jobs are identified by `jobId` within a context + * and may declare dependencies. `createEntry` is called once; `updateEntry` merges state. + */ +interface ScheduleOptions { + contextId?: SchedulerContextId + jobId: unknown + dependencies?: Iterable + createEntry: () => SchedulerEntry + updateEntry?: (entry: SchedulerEntry) => void +} + +/** + * State per context. Queue preserves order, entries hold jobs, dependencies track + * prerequisites, and completed records which jobs have run during the current flush. + */ +interface SchedulerContextState { + queue: Array + entries: Map> + dependencies: Map> + completed: Set +} + +/** + * Scoped scheduler that coalesces work by context and job. + * + * - **context** (e.g. transaction id) defines the batching boundary; work is queued until flushed. + * - **job id** deduplicates work within a context; subsequent schedules update the entry. + * - Without a context id, work executes immediately. + * + * Each entry has mutable state so callers can merge data before `run()` executes. + */ +export class Scheduler { + private contexts = new Map() + + /** + * Get or create the state bucket for a context. + */ + private getOrCreateContext( + contextId: SchedulerContextId + ): SchedulerContextState { + let context = this.contexts.get(contextId) + if (!context) { + context = { + queue: [], + entries: new Map(), + dependencies: new Map(), + completed: new Set(), + } + this.contexts.set(contextId, context) + } + return context + } + + /** + * Schedule work. Without a context id, executes immediately. + * Otherwise queues the job to be flushed once dependencies are satisfied. + */ + schedule({ + contextId, + jobId, + dependencies, + createEntry, + updateEntry, + }: ScheduleOptions): void { + if (typeof contextId === `undefined`) { + const entry = createEntry() + updateEntry?.(entry) + entry.run() + return + } + + const context = this.getOrCreateContext(contextId) + + let entry = context.entries.get(jobId) as SchedulerEntry | undefined + if (!entry) { + entry = createEntry() + context.entries.set(jobId, entry) + context.queue.push(jobId) + } + + updateEntry?.(entry) + + if (dependencies) { + const depSet = new Set() + for (const dep of dependencies) { + if (dep !== jobId) { + depSet.add(dep) + } + } + context.dependencies.set(jobId, depSet) + } else if (!context.dependencies.has(jobId)) { + context.dependencies.set(jobId, new Set()) + } + + context.completed.delete(jobId) + } + + /** + * Flush all queued work for a context. Jobs with unmet dependencies are retried. + * Throws if a pass completes without running any job (dependency cycle). + */ + flush(contextId: SchedulerContextId): void { + const context = this.contexts.get(contextId) + if (!context) return + + const { queue, entries, dependencies, completed } = context + + while (queue.length > 0) { + let ranThisPass = false + const jobsThisPass = queue.length + + for (let i = 0; i < jobsThisPass; i++) { + const jobId = queue.shift()! + const entry = entries.get(jobId) + if (!entry) { + dependencies.delete(jobId) + completed.delete(jobId) + continue + } + + const deps = dependencies.get(jobId) + const ready = + !deps || + deps.size === 0 || + [...deps].every((dep) => dep === jobId || completed.has(dep)) + + if (ready) { + entries.delete(jobId) + dependencies.delete(jobId) + // Run the job. If it throws, we don't mark it complete, allowing the + // error to propagate while maintaining scheduler state consistency. + entry.run() + completed.add(jobId) + ranThisPass = true + } else { + queue.push(jobId) + } + } + + if (!ranThisPass) { + throw new Error( + `Scheduler detected unresolved dependencies for context ${String( + contextId + )}.` + ) + } + } + + this.contexts.delete(contextId) + } + + /** + * Flush all contexts with pending work. Useful during tear-down. + */ + flushAll(): void { + for (const contextId of Array.from(this.contexts.keys())) { + this.flush(contextId) + } + } + + /** Clear all scheduled jobs for a context. */ + clear(contextId: SchedulerContextId): void { + this.contexts.delete(contextId) + } + + /** Check if a context has pending jobs. */ + hasPendingJobs(contextId: SchedulerContextId): boolean { + const context = this.contexts.get(contextId) + if (!context) return false + return context.entries.size > 0 + } + + /** Remove a single job from a context and clean up its dependencies. */ + clearJob(contextId: SchedulerContextId, jobId: unknown): void { + const context = this.contexts.get(contextId) + if (!context) return + + context.entries.delete(jobId) + context.dependencies.delete(jobId) + context.completed.delete(jobId) + context.queue = context.queue.filter((id) => id !== jobId) + + if (context.entries.size === 0) { + this.contexts.delete(contextId) + } + } +} + +export const transactionScopedScheduler = new Scheduler() diff --git a/packages/db/src/transactions.ts b/packages/db/src/transactions.ts index 8d4500c2..9fc79238 100644 --- a/packages/db/src/transactions.ts +++ b/packages/db/src/transactions.ts @@ -5,6 +5,7 @@ import { TransactionNotPendingCommitError, TransactionNotPendingMutateError, } from "./errors" +import { transactionScopedScheduler } from "./scheduler.js" import type { Deferred } from "./deferred" import type { MutationFn, @@ -179,11 +180,21 @@ export function getActiveTransaction(): Transaction | undefined { } function registerTransaction(tx: Transaction) { + // Clear any stale work that may have been left behind if a previous mutate + // scope aborted before we could flush. + transactionScopedScheduler.clear(tx.id) transactionStack.push(tx) } function unregisterTransaction(tx: Transaction) { - transactionStack = transactionStack.filter((t) => t.id !== tx.id) + // Always flush pending work for this transaction before removing it from + // the ambient stack – this runs even if the mutate callback throws. + // If flush throws (e.g., due to a job error), we still clean up the stack. + try { + transactionScopedScheduler.flush(tx.id) + } finally { + transactionStack = transactionStack.filter((t) => t.id !== tx.id) + } } function removeFromPendingList(tx: Transaction) { diff --git a/packages/db/tests/query/scheduler.test.ts b/packages/db/tests/query/scheduler.test.ts new file mode 100644 index 00000000..6eab940b --- /dev/null +++ b/packages/db/tests/query/scheduler.test.ts @@ -0,0 +1,523 @@ +import { afterEach, describe, expect, it, vi } from "vitest" +import { createCollection } from "../../src/collection/index.js" +import { createLiveQueryCollection, eq } from "../../src/query/index.js" +import { createTransaction } from "../../src/transactions.js" +import { transactionScopedScheduler } from "../../src/scheduler.js" +import { CollectionConfigBuilder } from "../../src/query/live/collection-config-builder.js" +import type { FullSyncState } from "../../src/query/live/types.js" +import type { SyncConfig } from "../../src/types.js" + +interface ChangeMessageLike { + type: string + value: any +} + +interface User { + id: number + name: string +} + +interface Task { + id: number + userId: number + title: string +} + +function setupLiveQueryCollections(id: string) { + const users = createCollection({ + id: `${id}-users`, + getKey: (user) => user.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + }, + }, + }) + + const tasks = createCollection({ + id: `${id}-tasks`, + getKey: (task) => task.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + }, + }, + }) + + const assignments = createLiveQueryCollection({ + id: `${id}-assignments`, + startSync: true, + query: (q) => + q + .from({ user: users }) + .join({ task: tasks }, ({ user, task }) => eq(user.id, task.userId)) + .select(({ user, task }) => ({ + userId: user.id, + taskId: task?.id, + title: task?.title, + })), + }) + + return { users, tasks, assignments } +} + +function recordBatches(collection: any) { + const batches: Array> = [] + const subscription = collection.subscribeChanges((changes: any) => { + batches.push(changes as Array) + }) + return { + batches, + unsubscribe: () => subscription.unsubscribe(), + } +} + +afterEach(() => { + transactionScopedScheduler.flushAll() +}) + +describe(`live query scheduler`, () => { + it(`runs the live query graph once per transaction that touches multiple collections`, async () => { + const { users, tasks, assignments } = + setupLiveQueryCollections(`single-batch`) + await assignments.preload() + + const recorder = recordBatches(assignments) + + const transaction = createTransaction({ + mutationFn: async () => {}, + autoCommit: false, + }) + + transaction.mutate(() => { + users.insert({ id: 1, name: `Alice` }) + tasks.insert({ id: 1, userId: 1, title: `Write tests` }) + }) + + expect(recorder.batches).toHaveLength(1) + expect(recorder.batches[0]).toHaveLength(1) + expect(recorder.batches[0]![0]).toMatchObject({ + type: `insert`, + value: { + userId: 1, + taskId: 1, + title: `Write tests`, + }, + }) + + recorder.unsubscribe() + transaction.rollback() + }) + + it(`handles nested transactions without emitting duplicate batches`, async () => { + const { users, tasks, assignments } = setupLiveQueryCollections(`nested`) + await assignments.preload() + + const recorder = recordBatches(assignments) + + const outerTx = createTransaction({ + mutationFn: async () => {}, + autoCommit: false, + }) + const innerTx = createTransaction({ + mutationFn: async () => {}, + autoCommit: false, + }) + + outerTx.mutate(() => { + users.insert({ id: 11, name: `Nested User` }) + innerTx.mutate(() => { + tasks.insert({ id: 21, userId: 11, title: `Nested Task` }) + }) + }) + + expect(recorder.batches).toHaveLength(1) + expect(recorder.batches[0]![0]).toMatchObject({ + value: { + userId: 11, + taskId: 21, + title: `Nested Task`, + }, + }) + + recorder.unsubscribe() + innerTx.rollback() + outerTx.rollback() + }) + + it(`clears pending jobs when a transaction rolls back due to an error`, async () => { + const { users, tasks, assignments } = setupLiveQueryCollections(`rollback`) + await assignments.preload() + + const recorder = recordBatches(assignments) + const tx = createTransaction({ + mutationFn: async () => {}, + autoCommit: false, + }) + + expect(() => { + tx.mutate(() => { + users.insert({ id: 31, name: `Temp` }) + tasks.insert({ id: 41, userId: 31, title: `Temp Task` }) + throw new Error(`boom`) + }) + }).toThrowError(`boom`) + + tx.rollback() + + const batchesBeforeFlush = recorder.batches.length + transactionScopedScheduler.flush(tx.id) + expect(recorder.batches.length).toBeGreaterThanOrEqual(batchesBeforeFlush) + if (recorder.batches.length > batchesBeforeFlush) { + const latestBatch = recorder.batches.at(-1)! + expect(latestBatch[0]?.type).toBe(`delete`) + } + expect(transactionScopedScheduler.hasPendingJobs(tx.id)).toBe(false) + // We emit the optimistic insert and, after the explicit rollback, possibly a + // compensating delete – but no duplicate inserts. + expect(recorder.batches[0]![0]).toMatchObject({ type: `insert` }) + + recorder.unsubscribe() + }) + + it(`dedupes batches across multiple subscribers`, async () => { + const { users, tasks, assignments } = + setupLiveQueryCollections(`multi-subscriber`) + await assignments.preload() + + const first = recordBatches(assignments) + const second = recordBatches(assignments) + + const tx = createTransaction({ + mutationFn: async () => {}, + autoCommit: false, + }) + tx.mutate(() => { + users.insert({ id: 51, name: `Multi` }) + tasks.insert({ id: 61, userId: 51, title: `Subscriber Task` }) + }) + + expect(first.batches).toHaveLength(1) + expect(second.batches).toHaveLength(1) + expect(first.batches[0]![0]).toMatchObject({ + value: { + userId: 51, + taskId: 61, + title: `Subscriber Task`, + }, + }) + + first.unsubscribe() + second.unsubscribe() + tx.rollback() + }) + + it(`runs join live queries once after their parent queries settle`, async () => { + const collectionA = createCollection<{ id: number; value: string }>({ + id: `diamond-A`, + getKey: (row) => row.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + }, + }, + }) + + const collectionB = createCollection<{ id: number; value: string }>({ + id: `diamond-B`, + getKey: (row) => row.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + }, + }, + }) + + const liveQueryA = createLiveQueryCollection({ + id: `diamond-lqA`, + startSync: true, + query: (q) => + q + .from({ a: collectionA }) + .select(({ a }) => ({ id: a.id, value: a.value })), + }) + + const liveQueryB = createLiveQueryCollection({ + id: `diamond-lqB`, + startSync: true, + query: (q) => + q + .from({ b: collectionB }) + .select(({ b }) => ({ id: b.id, value: b.value })), + }) + + const liveQueryJoin = createLiveQueryCollection({ + id: `diamond-join`, + startSync: true, + query: (q) => + q + .from({ left: liveQueryA }) + .join( + { right: liveQueryB }, + ({ left, right }) => eq(left.id, right.id), + `full` + ) + .select(({ left, right }) => ({ + left: left?.value, + right: right?.value, + })), + }) + + await Promise.all([ + liveQueryA.preload(), + liveQueryB.preload(), + liveQueryJoin.preload(), + ]) + const baseRunCount = liveQueryJoin.utils.getRunCount() + + const tx = createTransaction({ + mutationFn: async () => {}, + autoCommit: false, + }) + + tx.mutate(() => { + collectionA.insert({ id: 1, value: `A1` }) + collectionB.insert({ id: 1, value: `B1` }) + }) + + expect(liveQueryJoin.toArray).toEqual([{ left: `A1`, right: `B1` }]) + expect(liveQueryJoin.utils.getRunCount()).toBe(baseRunCount + 1) + + tx.mutate(() => { + collectionA.update(1, (draft) => { + draft.value = `A1b` + }) + collectionB.update(1, (draft) => { + draft.value = `B1b` + }) + }) + + expect(liveQueryJoin.toArray).toEqual([{ left: `A1b`, right: `B1b` }]) + expect(liveQueryJoin.utils.getRunCount()).toBe(baseRunCount + 2) + tx.rollback() + }) + + it(`runs hybrid joins once when they observe both a live query and a collection`, async () => { + const collectionA = createCollection<{ id: number; value: string }>({ + id: `hybrid-A`, + getKey: (row) => row.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + }, + }, + }) + + const collectionB = createCollection<{ id: number; value: string }>({ + id: `hybrid-B`, + getKey: (row) => row.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + }, + }, + }) + + const liveQueryA = createLiveQueryCollection({ + id: `hybrid-lqA`, + startSync: true, + query: (q) => + q + .from({ a: collectionA }) + .select(({ a }) => ({ id: a.id, value: a.value })), + }) + + const hybridJoin = createLiveQueryCollection({ + id: `hybrid-join`, + startSync: true, + query: (q) => + q + .from({ left: liveQueryA }) + .join( + { right: collectionB }, + ({ left, right }) => eq(left.id, right.id), + `full` + ) + .select(({ left, right }) => ({ + left: left?.value, + right: right?.value, + })), + }) + + await Promise.all([liveQueryA.preload(), hybridJoin.preload()]) + const baseRunCount = hybridJoin.utils.getRunCount() + + const tx = createTransaction({ + mutationFn: async () => {}, + autoCommit: false, + }) + + tx.mutate(() => { + collectionA.insert({ id: 7, value: `A7` }) + collectionB.insert({ id: 7, value: `B7` }) + }) + + expect(hybridJoin.toArray).toEqual([{ left: `A7`, right: `B7` }]) + expect(hybridJoin.utils.getRunCount()).toBe(baseRunCount + 1) + + tx.mutate(() => { + collectionA.update(7, (draft) => { + draft.value = `A7b` + }) + collectionB.update(7, (draft) => { + draft.value = `B7b` + }) + }) + + expect(hybridJoin.toArray).toEqual([{ left: `A7b`, right: `B7b` }]) + expect(hybridJoin.utils.getRunCount()).toBe(baseRunCount + 2) + tx.rollback() + }) + + it(`currently single batch when the join sees right-side data before the left`, async () => { + const collectionA = createCollection<{ id: number; value: string }>({ + id: `ordering-A`, + getKey: (row) => row.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + }, + }, + }) + + const collectionB = createCollection<{ id: number; value: string }>({ + id: `ordering-B`, + getKey: (row) => row.id, + startSync: true, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + }, + }, + }) + + const liveQueryA = createLiveQueryCollection({ + id: `ordering-lqA`, + startSync: true, + query: (q) => + q + .from({ a: collectionA }) + .select(({ a }) => ({ id: a.id, value: a.value })), + }) + + const join = createLiveQueryCollection({ + id: `ordering-join`, + startSync: true, + query: (q) => + q + .from({ left: liveQueryA }) + .join( + { right: collectionB }, + ({ left, right }) => eq(left.id, right.id), + `full` + ) + .select(({ left, right }) => ({ + left: left?.value, + right: right?.value, + })), + }) + + await Promise.all([liveQueryA.preload(), join.preload()]) + const baseRunCount = join.utils.getRunCount() + + const tx = createTransaction({ + mutationFn: async () => {}, + autoCommit: false, + }) + + tx.mutate(() => { + collectionB.insert({ id: 42, value: `right-first` }) + collectionA.insert({ id: 42, value: `left-later` }) + }) + + expect(join.toArray).toEqual([{ left: `left-later`, right: `right-first` }]) + expect(join.utils.getRunCount()).toBe(baseRunCount + 1) + tx.rollback() + }) + + it(`coalesces load-more callbacks scheduled within the same context`, () => { + const baseCollection = createCollection({ + id: `loader-users`, + getKey: (user) => user.id, + sync: { + sync: () => () => {}, + }, + }) + + const builder = new CollectionConfigBuilder({ + id: `loader-builder`, + query: (q) => q.from({ user: baseCollection }), + }) + + const contextId = Symbol(`loader-context`) + const loader = vi.fn(() => true) + const config = { + begin: vi.fn(), + write: vi.fn(), + commit: vi.fn(), + markReady: vi.fn(), + truncate: vi.fn(), + } as unknown as Parameters[`sync`]>[0] + + const syncState = { + messagesCount: 0, + subscribedToAllCollections: true, + unsubscribeCallbacks: new Set<() => void>(), + graph: { + pendingWork: () => false, + run: vi.fn(), + }, + inputs: {}, + pipeline: {}, + } as unknown as FullSyncState + + const maybeRunGraphSpy = vi + .spyOn(builder, `maybeRunGraph`) + .mockImplementation((_config, _syncState, combinedLoader) => { + combinedLoader?.() + }) + + builder.scheduleGraphRun(config, syncState, loader, { contextId }) + builder.scheduleGraphRun(config, syncState, loader, { contextId }) + + transactionScopedScheduler.flush(contextId) + + expect(loader).toHaveBeenCalledTimes(1) + expect(maybeRunGraphSpy).toHaveBeenCalledTimes(1) + + maybeRunGraphSpy.mockRestore() + }) +})