-
Notifications
You must be signed in to change notification settings - Fork 99
Add a scheduler that ensures single batch of changes from live query due to a transaction that touches multiple source collections #628
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: samwillis/fix-self-join
Are you sure you want to change the base?
Changes from all commits
4fb4831
0f359f7
ddcf5fa
22b8b01
6a2308b
6c78222
3422a43
983cb4e
4d5ff08
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
"@tanstack/db": patch | ||
--- | ||
|
||
Add a scheduler that ensures that if a transaction touches multiple collections that feed into a single live query, the live query only emits a single batch of updates. This fixes an issue where multiple renders could be triggered from a live query in this situation. | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,10 @@ | ||
import { createCollection } from "../collection/index.js" | ||
import { CollectionConfigBuilder } from "./live/collection-config-builder.js" | ||
import { | ||
getBuilderFromConfig, | ||
registerCollectionBuilder, | ||
} from "./live/collection-registry.js" | ||
import type { RunCountUtils } from "./live/collection-config-builder.js" | ||
import type { LiveQueryCollectionConfig } from "./live/types.js" | ||
import type { InitialQueryBuilder, QueryBuilder } from "./builder/index.js" | ||
import type { Collection } from "../collection/index.js" | ||
|
@@ -35,7 +40,7 @@ export function liveQueryCollectionOptions< | |
TResult extends object = GetResult<TContext>, | ||
>( | ||
config: LiveQueryCollectionConfig<TContext, TResult> | ||
): CollectionConfig<TResult> { | ||
): CollectionConfig<TResult> & { utils: RunCountUtils } { | ||
const collectionConfigBuilder = new CollectionConfigBuilder< | ||
TContext, | ||
TResult | ||
|
@@ -83,7 +88,7 @@ export function createLiveQueryCollection< | |
TResult extends object = GetResult<TContext>, | ||
>( | ||
query: (q: InitialQueryBuilder) => QueryBuilder<TContext> | ||
): Collection<TResult, string | number, {}> | ||
): Collection<TResult, string | number, RunCountUtils> | ||
|
||
// Overload 2: Accept full config object with optional utilities | ||
export function createLiveQueryCollection< | ||
|
@@ -92,7 +97,7 @@ export function createLiveQueryCollection< | |
TUtils extends UtilsRecord = {}, | ||
>( | ||
config: LiveQueryCollectionConfig<TContext, TResult> & { utils?: TUtils } | ||
): Collection<TResult, string | number, TUtils> | ||
): Collection<TResult, string | number, RunCountUtils & TUtils> | ||
|
||
// Implementation | ||
export function createLiveQueryCollection< | ||
|
@@ -103,7 +108,7 @@ export function createLiveQueryCollection< | |
configOrQuery: | ||
| (LiveQueryCollectionConfig<TContext, TResult> & { utils?: TUtils }) | ||
| ((q: InitialQueryBuilder) => QueryBuilder<TContext>) | ||
): Collection<TResult, string | number, TUtils> { | ||
): Collection<TResult, string | number, RunCountUtils & TUtils> { | ||
// Determine if the argument is a function (query) or a config object | ||
if (typeof configOrQuery === `function`) { | ||
// Simple query function case | ||
|
@@ -113,35 +118,50 @@ export function createLiveQueryCollection< | |
) => QueryBuilder<TContext>, | ||
} | ||
const options = liveQueryCollectionOptions<TContext, TResult>(config) | ||
return bridgeToCreateCollection(options) | ||
return bridgeToCreateCollection(options) as Collection< | ||
TResult, | ||
string | number, | ||
RunCountUtils & TUtils | ||
> | ||
} else { | ||
// Config object case | ||
const config = configOrQuery as LiveQueryCollectionConfig< | ||
TContext, | ||
TResult | ||
> & { utils?: TUtils } | ||
const options = liveQueryCollectionOptions<TContext, TResult>(config) | ||
return bridgeToCreateCollection({ | ||
...options, | ||
utils: config.utils, | ||
}) | ||
|
||
const collection = bridgeToCreateCollection(options) | ||
|
||
if (config.utils) { | ||
Object.assign(collection.utils, config.utils) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is weird. Why do we need to mutate it after the fact instead of passing it as an option when calling |
||
} | ||
|
||
return collection as Collection< | ||
TResult, | ||
string | number, | ||
RunCountUtils & TUtils | ||
> | ||
} | ||
} | ||
|
||
/** | ||
* Bridge function that handles the type compatibility between query2's TResult | ||
* and core collection's output type without exposing ugly type assertions to users | ||
*/ | ||
function bridgeToCreateCollection< | ||
TResult extends object, | ||
TUtils extends UtilsRecord = {}, | ||
>( | ||
options: CollectionConfig<TResult> & { utils?: TUtils } | ||
): Collection<TResult, string | number, TUtils> { | ||
// This is the only place we need a type assertion, hidden from user API | ||
return createCollection(options as any) as unknown as Collection< | ||
function bridgeToCreateCollection<TResult extends object>( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why removing the |
||
options: CollectionConfig<TResult> & { utils: RunCountUtils } | ||
): Collection<TResult, string | number, RunCountUtils> { | ||
const collection = createCollection(options as any) as unknown as Collection< | ||
TResult, | ||
string | number, | ||
TUtils | ||
RunCountUtils | ||
> | ||
|
||
const builder = getBuilderFromConfig(options) | ||
if (builder) { | ||
registerCollectionBuilder(collection, builder) | ||
} | ||
|
||
return collection | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,14 @@ | ||
import { D2, output } from "@tanstack/db-ivm" | ||
import { compileQuery } from "../compiler/index.js" | ||
import { buildQuery, getQueryIR } from "../builder/index.js" | ||
import { transactionScopedScheduler } from "../../scheduler.js" | ||
import { getActiveTransaction } from "../../transactions.js" | ||
import { CollectionSubscriber } from "./collection-subscriber.js" | ||
import { | ||
attachBuilderToConfig, | ||
getCollectionBuilder, | ||
} from "./collection-registry.js" | ||
import type { SchedulerContextId } from "../../scheduler.js" | ||
import type { CollectionSubscription } from "../../collection/subscription.js" | ||
import type { RootStreamBuilder } from "@tanstack/db-ivm" | ||
import type { OrderByOptimizationInfo } from "../compiler/order-by.js" | ||
|
@@ -11,6 +18,7 @@ import type { | |
KeyedStream, | ||
ResultStream, | ||
SyncConfig, | ||
UtilsRecord, | ||
} from "../../types.js" | ||
import type { Context, GetResult } from "../builder/types.js" | ||
import type { BasicExpression, QueryIR } from "../ir.js" | ||
|
@@ -22,6 +30,16 @@ import type { | |
SyncState, | ||
} from "./types.js" | ||
|
||
export type RunCountUtils = UtilsRecord & { | ||
getRunCount: () => number | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have added a |
||
} | ||
|
||
type PendingGraphRun<TResult extends object> = { | ||
config: Parameters<SyncConfig<TResult>[`sync`]>[0] | ||
syncState: FullSyncState | ||
loadCallbacks: Set<() => boolean> | ||
} | ||
|
||
// Global counter for auto-generated collection IDs | ||
let liveQueryCollectionCounter = 0 | ||
|
||
|
@@ -46,6 +64,7 @@ export class CollectionConfigBuilder< | |
private readonly compare?: (val1: TResult, val2: TResult) => number | ||
|
||
private isGraphRunning = false | ||
private runCount = 0 | ||
|
||
private graphCache: D2 | undefined | ||
private inputsCache: Record<string, RootStreamBuilder<unknown>> | undefined | ||
|
@@ -54,6 +73,15 @@ export class CollectionConfigBuilder< | |
| Map<string, BasicExpression<boolean>> | ||
| undefined | ||
|
||
private readonly aliasDependencies: Record< | ||
string, | ||
Array<CollectionConfigBuilder<any, any>> | ||
> = Object.create(null) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need |
||
|
||
private readonly builderDependencies = new Set< | ||
CollectionConfigBuilder<any, any> | ||
>() | ||
|
||
// Map of source alias to subscription | ||
readonly subscriptions: Record<string, CollectionSubscription> = {} | ||
// Map of source aliases to functions that load keys for that lazy source | ||
|
@@ -92,8 +120,8 @@ export class CollectionConfigBuilder< | |
this.compileBasePipeline() | ||
} | ||
|
||
getConfig(): CollectionConfig<TResult> { | ||
return { | ||
getConfig(): CollectionConfig<TResult> & { utils: RunCountUtils } { | ||
const config = { | ||
id: this.id, | ||
getKey: | ||
this.config.getKey || | ||
|
@@ -106,7 +134,12 @@ export class CollectionConfigBuilder< | |
onUpdate: this.config.onUpdate, | ||
onDelete: this.config.onDelete, | ||
startSync: this.config.startSync, | ||
} | ||
utils: { | ||
getRunCount: this.getRunCount.bind(this), | ||
}, | ||
} as CollectionConfig<TResult> & { utils: RunCountUtils } | ||
attachBuilderToConfig(config, this) | ||
return config | ||
} | ||
|
||
getCollectionIdForAlias(alias: string): string { | ||
|
@@ -176,13 +209,149 @@ export class CollectionConfigBuilder< | |
} | ||
} | ||
|
||
/** | ||
* Schedules a graph run with the transaction-scoped scheduler. | ||
* Ensures each builder runs at most once per transaction, with automatic dependency tracking | ||
* to run parent queries before child queries. Outside a transaction, runs immediately. | ||
* | ||
* Multiple calls during a transaction are coalesced into a single execution. | ||
* Dependencies are auto-discovered from subscribed live queries, or can be overridden. | ||
* Load callbacks are combined when entries merge. | ||
* | ||
* @param config - Collection sync configuration with begin/commit/markReady callbacks | ||
* @param syncState - The full sync state containing the D2 graph, inputs, and pipeline | ||
* @param callback - Optional callback to load more data if needed (returns true when done) | ||
* @param options - Optional scheduling configuration | ||
* @param options.contextId - Transaction ID to group work; defaults to active transaction | ||
* @param options.jobId - Unique identifier for this job; defaults to this builder instance | ||
* @param options.alias - Source alias that triggered this schedule; adds alias-specific dependencies | ||
* @param options.dependencies - Explicit dependency list; overrides auto-discovered dependencies | ||
*/ | ||
scheduleGraphRun( | ||
config: Parameters<SyncConfig<TResult>[`sync`]>[0], | ||
syncState: FullSyncState, | ||
callback?: () => boolean, | ||
options?: { | ||
contextId?: SchedulerContextId | ||
jobId?: unknown | ||
alias?: string | ||
dependencies?: Array<CollectionConfigBuilder<any, any>> | ||
} | ||
) { | ||
const contextId = options?.contextId ?? getActiveTransaction()?.id | ||
// Use the builder instance as the job ID for deduplication. This is memory-safe | ||
// because the scheduler's context Map is deleted after flushing (no long-term retention). | ||
const jobId = options?.jobId ?? this | ||
const dependencyBuilders = (() => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
if (options?.dependencies) { | ||
return options.dependencies | ||
} | ||
|
||
const deps = new Set(this.builderDependencies) | ||
if (options?.alias) { | ||
const aliasDeps = this.aliasDependencies[options.alias] | ||
if (aliasDeps) { | ||
for (const dep of aliasDeps) { | ||
deps.add(dep) | ||
} | ||
} | ||
} | ||
|
||
deps.delete(this) | ||
|
||
return Array.from(deps) | ||
})() | ||
// We intentionally scope deduplication to the builder instance. Each instance | ||
// owns caches and compiled pipelines, so sharing an entry across instances that | ||
// merely reuse the same string id would bind the wrong `this` in the run closure. | ||
|
||
const createEntry = () => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does this need to be a function? It doesn't take any arguments and it's only called once by the scheduler. We could as well just construct the object that is returned by this function and pass that object to const state: PendingGraphRun<TResult> = {
config,
syncState,
loadCallbacks: callback ? new Set([callback]) : new Set(),
}
const entry = {
state,
run: () => {
...
}
}
transactionScopedScheduler.schedule({
contextId,
jobId,
dependencies: dependencyBuilders,
entry,
updateEntry,
}) |
||
const state: PendingGraphRun<TResult> = { | ||
config, | ||
syncState, | ||
loadCallbacks: new Set(), | ||
} | ||
|
||
if (callback) { | ||
state.loadCallbacks.add(callback) | ||
} | ||
|
||
return { | ||
state, | ||
run: () => { | ||
this.incrementRunCount() | ||
const combinedLoader = | ||
state.loadCallbacks.size > 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why doing a ternary if test here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would refactor it this way: const combinedLoader = () => {
let allDone = true
let firstError: unknown
state.loadCallbacks.forEach((loader) => {
try {
allDone = loader() && allDone
} catch (error) {
allDone = false
firstError ??= error
}
})
if (firstError) {
throw firstError
}
// Returning false signals that callers should schedule another pass.
return allDone
} |
||
? () => { | ||
let allDone = true | ||
let firstError: unknown | ||
|
||
for (const loader of state.loadCallbacks) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can simplify this for (const loader of state.loadCallbacks) {
try {
allDone = loader() && allDone
} catch (error) {
allDone = false
firstError ??= error
}
} |
||
try { | ||
const result = loader() | ||
if (result === false) { | ||
allDone = false | ||
} | ||
} catch (error) { | ||
allDone = false | ||
if (firstError === undefined) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can simplify to |
||
firstError = error | ||
} | ||
} | ||
} | ||
|
||
if (firstError !== undefined) { | ||
throw firstError | ||
} | ||
|
||
// Returning false signals that callers should schedule another pass. | ||
return allDone | ||
} | ||
: undefined | ||
|
||
try { | ||
this.maybeRunGraph(state.config, state.syncState, combinedLoader) | ||
} finally { | ||
// Clear callbacks after run to avoid carrying stale closures across transactions | ||
state.loadCallbacks.clear() | ||
} | ||
}, | ||
} | ||
} | ||
|
||
const updateEntry = (entry: { state: PendingGraphRun<TResult> }) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't like that we are creating a function that closes over the // Define this in a utility file or in the Scheduler file
function updateEntry(entry, config, syncState, callback) {
entry.state.config = config
entry.state.syncState = syncState
if (callback) {
entry.state.loadCallbacks.add(callback)
}
}
// Then here in `scheduleGraphRun`
transactionScopedScheduler.schedule({
contextId,
jobId,
dependencies: dependencyBuilders,
entry, // see my other comment
// now pass the state explicitly instead of passing a function that closes over it
config,
syncState,
callback
}) |
||
entry.state.config = config | ||
entry.state.syncState = syncState | ||
|
||
if (callback) { | ||
entry.state.loadCallbacks.add(callback) | ||
} | ||
} | ||
|
||
transactionScopedScheduler.schedule({ | ||
contextId, | ||
jobId, | ||
dependencies: dependencyBuilders, | ||
createEntry, | ||
updateEntry, | ||
}) | ||
} | ||
|
||
private getSyncConfig(): SyncConfig<TResult> { | ||
return { | ||
rowUpdateMode: `full`, | ||
sync: this.syncFn.bind(this), | ||
} | ||
} | ||
|
||
incrementRunCount() { | ||
this.runCount++ | ||
} | ||
|
||
getRunCount() { | ||
return this.runCount | ||
} | ||
|
||
private syncFn(config: Parameters<SyncConfig<TResult>[`sync`]>[0]) { | ||
const syncState: SyncState = { | ||
messagesCount: 0, | ||
|
@@ -202,7 +371,7 @@ export class CollectionConfigBuilder< | |
) | ||
|
||
// Initial run with callback to load more data if needed | ||
this.maybeRunGraph(config, fullSyncState, loadMoreDataCallbacks) | ||
this.scheduleGraphRun(config, fullSyncState, loadMoreDataCallbacks) | ||
|
||
// Return the unsubscribe function | ||
return () => { | ||
|
@@ -412,6 +581,14 @@ export class CollectionConfigBuilder< | |
const collection = | ||
this.collectionByAlias[alias] ?? this.collections[collectionId]! | ||
|
||
const dependencyBuilder = getCollectionBuilder(collection) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
if (dependencyBuilder && dependencyBuilder !== this) { | ||
this.aliasDependencies[alias] = [dependencyBuilder] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it's an array, should we check if there is already an array for |
||
this.builderDependencies.add(dependencyBuilder) | ||
} else { | ||
this.aliasDependencies[alias] = [] | ||
} | ||
|
||
const collectionSubscriber = new CollectionSubscriber( | ||
alias, | ||
collectionId, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: the liver query