Skip to content

Commit 684c70e

Browse files
committed
Add onLoadMore callback and call it from the requestSnapshot methods
1 parent 6a3e6f3 commit 684c70e

File tree

6 files changed

+192
-106
lines changed

6 files changed

+192
-106
lines changed

packages/db/src/collection/index.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import type {
2424
InferSchemaInput,
2525
InferSchemaOutput,
2626
InsertConfig,
27+
OnLoadMoreOptions,
2728
OperationConfig,
2829
SubscribeChangesOptions,
2930
Transaction as TransactionType,
@@ -332,6 +333,18 @@ export class CollectionImpl<
332333
this._sync.startSync()
333334
}
334335

336+
/**
337+
* Requests the sync layer to load more data.
338+
* @param options Options to control what data is being loaded
339+
* @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded.
340+
* If data loading is synchronous, the data is loaded when the method returns.
341+
*/
342+
public syncMore(options: OnLoadMoreOptions): void | Promise<void> {
343+
if (this._sync.syncOnLoadMoreFn) {
344+
return this._sync.syncOnLoadMoreFn(options)
345+
}
346+
}
347+
335348
/**
336349
* Preload the collection data by starting sync if not already started
337350
* Multiple concurrent calls will share the same promise

packages/db/src/collection/subscription.ts

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
createFilterFunctionFromExpression,
55
createFilteredCallback,
66
} from "./change-events.js"
7-
import type { BasicExpression } from "../query/ir.js"
7+
import type { BasicExpression, OrderBy } from "../query/ir.js"
88
import type { BaseIndex } from "../indexes/base-index.js"
99
import type { ChangeMessage } from "../types.js"
1010
import type { CollectionImpl } from "./index.js"
@@ -15,8 +15,9 @@ type RequestSnapshotOptions = {
1515
}
1616

1717
type RequestLimitedSnapshotOptions = {
18-
minValue?: any
18+
orderBy: OrderBy
1919
limit: number
20+
minValue?: any
2021
}
2122

2223
type CollectionSubscriptionOptions = {
@@ -117,6 +118,13 @@ export class CollectionSubscription {
117118
this.loadedInitialState = true
118119
}
119120

121+
// Request the sync layer to load more data
122+
// don't await it, we will load the data into the collection when it comes in
123+
this.collection.syncMore({
124+
where: stateOpts.where,
125+
})
126+
127+
// Also load data immediately from the collection
120128
const snapshot = this.collection.currentStateAsChanges(stateOpts)
121129

122130
if (snapshot === undefined) {
@@ -140,7 +148,11 @@ export class CollectionSubscription {
140148
* It uses that range index to load the items in the order of the index.
141149
* Note: it does not send keys that have already been sent before.
142150
*/
143-
requestLimitedSnapshot({ limit, minValue }: RequestLimitedSnapshotOptions) {
151+
requestLimitedSnapshot({
152+
orderBy,
153+
limit,
154+
minValue,
155+
}: RequestLimitedSnapshotOptions) {
144156
if (!limit) throw new Error(`limit is required`)
145157

146158
if (!this.orderByIndex) {
@@ -190,6 +202,14 @@ export class CollectionSubscription {
190202
}
191203

192204
this.callback(changes)
205+
206+
// Request the sync layer to load more data
207+
// don't await it, we will load the data into the collection when it comes in
208+
this.collection.syncMore({
209+
where,
210+
limit,
211+
orderBy,
212+
})
193213
}
194214

195215
/**

packages/db/src/collection/sync.ts

Lines changed: 136 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,13 @@ import {
88
SyncTransactionAlreadyCommittedWriteError,
99
} from "../errors"
1010
import type { StandardSchemaV1 } from "@standard-schema/spec"
11-
import type { ChangeMessage, CollectionConfig } from "../types"
11+
import type {
12+
ChangeMessage,
13+
CollectionConfig,
14+
CleanupFn,
15+
OnLoadMoreOptions,
16+
SyncConfigRes,
17+
} from "../types"
1218
import type { CollectionImpl } from "./index.js"
1319
import type { CollectionStateManager } from "./state"
1420
import type { CollectionLifecycleManager } from "./lifecycle"
@@ -27,6 +33,9 @@ export class CollectionSyncManager<
2733

2834
public preloadPromise: Promise<void> | null = null
2935
public syncCleanupFn: (() => void) | null = null
36+
public syncOnLoadMoreFn:
37+
| ((options: OnLoadMoreOptions) => void | Promise<void>)
38+
| null = null
3039

3140
/**
3241
* Creates a new CollectionSyncManager instance
@@ -51,7 +60,6 @@ export class CollectionSyncManager<
5160
* This is called when the collection is first accessed or preloaded
5261
*/
5362
public startSync(): void {
54-
const state = this.state
5563
if (
5664
this.lifecycle.status !== `idle` &&
5765
this.lifecycle.status !== `cleaned-up`
@@ -62,106 +70,111 @@ export class CollectionSyncManager<
6270
this.lifecycle.setStatus(`loading`)
6371

6472
try {
65-
const cleanupFn = this.config.sync.sync({
66-
collection: this.collection,
67-
begin: () => {
68-
state.pendingSyncedTransactions.push({
69-
committed: false,
70-
operations: [],
71-
deletedKeys: new Set(),
72-
})
73-
},
74-
write: (messageWithoutKey: Omit<ChangeMessage<TOutput>, `key`>) => {
75-
const pendingTransaction =
76-
state.pendingSyncedTransactions[
77-
state.pendingSyncedTransactions.length - 1
78-
]
79-
if (!pendingTransaction) {
80-
throw new NoPendingSyncTransactionWriteError()
81-
}
82-
if (pendingTransaction.committed) {
83-
throw new SyncTransactionAlreadyCommittedWriteError()
84-
}
85-
const key = this.config.getKey(messageWithoutKey.value)
86-
87-
// Check if an item with this key already exists when inserting
88-
if (messageWithoutKey.type === `insert`) {
89-
const insertingIntoExistingSynced = state.syncedData.has(key)
90-
const hasPendingDeleteForKey =
91-
pendingTransaction.deletedKeys.has(key)
92-
const isTruncateTransaction = pendingTransaction.truncate === true
93-
// Allow insert after truncate in the same transaction even if it existed in syncedData
94-
if (
95-
insertingIntoExistingSynced &&
96-
!hasPendingDeleteForKey &&
97-
!isTruncateTransaction
98-
) {
99-
throw new DuplicateKeySyncError(key, this.id)
73+
const syncRes = normalizeSyncFnResult(
74+
this.config.sync.sync({
75+
collection: this.collection,
76+
begin: () => {
77+
this.state.pendingSyncedTransactions.push({
78+
committed: false,
79+
operations: [],
80+
deletedKeys: new Set(),
81+
})
82+
},
83+
write: (messageWithoutKey: Omit<ChangeMessage<TOutput>, `key`>) => {
84+
const pendingTransaction =
85+
this.state.pendingSyncedTransactions[
86+
this.state.pendingSyncedTransactions.length - 1
87+
]
88+
if (!pendingTransaction) {
89+
throw new NoPendingSyncTransactionWriteError()
10090
}
101-
}
102-
103-
const message: ChangeMessage<TOutput> = {
104-
...messageWithoutKey,
105-
key,
106-
}
107-
pendingTransaction.operations.push(message)
108-
109-
if (messageWithoutKey.type === `delete`) {
110-
pendingTransaction.deletedKeys.add(key)
111-
}
112-
},
113-
commit: () => {
114-
const pendingTransaction =
115-
state.pendingSyncedTransactions[
116-
state.pendingSyncedTransactions.length - 1
117-
]
118-
if (!pendingTransaction) {
119-
throw new NoPendingSyncTransactionCommitError()
120-
}
121-
if (pendingTransaction.committed) {
122-
throw new SyncTransactionAlreadyCommittedError()
123-
}
124-
125-
pendingTransaction.committed = true
126-
127-
// Update status to initialCommit when transitioning from loading
128-
// This indicates we're in the process of committing the first transaction
129-
if (this.lifecycle.status === `loading`) {
130-
this.lifecycle.setStatus(`initialCommit`)
131-
}
132-
133-
state.commitPendingTransactions()
134-
},
135-
markReady: () => {
136-
this.lifecycle.markReady()
137-
},
138-
truncate: () => {
139-
const pendingTransaction =
140-
state.pendingSyncedTransactions[
141-
state.pendingSyncedTransactions.length - 1
142-
]
143-
if (!pendingTransaction) {
144-
throw new NoPendingSyncTransactionWriteError()
145-
}
146-
if (pendingTransaction.committed) {
147-
throw new SyncTransactionAlreadyCommittedWriteError()
148-
}
149-
150-
// Clear all operations from the current transaction
151-
pendingTransaction.operations = []
152-
pendingTransaction.deletedKeys.clear()
153-
154-
// Mark the transaction as a truncate operation. During commit, this triggers:
155-
// - Delete events for all previously synced keys (excluding optimistic-deleted keys)
156-
// - Clearing of syncedData/syncedMetadata
157-
// - Subsequent synced ops applied on the fresh base
158-
// - Finally, optimistic mutations re-applied on top (single batch)
159-
pendingTransaction.truncate = true
160-
},
161-
})
91+
if (pendingTransaction.committed) {
92+
throw new SyncTransactionAlreadyCommittedWriteError()
93+
}
94+
const key = this.config.getKey(messageWithoutKey.value)
95+
96+
// Check if an item with this key already exists when inserting
97+
if (messageWithoutKey.type === `insert`) {
98+
const insertingIntoExistingSynced = this.state.syncedData.has(key)
99+
const hasPendingDeleteForKey =
100+
pendingTransaction.deletedKeys.has(key)
101+
const isTruncateTransaction = pendingTransaction.truncate === true
102+
// Allow insert after truncate in the same transaction even if it existed in syncedData
103+
if (
104+
insertingIntoExistingSynced &&
105+
!hasPendingDeleteForKey &&
106+
!isTruncateTransaction
107+
) {
108+
throw new DuplicateKeySyncError(key, this.id)
109+
}
110+
}
111+
112+
const message: ChangeMessage<TOutput> = {
113+
...messageWithoutKey,
114+
key,
115+
}
116+
pendingTransaction.operations.push(message)
117+
118+
if (messageWithoutKey.type === `delete`) {
119+
pendingTransaction.deletedKeys.add(key)
120+
}
121+
},
122+
commit: () => {
123+
const pendingTransaction =
124+
this.state.pendingSyncedTransactions[
125+
this.state.pendingSyncedTransactions.length - 1
126+
]
127+
if (!pendingTransaction) {
128+
throw new NoPendingSyncTransactionCommitError()
129+
}
130+
if (pendingTransaction.committed) {
131+
throw new SyncTransactionAlreadyCommittedError()
132+
}
133+
134+
pendingTransaction.committed = true
135+
136+
// Update status to initialCommit when transitioning from loading
137+
// This indicates we're in the process of committing the first transaction
138+
if (this.lifecycle.status === `loading`) {
139+
this.lifecycle.setStatus(`initialCommit`)
140+
}
141+
142+
this.state.commitPendingTransactions()
143+
},
144+
markReady: () => {
145+
this.lifecycle.markReady()
146+
},
147+
truncate: () => {
148+
const pendingTransaction =
149+
this.state.pendingSyncedTransactions[
150+
this.state.pendingSyncedTransactions.length - 1
151+
]
152+
if (!pendingTransaction) {
153+
throw new NoPendingSyncTransactionWriteError()
154+
}
155+
if (pendingTransaction.committed) {
156+
throw new SyncTransactionAlreadyCommittedWriteError()
157+
}
158+
159+
// Clear all operations from the current transaction
160+
pendingTransaction.operations = []
161+
pendingTransaction.deletedKeys.clear()
162+
163+
// Mark the transaction as a truncate operation. During commit, this triggers:
164+
// - Delete events for all previously synced keys (excluding optimistic-deleted keys)
165+
// - Clearing of syncedData/syncedMetadata
166+
// - Subsequent synced ops applied on the fresh base
167+
// - Finally, optimistic mutations re-applied on top (single batch)
168+
pendingTransaction.truncate = true
169+
},
170+
})
171+
)
162172

163173
// Store cleanup function if provided
164-
this.syncCleanupFn = typeof cleanupFn === `function` ? cleanupFn : null
174+
this.syncCleanupFn = syncRes?.cleanup ?? null
175+
176+
// Store onLoadMore function if provided
177+
this.syncOnLoadMoreFn = syncRes?.onLoadMore ?? null
165178
} catch (error) {
166179
this.lifecycle.setStatus(`error`)
167180
throw error
@@ -210,6 +223,18 @@ export class CollectionSyncManager<
210223
return this.preloadPromise
211224
}
212225

226+
/**
227+
* Requests the sync layer to load more data.
228+
* @param options Options to control what data is being loaded
229+
* @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded.
230+
* If data loading is synchronous, the data is loaded when the method returns.
231+
*/
232+
public syncMore(options: OnLoadMoreOptions): void | Promise<void> {
233+
if (this.syncOnLoadMoreFn) {
234+
return this.syncOnLoadMoreFn(options)
235+
}
236+
}
237+
213238
public cleanup(): void {
214239
try {
215240
if (this.syncCleanupFn) {
@@ -233,3 +258,15 @@ export class CollectionSyncManager<
233258
this.preloadPromise = null
234259
}
235260
}
261+
262+
function normalizeSyncFnResult(result: void | CleanupFn | SyncConfigRes) {
263+
if (typeof result === `function`) {
264+
return { cleanup: result }
265+
}
266+
267+
if (typeof result === `object`) {
268+
return result
269+
}
270+
271+
return undefined
272+
}

packages/db/src/query/compiler/order-by.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ import { findIndexForField } from "../../utils/index-optimization.js"
66
import { compileExpression } from "./evaluators.js"
77
import { replaceAggregatesByRefs } from "./group-by.js"
88
import type { CompiledSingleRowExpression } from "./evaluators.js"
9-
import type { OrderByClause, QueryIR, Select } from "../ir.js"
9+
import type { OrderBy, OrderByClause, QueryIR, Select } from "../ir.js"
1010
import type { NamespacedAndKeyedStream, NamespacedRow } from "../../types.js"
1111
import type { IStreamBuilder, KeyValue } from "@tanstack/db-ivm"
1212
import type { BaseIndex } from "../../indexes/base-index.js"
1313
import type { Collection } from "../../collection/index.js"
1414

1515
export type OrderByOptimizationInfo = {
16+
orderBy: OrderBy
1617
offset: number
1718
limit: number
1819
comparator: (
@@ -163,6 +164,7 @@ export function processOrderBy(
163164
comparator,
164165
valueExtractorForRawRow,
165166
index,
167+
orderBy: orderByClause,
166168
}
167169

168170
optimizableOrderByCollections[followRefCollection.id] =

0 commit comments

Comments
 (0)