Skip to content

Ensure liveQueryCollections are not ready until all it's source collections are ready #390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/clever-ducks-strive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@tanstack/svelte-db": patch
"@tanstack/react-db": patch
"@tanstack/solid-db": patch
"@tanstack/vue-db": patch
"@tanstack/db": patch
---

Ensure that the ready status is correctly returned from a live query
6 changes: 6 additions & 0 deletions packages/db/src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,12 @@ export class CollectionImpl<
for (const listener of this.changeListeners) {
listener([])
}
// Emit to key-specific listeners
for (const [_key, keyListeners] of this.changeKeyListeners) {
for (const listener of keyListeners) {
listener([])
}
}
}

/**
Expand Down
12 changes: 10 additions & 2 deletions packages/db/src/query/live-query-collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ export function liveQueryCollectionOptions<
const collections = extractCollectionsFromQuery(query)

const allCollectionsReady = () => {
return Object.values(collections).every(
(collection) => collection.status === `ready`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collection's have a method that checks exactly this: collection.isReady(). Better to use that one.

)
}

const allCollectionsReadyOrInitialCommit = () => {
return Object.values(collections).every(
(collection) =>
collection.status === `ready` || collection.status === `initialCommit`
Expand Down Expand Up @@ -294,7 +300,7 @@ export function liveQueryCollectionOptions<

const maybeRunGraph = () => {
// We only run the graph if all the collections are ready
if (allCollectionsReady()) {
if (allCollectionsReadyOrInitialCommit()) {
graph.run()
// On the initial run, we may need to do an empty commit to ensure that
// the collection is initialized
Expand All @@ -303,7 +309,9 @@ export function liveQueryCollectionOptions<
commit()
}
// Mark the collection as ready after the first successful run
markReady()
if (allCollectionsReady()) {
markReady()
}
}
}

Expand Down
9 changes: 6 additions & 3 deletions packages/db/tests/query/indexes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ describe(`Query Index Optimization`, () => {
getKey: (item) => item.id,
startSync: true,
sync: {
sync: ({ begin, write, commit }) => {
sync: ({ begin, write, commit, markReady }) => {
// Provide initial data through sync
begin()
for (const item of testData) {
Expand All @@ -247,6 +247,7 @@ describe(`Query Index Optimization`, () => {
})
}
commit()
markReady()

// Listen for mutations and sync them back (only register once)
if (!emitter.all.has(`sync`)) {
Expand Down Expand Up @@ -534,7 +535,7 @@ describe(`Query Index Optimization`, () => {
getKey: (item) => item.id,
startSync: true,
sync: {
sync: ({ begin, write, commit }) => {
sync: ({ begin, write, commit, markReady }) => {
begin()
write({
type: `insert`,
Expand All @@ -547,6 +548,7 @@ describe(`Query Index Optimization`, () => {
},
})
commit()
markReady()
},
},
})
Expand Down Expand Up @@ -622,7 +624,7 @@ describe(`Query Index Optimization`, () => {
getKey: (item) => item.id,
startSync: true,
sync: {
sync: ({ begin, write, commit }) => {
sync: ({ begin, write, commit, markReady }) => {
begin()
write({
type: `insert`,
Expand All @@ -645,6 +647,7 @@ describe(`Query Index Optimization`, () => {
},
})
commit()
markReady()
},
},
})
Expand Down
19 changes: 19 additions & 0 deletions packages/db/tests/query/live-query-collection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,23 @@ describe(`createLiveQueryCollection`, () => {
expect(liveQuery.status).toBe(`ready`)
expect(liveQuery.size).toBe(0)
})

it(`shouldn't call markReady when source collection sync doesn't call markReady`, () => {
const collection = createCollection<{ id: string }>({
sync: {
sync({ begin, commit }) {
begin()
commit()
},
},
getKey: (item) => item.id,
startSync: true,
})

const liveQuery = createLiveQueryCollection({
query: (q) => q.from({ collection }),
startSync: true,
})
expect(liveQuery.isReady()).toBe(false)
})
})
27 changes: 15 additions & 12 deletions packages/react-db/tests/useLiveQuery.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -1077,15 +1077,17 @@ describe(`Query Collections`, () => {
it(`should update isLoading when collection status changes`, async () => {
let beginFn: (() => void) | undefined
let commitFn: (() => void) | undefined
let markReadyFn: (() => void) | undefined

const collection = createCollection<Person>({
id: `status-change-has-loaded-test`,
getKey: (person: Person) => person.id,
startSync: false,
sync: {
sync: ({ begin, commit }) => {
sync: ({ begin, commit, markReady }) => {
beginFn = begin
commitFn = commit
markReadyFn = markReady
// Don't sync immediately
},
},
Expand Down Expand Up @@ -1116,9 +1118,10 @@ describe(`Query Collections`, () => {

// Trigger the first commit to make collection ready
act(() => {
if (beginFn && commitFn) {
if (beginFn && commitFn && markReadyFn) {
beginFn()
commitFn()
markReadyFn()
}
})

Expand Down Expand Up @@ -1202,8 +1205,10 @@ describe(`Query Collections`, () => {
it(`should handle isLoading with complex queries including joins`, async () => {
let personBeginFn: (() => void) | undefined
let personCommitFn: (() => void) | undefined
let personMarkReadyFn: (() => void) | undefined
let issueBeginFn: (() => void) | undefined
let issueCommitFn: (() => void) | undefined
let issueMarkReadyFn: (() => void) | undefined

const personCollection = createCollection<Person>({
id: `join-has-loaded-persons`,
Expand All @@ -1212,10 +1217,8 @@ describe(`Query Collections`, () => {
sync: {
sync: ({ begin, commit, markReady }) => {
personBeginFn = begin
personCommitFn = () => {
commit()
markReady()
}
personCommitFn = commit
personMarkReadyFn = markReady
// Don't sync immediately
},
},
Expand All @@ -1231,10 +1234,8 @@ describe(`Query Collections`, () => {
sync: {
sync: ({ begin, commit, markReady }) => {
issueBeginFn = begin
issueCommitFn = () => {
commit()
markReady()
}
issueCommitFn = commit
issueMarkReadyFn = markReady
// Don't sync immediately
},
},
Expand Down Expand Up @@ -1269,13 +1270,15 @@ describe(`Query Collections`, () => {

// Trigger the first commit for both collections to make them ready
act(() => {
if (personBeginFn && personCommitFn) {
if (personBeginFn && personCommitFn && personMarkReadyFn) {
personBeginFn()
personCommitFn()
personMarkReadyFn()
}
if (issueBeginFn && issueCommitFn) {
if (issueBeginFn && issueCommitFn && issueMarkReadyFn) {
issueBeginFn()
issueCommitFn()
issueMarkReadyFn()
}
})

Expand Down
27 changes: 15 additions & 12 deletions packages/solid-db/tests/useLiveQuery.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -1053,15 +1053,17 @@ describe(`Query Collections`, () => {
it(`should update isLoading when collection status changes`, async () => {
let beginFn: (() => void) | undefined
let commitFn: (() => void) | undefined
let markReadyFn: (() => void) | undefined

const collection = createCollection<Person>({
id: `status-change-has-loaded-test`,
getKey: (person: Person) => person.id,
startSync: false,
sync: {
sync: ({ begin, commit }) => {
sync: ({ begin, commit, markReady }) => {
beginFn = begin
commitFn = commit
markReadyFn = markReady
// Don't sync immediately
},
},
Expand Down Expand Up @@ -1089,9 +1091,10 @@ describe(`Query Collections`, () => {
collection.preload()

// Trigger the first commit to make collection ready
if (beginFn && commitFn) {
if (beginFn && commitFn && markReadyFn) {
beginFn()
commitFn()
markReadyFn()
}

// Insert data
Expand Down Expand Up @@ -1172,8 +1175,10 @@ describe(`Query Collections`, () => {
it(`should handle isLoading with complex queries including joins`, async () => {
let personBeginFn: (() => void) | undefined
let personCommitFn: (() => void) | undefined
let personMarkReadyFn: (() => void) | undefined
let issueBeginFn: (() => void) | undefined
let issueCommitFn: (() => void) | undefined
let issueMarkReadyFn: (() => void) | undefined

const personCollection = createCollection<Person>({
id: `join-has-loaded-persons`,
Expand All @@ -1182,10 +1187,8 @@ describe(`Query Collections`, () => {
sync: {
sync: ({ begin, commit, markReady }) => {
personBeginFn = begin
personCommitFn = () => {
commit()
markReady()
}
personCommitFn = commit
personMarkReadyFn = markReady
// Don't sync immediately
},
},
Expand All @@ -1201,10 +1204,8 @@ describe(`Query Collections`, () => {
sync: {
sync: ({ begin, commit, markReady }) => {
issueBeginFn = begin
issueCommitFn = () => {
commit()
markReady()
}
issueCommitFn = commit
issueMarkReadyFn = markReady
// Don't sync immediately
},
},
Expand Down Expand Up @@ -1236,13 +1237,15 @@ describe(`Query Collections`, () => {
issueCollection.preload()

// Trigger the first commit for both collections to make them ready
if (personBeginFn && personCommitFn) {
if (personBeginFn && personCommitFn && personMarkReadyFn) {
personBeginFn()
personCommitFn()
personMarkReadyFn()
}
if (issueBeginFn && issueCommitFn) {
if (issueBeginFn && issueCommitFn && issueMarkReadyFn) {
issueBeginFn()
issueCommitFn()
issueMarkReadyFn()
}

// Insert data into both collections
Expand Down
16 changes: 14 additions & 2 deletions packages/svelte-db/src/useLiveQuery.svelte.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { untrack } from "svelte"
import { flushSync, untrack } from "svelte"
import { createLiveQueryCollection } from "@tanstack/db"
import { SvelteMap } from "svelte/reactivity"
import type {
Expand Down Expand Up @@ -246,7 +246,10 @@ export function useLiveQuery(

if (isCollection) {
// It's already a collection, ensure sync is started for Svelte helpers
unwrappedParam.startSyncImmediate()
// Only start sync if the collection is in idle state
if (unwrappedParam.status === `idle`) {
unwrappedParam.startSyncImmediate()
}
return unwrappedParam
}

Expand Down Expand Up @@ -312,6 +315,15 @@ export function useLiveQuery(
// Initialize data array in correct order
syncDataFromCollection(currentCollection)

// Listen for the first ready event to catch status transitions
// that might not trigger change events (fixes async status transition bug)
currentCollection.onFirstReady(() => {
// Use flushSync to ensure Svelte reactivity updates properly
flushSync(() => {
status = currentCollection.status
})
})

// Subscribe to collection changes with granular updates
currentUnsubscribe = currentCollection.subscribeChanges(
(changes: Array<ChangeMessage<any>>) => {
Expand Down
Loading