Skip to content

Commit 2a796ff

Browse files
committed
Fix root cause of Electric infinite loop with ORDER BY/LIMIT queries
The infinite loop occurred because `loadMoreIfNeeded` kept trying to load data even when the local index was exhausted. This happened when: 1. TopK had fewer items than limit (WHERE filtered out data) 2. loadMoreIfNeeded tried to load more → no local data found 3. Loop continued indefinitely since TopK still needed data Root cause fix: - Add `localIndexExhausted` flag to CollectionSubscriber - Track when local index has no more data for current cursor - Stop calling loadMoreIfNeeded when exhausted - Reset flag when new data arrives from sync layer (inserts) - requestLimitedSnapshot now returns boolean indicating if data was found Error handling improvements (per review feedback): - D2.run() now throws Error when iteration limit exceeded - Caller catches and calls transitionToError() for proper error state - requestLimitedSnapshot returns false when iteration limit hit - Live query properly shows error state if safeguard limits are hit This fixes the issue for both eager and progressive syncMode.
1 parent d8f95da commit 2a796ff

File tree

4 files changed

+76
-24
lines changed

4 files changed

+76
-24
lines changed

packages/db-ivm/src/d2.ts

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,11 @@ export class D2 implements ID2 {
6464
let iterations = 0
6565

6666
while (this.pendingWork()) {
67-
iterations++
68-
if (iterations > MAX_RUN_ITERATIONS) {
69-
console.error(
70-
`[D2 Graph] Execution exceeded ${MAX_RUN_ITERATIONS} iterations. ` +
71-
`This may indicate an infinite loop in the dataflow graph. ` +
72-
`Breaking out to prevent app freeze.`,
67+
if (++iterations > MAX_RUN_ITERATIONS) {
68+
throw new Error(
69+
`D2 graph execution exceeded ${MAX_RUN_ITERATIONS} iterations. ` +
70+
`This may indicate an infinite loop in the dataflow graph.`,
7371
)
74-
break
7572
}
7673
this.step()
7774
}

packages/db/src/collection/subscription.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -410,13 +410,15 @@ export class CollectionSubscription
410410
* Note 1: it may load more rows than the provided LIMIT because it loads all values equal to the first cursor value + limit values greater.
411411
* This is needed to ensure that it does not accidentally skip duplicate values when the limit falls in the middle of some duplicated values.
412412
* Note 2: it does not send keys that have already been sent before.
413+
*
414+
* @returns true if local data was found and sent, false if the local index was exhausted
413415
*/
414416
requestLimitedSnapshot({
415417
orderBy,
416418
limit,
417419
minValues,
418420
offset,
419-
}: RequestLimitedSnapshotOptions) {
421+
}: RequestLimitedSnapshotOptions): boolean {
420422
if (!limit) throw new Error(`limit is required`)
421423

422424
if (!this.orderByIndex) {
@@ -508,15 +510,16 @@ export class CollectionSubscription
508510
// for any legitimate use case.
509511
const MAX_SNAPSHOT_ITERATIONS = 10000
510512
let snapshotIterations = 0
513+
let hitIterationLimit = false
511514

512515
while (valuesNeeded() > 0 && !collectionExhausted()) {
513-
snapshotIterations++
514-
if (snapshotIterations > MAX_SNAPSHOT_ITERATIONS) {
516+
if (++snapshotIterations > MAX_SNAPSHOT_ITERATIONS) {
515517
console.error(
516518
`[TanStack DB] requestLimitedSnapshot exceeded ${MAX_SNAPSHOT_ITERATIONS} iterations. ` +
517519
`This may indicate an infinite loop in index iteration or filtering. ` +
518520
`Breaking out to prevent app freeze. Collection: ${this.collection.id}`,
519521
)
522+
hitIterationLimit = true
520523
break
521524
}
522525

@@ -611,6 +614,14 @@ export class CollectionSubscription
611614
// Track this loadSubset call
612615
this.loadedSubsets.push(loadOptions)
613616
this.trackLoadSubsetPromise(syncResult)
617+
618+
// Return whether local data was found and iteration completed normally.
619+
// Return false if:
620+
// - No local data was found (index exhausted)
621+
// - Iteration limit was hit (abnormal exit)
622+
// Either case signals that the caller should stop trying to load more.
623+
// The async loadSubset may still return data later.
624+
return changes.length > 0 && !hitIterationLimit
614625
}
615626

616627
// TODO: also add similar test but that checks that it can also load it from the collection's loadSubset function

packages/db/src/query/live/collection-config-builder.ts

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -346,18 +346,25 @@ export class CollectionConfigBuilder<
346346
let iterations = 0
347347

348348
while (syncState.graph.pendingWork()) {
349-
iterations++
350-
if (iterations > MAX_GRAPH_ITERATIONS) {
351-
console.error(
352-
`[TanStack DB] Graph execution exceeded ${MAX_GRAPH_ITERATIONS} iterations. ` +
353-
`This may indicate an infinite loop caused by data loading triggering ` +
354-
`continuous graph updates. Breaking out of the loop to prevent app freeze. ` +
355-
`Query ID: ${this.id}`,
349+
if (++iterations > MAX_GRAPH_ITERATIONS) {
350+
this.transitionToError(
351+
`Graph execution exceeded ${MAX_GRAPH_ITERATIONS} iterations. ` +
352+
`This likely indicates an infinite loop caused by data loading ` +
353+
`triggering continuous graph updates.`,
356354
)
357-
break
355+
return
358356
}
359357

360-
syncState.graph.run()
358+
try {
359+
syncState.graph.run()
360+
} catch (error) {
361+
// D2 graph throws when it exceeds its internal iteration limit
362+
// Transition to error state so callers can detect incomplete data
363+
this.transitionToError(
364+
error instanceof Error ? error.message : String(error),
365+
)
366+
return
367+
}
361368
// Flush accumulated changes after each graph step to commit them as one transaction.
362369
// This ensures intermediate join states (like null on one side) don't cause
363370
// duplicate key errors when the full join result arrives in the same step.

packages/db/src/query/live/collection-subscriber.ts

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ export class CollectionSubscriber<
3737
// can potentially send the same item to D2 multiple times.
3838
private sentToD2Keys = new Set<string | number>()
3939

40+
// Track when the local index has been exhausted for the current cursor position.
41+
// When true, loadMoreIfNeeded will not try to load more data until new data arrives.
42+
// This prevents infinite loops when the TopK can't be filled because WHERE filters
43+
// out all available data.
44+
private localIndexExhausted = false
45+
4046
constructor(
4147
private alias: string,
4248
private collectionId: string,
@@ -301,11 +307,25 @@ export class CollectionSubscriber<
301307
return true
302308
}
303309

310+
// If we've already exhausted the local index, don't try to load more.
311+
// This prevents infinite loops when the TopK can't be filled because
312+
// the WHERE clause filters out all available local data.
313+
// The flag is reset when new data arrives from the sync layer.
314+
if (this.localIndexExhausted) {
315+
return true
316+
}
317+
304318
// `dataNeeded` probes the orderBy operator to see if it needs more data
305319
// if it needs more data, it returns the number of items it needs
306320
const n = dataNeeded()
307321
if (n > 0) {
308-
this.loadNextItems(n, subscription)
322+
const foundLocalData = this.loadNextItems(n, subscription)
323+
if (!foundLocalData) {
324+
// No local data found - mark the index as exhausted so we don't
325+
// keep trying in subsequent graph iterations. The sync layer's
326+
// loadSubset has been called and may return data asynchronously.
327+
this.localIndexExhausted = true
328+
}
309329
}
310330
return true
311331
}
@@ -320,7 +340,19 @@ export class CollectionSubscriber<
320340
return
321341
}
322342

323-
const trackedChanges = this.trackSentValues(changes, orderByInfo.comparator)
343+
// Reset localIndexExhausted when new data arrives from the sync layer.
344+
// This allows loadMoreIfNeeded to try loading again since there's new data.
345+
// We only reset on inserts since updates/deletes don't add new data to load.
346+
const changesArray = Array.isArray(changes) ? changes : [...changes]
347+
const hasInserts = changesArray.some((c) => c.type === `insert`)
348+
if (hasInserts) {
349+
this.localIndexExhausted = false
350+
}
351+
352+
const trackedChanges = this.trackSentValues(
353+
changesArray,
354+
orderByInfo.comparator,
355+
)
324356

325357
// Cache the loadMoreIfNeeded callback on the subscription using a symbol property.
326358
// This ensures we pass the same function instance to the scheduler each time,
@@ -342,10 +374,14 @@ export class CollectionSubscriber<
342374

343375
// Loads the next `n` items from the collection
344376
// starting from the biggest item it has sent
345-
private loadNextItems(n: number, subscription: CollectionSubscription) {
377+
// Returns true if local data was found, false if the local index is exhausted
378+
private loadNextItems(
379+
n: number,
380+
subscription: CollectionSubscription,
381+
): boolean {
346382
const orderByInfo = this.getOrderByInfo()
347383
if (!orderByInfo) {
348-
return
384+
return false
349385
}
350386
const { orderBy, valueExtractorForRawRow, offset } = orderByInfo
351387
const biggestSentRow = this.biggest
@@ -369,7 +405,8 @@ export class CollectionSubscriber<
369405

370406
// Take the `n` items after the biggest sent value
371407
// Pass the current window offset to ensure proper deduplication
372-
subscription.requestLimitedSnapshot({
408+
// Returns true if local data was found
409+
return subscription.requestLimitedSnapshot({
373410
orderBy: normalizedOrderBy,
374411
limit: n,
375412
minValues,

0 commit comments

Comments
 (0)