Skip to content

Commit 7bb75b5

Browse files
kevin-dpsamwillis
authored andcommitted
Add onLoadMore callback and call it from the requestSnapshot methods
1 parent 59b406e commit 7bb75b5

File tree

5 files changed

+181
-105
lines changed

5 files changed

+181
-105
lines changed

packages/db/src/collection/subscription.ts

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import {
44
createFilterFunctionFromExpression,
55
createFilteredCallback,
66
} from "./change-events.js"
7-
import type { BasicExpression } from "../query/ir.js"
7+
import { ensureIndexForExpression } from "./indexes/auto-index.js"
8+
import { and } from "./query/index.js"
9+
import type { BasicExpression, OrderBy } from "../query/ir.js"
810
import type { BaseIndex } from "../indexes/base-index.js"
911
import type { ChangeMessage } from "../types.js"
1012
import type { CollectionImpl } from "./index.js"
@@ -15,8 +17,9 @@ type RequestSnapshotOptions = {
1517
}
1618

1719
type RequestLimitedSnapshotOptions = {
18-
minValue?: any
20+
orderBy: OrderBy
1921
limit: number
22+
minValue?: any
2023
}
2124

2225
type CollectionSubscriptionOptions = {
@@ -117,6 +120,13 @@ export class CollectionSubscription {
117120
this.loadedInitialState = true
118121
}
119122

123+
// Request the sync layer to load more data
124+
// don't await it, we will load the data into the collection when it comes in
125+
this.collection.syncMore({
126+
where: stateOpts.where,
127+
})
128+
129+
// Also load data immediately from the collection
120130
const snapshot = this.collection.currentStateAsChanges(stateOpts)
121131

122132
if (snapshot === undefined) {
@@ -140,7 +150,11 @@ export class CollectionSubscription {
140150
* It uses that range index to load the items in the order of the index.
141151
* Note: it does not send keys that have already been sent before.
142152
*/
143-
requestLimitedSnapshot({ limit, minValue }: RequestLimitedSnapshotOptions) {
153+
requestLimitedSnapshot({
154+
orderBy,
155+
limit,
156+
minValue,
157+
}: RequestLimitedSnapshotOptions) {
144158
if (!limit) throw new Error(`limit is required`)
145159

146160
if (!this.orderByIndex) {
@@ -190,6 +204,14 @@ export class CollectionSubscription {
190204
}
191205

192206
this.callback(changes)
207+
208+
// Request the sync layer to load more data
209+
// don't await it, we will load the data into the collection when it comes in
210+
this.collection.syncMore({
211+
where,
212+
limit,
213+
orderBy,
214+
})
193215
}
194216

195217
/**

packages/db/src/collection/sync.ts

Lines changed: 136 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,13 @@ import {
88
SyncTransactionAlreadyCommittedWriteError,
99
} from "../errors"
1010
import type { StandardSchemaV1 } from "@standard-schema/spec"
11-
import type { ChangeMessage, CollectionConfig } from "../types"
11+
import type {
12+
ChangeMessage,
13+
CollectionConfig,
14+
CleanupFn,
15+
OnLoadMoreOptions,
16+
SyncConfigRes,
17+
} from "../types"
1218
import type { CollectionImpl } from "./index.js"
1319
import type { CollectionStateManager } from "./state"
1420
import type { CollectionLifecycleManager } from "./lifecycle"
@@ -27,6 +33,9 @@ export class CollectionSyncManager<
2733

2834
public preloadPromise: Promise<void> | null = null
2935
public syncCleanupFn: (() => void) | null = null
36+
public syncOnLoadMoreFn:
37+
| ((options: OnLoadMoreOptions) => void | Promise<void>)
38+
| null = null
3039

3140
/**
3241
* Creates a new CollectionSyncManager instance
@@ -62,106 +71,111 @@ export class CollectionSyncManager<
6271
this.lifecycle.setStatus(`loading`)
6372

6473
try {
65-
const cleanupFn = this.config.sync.sync({
66-
collection: this.collection,
67-
begin: () => {
68-
state.pendingSyncedTransactions.push({
69-
committed: false,
70-
operations: [],
71-
deletedKeys: new Set(),
72-
})
73-
},
74-
write: (messageWithoutKey: Omit<ChangeMessage<TOutput>, `key`>) => {
75-
const pendingTransaction =
76-
state.pendingSyncedTransactions[
77-
state.pendingSyncedTransactions.length - 1
78-
]
79-
if (!pendingTransaction) {
80-
throw new NoPendingSyncTransactionWriteError()
81-
}
82-
if (pendingTransaction.committed) {
83-
throw new SyncTransactionAlreadyCommittedWriteError()
84-
}
85-
const key = this.config.getKey(messageWithoutKey.value)
86-
87-
// Check if an item with this key already exists when inserting
88-
if (messageWithoutKey.type === `insert`) {
89-
const insertingIntoExistingSynced = state.syncedData.has(key)
90-
const hasPendingDeleteForKey =
91-
pendingTransaction.deletedKeys.has(key)
92-
const isTruncateTransaction = pendingTransaction.truncate === true
93-
// Allow insert after truncate in the same transaction even if it existed in syncedData
94-
if (
95-
insertingIntoExistingSynced &&
96-
!hasPendingDeleteForKey &&
97-
!isTruncateTransaction
98-
) {
99-
throw new DuplicateKeySyncError(key, this.id)
74+
const syncRes = normalizeSyncFnResult(
75+
this.config.sync.sync({
76+
collection: this,
77+
begin: () => {
78+
this.pendingSyncedTransactions.push({
79+
committed: false,
80+
operations: [],
81+
deletedKeys: new Set(),
82+
})
83+
},
84+
write: (messageWithoutKey: Omit<ChangeMessage<TOutput>, `key`>) => {
85+
const pendingTransaction =
86+
this.pendingSyncedTransactions[
87+
this.pendingSyncedTransactions.length - 1
88+
]
89+
if (!pendingTransaction) {
90+
throw new NoPendingSyncTransactionWriteError()
10091
}
101-
}
102-
103-
const message: ChangeMessage<TOutput> = {
104-
...messageWithoutKey,
105-
key,
106-
}
107-
pendingTransaction.operations.push(message)
108-
109-
if (messageWithoutKey.type === `delete`) {
110-
pendingTransaction.deletedKeys.add(key)
111-
}
112-
},
113-
commit: () => {
114-
const pendingTransaction =
115-
state.pendingSyncedTransactions[
116-
state.pendingSyncedTransactions.length - 1
117-
]
118-
if (!pendingTransaction) {
119-
throw new NoPendingSyncTransactionCommitError()
120-
}
121-
if (pendingTransaction.committed) {
122-
throw new SyncTransactionAlreadyCommittedError()
123-
}
124-
125-
pendingTransaction.committed = true
126-
127-
// Update status to initialCommit when transitioning from loading
128-
// This indicates we're in the process of committing the first transaction
129-
if (this.lifecycle.status === `loading`) {
130-
this.lifecycle.setStatus(`initialCommit`)
131-
}
132-
133-
state.commitPendingTransactions()
134-
},
135-
markReady: () => {
136-
this.lifecycle.markReady()
137-
},
138-
truncate: () => {
139-
const pendingTransaction =
140-
state.pendingSyncedTransactions[
141-
state.pendingSyncedTransactions.length - 1
142-
]
143-
if (!pendingTransaction) {
144-
throw new NoPendingSyncTransactionWriteError()
145-
}
146-
if (pendingTransaction.committed) {
147-
throw new SyncTransactionAlreadyCommittedWriteError()
148-
}
149-
150-
// Clear all operations from the current transaction
151-
pendingTransaction.operations = []
152-
pendingTransaction.deletedKeys.clear()
153-
154-
// Mark the transaction as a truncate operation. During commit, this triggers:
155-
// - Delete events for all previously synced keys (excluding optimistic-deleted keys)
156-
// - Clearing of syncedData/syncedMetadata
157-
// - Subsequent synced ops applied on the fresh base
158-
// - Finally, optimistic mutations re-applied on top (single batch)
159-
pendingTransaction.truncate = true
160-
},
161-
})
92+
if (pendingTransaction.committed) {
93+
throw new SyncTransactionAlreadyCommittedWriteError()
94+
}
95+
const key = this.getKeyFromItem(messageWithoutKey.value)
96+
97+
// Check if an item with this key already exists when inserting
98+
if (messageWithoutKey.type === `insert`) {
99+
const insertingIntoExistingSynced = this.syncedData.has(key)
100+
const hasPendingDeleteForKey =
101+
pendingTransaction.deletedKeys.has(key)
102+
const isTruncateTransaction = pendingTransaction.truncate === true
103+
// Allow insert after truncate in the same transaction even if it existed in syncedData
104+
if (
105+
insertingIntoExistingSynced &&
106+
!hasPendingDeleteForKey &&
107+
!isTruncateTransaction
108+
) {
109+
throw new DuplicateKeySyncError(key, this.id)
110+
}
111+
}
112+
113+
const message: ChangeMessage<TOutput> = {
114+
...messageWithoutKey,
115+
key,
116+
}
117+
pendingTransaction.operations.push(message)
118+
119+
if (messageWithoutKey.type === `delete`) {
120+
pendingTransaction.deletedKeys.add(key)
121+
}
122+
},
123+
commit: () => {
124+
const pendingTransaction =
125+
this.pendingSyncedTransactions[
126+
this.pendingSyncedTransactions.length - 1
127+
]
128+
if (!pendingTransaction) {
129+
throw new NoPendingSyncTransactionCommitError()
130+
}
131+
if (pendingTransaction.committed) {
132+
throw new SyncTransactionAlreadyCommittedError()
133+
}
134+
135+
pendingTransaction.committed = true
136+
137+
// Update status to initialCommit when transitioning from loading
138+
// This indicates we're in the process of committing the first transaction
139+
if (this._status === `loading`) {
140+
this.setStatus(`initialCommit`)
141+
}
142+
143+
this.commitPendingTransactions()
144+
},
145+
markReady: () => {
146+
this.markReady()
147+
},
148+
truncate: () => {
149+
const pendingTransaction =
150+
this.pendingSyncedTransactions[
151+
this.pendingSyncedTransactions.length - 1
152+
]
153+
if (!pendingTransaction) {
154+
throw new NoPendingSyncTransactionWriteError()
155+
}
156+
if (pendingTransaction.committed) {
157+
throw new SyncTransactionAlreadyCommittedWriteError()
158+
}
159+
160+
// Clear all operations from the current transaction
161+
pendingTransaction.operations = []
162+
pendingTransaction.deletedKeys.clear()
163+
164+
// Mark the transaction as a truncate operation. During commit, this triggers:
165+
// - Delete events for all previously synced keys (excluding optimistic-deleted keys)
166+
// - Clearing of syncedData/syncedMetadata
167+
// - Subsequent synced ops applied on the fresh base
168+
// - Finally, optimistic mutations re-applied on top (single batch)
169+
pendingTransaction.truncate = true
170+
},
171+
})
172+
)
162173

163174
// Store cleanup function if provided
164-
this.syncCleanupFn = typeof cleanupFn === `function` ? cleanupFn : null
175+
this.syncCleanupFn = syncRes?.cleanup ?? null
176+
177+
// Store onLoadMore function if provided
178+
this.syncOnLoadMoreFn = syncRes?.onLoadMore ?? null
165179
} catch (error) {
166180
this.lifecycle.setStatus(`error`)
167181
throw error
@@ -210,6 +224,18 @@ export class CollectionSyncManager<
210224
return this.preloadPromise
211225
}
212226

227+
/**
228+
* Requests the sync layer to load more data.
229+
* @param options Options to control what data is being loaded
230+
* @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded.
231+
* If data loading is synchronous, the data is loaded when the method returns.
232+
*/
233+
public syncMore(options: OnLoadMoreOptions): void | Promise<void> {
234+
if (this.syncOnLoadMoreFn) {
235+
return this.syncOnLoadMoreFn(options)
236+
}
237+
}
238+
213239
public cleanup(): void {
214240
try {
215241
if (this.syncCleanupFn) {
@@ -233,3 +259,15 @@ export class CollectionSyncManager<
233259
this.preloadPromise = null
234260
}
235261
}
262+
263+
function normalizeSyncFnResult(result: void | CleanupFn | SyncConfigRes) {
264+
if (typeof result === `function`) {
265+
return { cleanup: result }
266+
}
267+
268+
if (typeof result === `object`) {
269+
return result
270+
}
271+
272+
return undefined
273+
}

packages/db/src/query/compiler/order-by.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@ import { compileExpression } from "./evaluators.js"
77
import { replaceAggregatesByRefs } from "./group-by.js"
88
import { followRef } from "./index.js"
99
import type { CompiledSingleRowExpression } from "./evaluators.js"
10-
import type { OrderByClause, QueryIR, Select } from "../ir.js"
10+
import type { OrderBy, OrderByClause, QueryIR, Select } from "../ir.js"
1111
import type { NamespacedAndKeyedStream, NamespacedRow } from "../../types.js"
1212
import type { IStreamBuilder, KeyValue } from "@tanstack/db-ivm"
1313
import type { BaseIndex } from "../../indexes/base-index.js"
1414
import type { Collection } from "../../collection/index.js"
1515

1616
export type OrderByOptimizationInfo = {
17+
orderBy: OrderBy
1718
offset: number
1819
limit: number
1920
comparator: (
@@ -164,6 +165,7 @@ export function processOrderBy(
164165
comparator,
165166
valueExtractorForRawRow,
166167
index,
168+
orderBy: orderByClause,
167169
}
168170

169171
optimizableOrderByCollections[followRefCollection.id] =

packages/db/src/query/live/collection-subscriber.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ export class CollectionSubscriber<
128128
private subscribeToOrderedChanges(
129129
whereExpression: BasicExpression<boolean> | undefined
130130
) {
131-
const { offset, limit, comparator, dataNeeded, index } =
131+
const { orderBy, offset, limit, comparator, dataNeeded, index } =
132132
this.collectionConfigBuilder.optimizableOrderByCollections[
133133
this.collectionId
134134
]!
@@ -168,6 +168,7 @@ export class CollectionSubscriber<
168168
// i.e. the K items from the collection that fall into the requested range: [offset, offset + limit[
169169
subscription.requestLimitedSnapshot({
170170
limit: offset + limit,
171+
orderBy,
171172
})
172173

173174
return subscription
@@ -225,7 +226,7 @@ export class CollectionSubscriber<
225226
// Loads the next `n` items from the collection
226227
// starting from the biggest item it has sent
227228
private loadNextItems(n: number, subscription: CollectionSubscription) {
228-
const { valueExtractorForRawRow } =
229+
const { orderBy, valueExtractorForRawRow } =
229230
this.collectionConfigBuilder.optimizableOrderByCollections[
230231
this.collectionId
231232
]!
@@ -235,6 +236,7 @@ export class CollectionSubscriber<
235236
: biggestSentRow
236237
// Take the `n` items after the biggest sent value
237238
subscription.requestLimitedSnapshot({
239+
orderBy,
238240
limit: n,
239241
minValue: biggestSentValue,
240242
})

0 commit comments

Comments
 (0)