Skip to content

Commit 3353b5f

Browse files
committed
Add onLoadMore callback and call it from the requestSnapshot methods
1 parent 9914bf5 commit 3353b5f

File tree

5 files changed

+171
-100
lines changed

5 files changed

+171
-100
lines changed

packages/db/src/collection-subscription.ts

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
} from "./change-events.js"
55
import { ensureIndexForExpression } from "./indexes/auto-index.js"
66
import { and } from "./query/index.js"
7-
import type { BasicExpression } from "./query/ir.js"
7+
import type { BasicExpression, OrderBy } from "./query/ir.js"
88
import type { BaseIndex } from "./indexes/base-index.js"
99
import type { ChangeMessage } from "./types.js"
1010
import type { Collection } from "./collection.js"
@@ -15,8 +15,9 @@ type RequestSnapshotOptions = {
1515
}
1616

1717
type RequestLimitedSnapshotOptions = {
18-
minValue?: any
18+
orderBy: OrderBy
1919
limit: number
20+
minValue?: any
2021
}
2122

2223
type CollectionSubscriptionOptions = {
@@ -117,6 +118,13 @@ export class CollectionSubscription {
117118
this.loadedInitialState = true
118119
}
119120

121+
// Request the sync layer to load more data
122+
// don't await it, we will load the data into the collection when it comes in
123+
this.collection.syncMore({
124+
where: stateOpts.where,
125+
})
126+
127+
// Also load data immediately from the collection
120128
const snapshot = this.collection.currentStateAsChanges(stateOpts)
121129

122130
if (snapshot === undefined) {
@@ -140,7 +148,11 @@ export class CollectionSubscription {
140148
* It uses that range index to load the items in the order of the index.
141149
* Note: it does not send keys that have already been sent before.
142150
*/
143-
requestLimitedSnapshot({ limit, minValue }: RequestLimitedSnapshotOptions) {
151+
requestLimitedSnapshot({
152+
orderBy,
153+
limit,
154+
minValue,
155+
}: RequestLimitedSnapshotOptions) {
144156
if (!limit) throw new Error(`limit is required`)
145157

146158
if (!this.orderByIndex) {
@@ -190,6 +202,14 @@ export class CollectionSubscription {
190202
}
191203

192204
this.callback(changes)
205+
206+
// Request the sync layer to load more data
207+
// don't await it, we will load the data into the collection when it comes in
208+
this.collection.syncMore({
209+
where,
210+
limit,
211+
orderBy,
212+
})
193213
}
194214

195215
/**

packages/db/src/collection.ts

Lines changed: 128 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -37,29 +37,32 @@ import {
3737
UpdateKeyNotFoundError,
3838
} from "./errors"
3939
import { CollectionEvents } from "./collection-events.js"
40+
import { currentStateAsChanges } from "./change-events"
41+
import { CollectionSubscription } from "./collection-subscription.js"
4042
import type {
4143
AllCollectionEvents,
4244
CollectionEventHandler,
4345
} from "./collection-events.js"
44-
import { currentStateAsChanges } from "./change-events"
45-
import { CollectionSubscription } from "./collection-subscription.js"
4646
import type { Transaction } from "./transactions"
4747
import type { StandardSchemaV1 } from "@standard-schema/spec"
4848
import type { SingleRowRefProxy } from "./query/builder/ref-proxy"
4949
import type {
5050
ChangeMessage,
51+
CleanupFn,
5152
CollectionConfig,
5253
CollectionStatus,
5354
CurrentStateAsChangesOptions,
5455
Fn,
5556
InferSchemaInput,
5657
InferSchemaOutput,
5758
InsertConfig,
59+
OnLoadMoreOptions,
5860
OperationConfig,
5961
OptimisticChangeMessage,
6062
PendingMutation,
6163
StandardSchema,
6264
SubscribeChangesOptions,
65+
SyncConfigRes,
6366
Transaction as TransactionType,
6467
TransactionWithMutations,
6568
UtilsRecord,
@@ -266,6 +269,9 @@ export class CollectionImpl<
266269
private gcTimeoutId: ReturnType<typeof setTimeout> | null = null
267270
private preloadPromise: Promise<void> | null = null
268271
private syncCleanupFn: (() => void) | null = null
272+
private syncOnLoadMoreFn:
273+
| ((options: OnLoadMoreOptions) => void | Promise<void>)
274+
| null = null
269275

270276
// Event system
271277
private events: CollectionEvents
@@ -488,106 +494,111 @@ export class CollectionImpl<
488494
this.setStatus(`loading`)
489495

490496
try {
491-
const cleanupFn = this.config.sync.sync({
492-
collection: this,
493-
begin: () => {
494-
this.pendingSyncedTransactions.push({
495-
committed: false,
496-
operations: [],
497-
deletedKeys: new Set(),
498-
})
499-
},
500-
write: (messageWithoutKey: Omit<ChangeMessage<TOutput>, `key`>) => {
501-
const pendingTransaction =
502-
this.pendingSyncedTransactions[
503-
this.pendingSyncedTransactions.length - 1
504-
]
505-
if (!pendingTransaction) {
506-
throw new NoPendingSyncTransactionWriteError()
507-
}
508-
if (pendingTransaction.committed) {
509-
throw new SyncTransactionAlreadyCommittedWriteError()
510-
}
511-
const key = this.getKeyFromItem(messageWithoutKey.value)
512-
513-
// Check if an item with this key already exists when inserting
514-
if (messageWithoutKey.type === `insert`) {
515-
const insertingIntoExistingSynced = this.syncedData.has(key)
516-
const hasPendingDeleteForKey =
517-
pendingTransaction.deletedKeys.has(key)
518-
const isTruncateTransaction = pendingTransaction.truncate === true
519-
// Allow insert after truncate in the same transaction even if it existed in syncedData
520-
if (
521-
insertingIntoExistingSynced &&
522-
!hasPendingDeleteForKey &&
523-
!isTruncateTransaction
524-
) {
525-
throw new DuplicateKeySyncError(key, this.id)
497+
const syncRes = normalizeSyncFnResult(
498+
this.config.sync.sync({
499+
collection: this,
500+
begin: () => {
501+
this.pendingSyncedTransactions.push({
502+
committed: false,
503+
operations: [],
504+
deletedKeys: new Set(),
505+
})
506+
},
507+
write: (messageWithoutKey: Omit<ChangeMessage<TOutput>, `key`>) => {
508+
const pendingTransaction =
509+
this.pendingSyncedTransactions[
510+
this.pendingSyncedTransactions.length - 1
511+
]
512+
if (!pendingTransaction) {
513+
throw new NoPendingSyncTransactionWriteError()
514+
}
515+
if (pendingTransaction.committed) {
516+
throw new SyncTransactionAlreadyCommittedWriteError()
517+
}
518+
const key = this.getKeyFromItem(messageWithoutKey.value)
519+
520+
// Check if an item with this key already exists when inserting
521+
if (messageWithoutKey.type === `insert`) {
522+
const insertingIntoExistingSynced = this.syncedData.has(key)
523+
const hasPendingDeleteForKey =
524+
pendingTransaction.deletedKeys.has(key)
525+
const isTruncateTransaction = pendingTransaction.truncate === true
526+
// Allow insert after truncate in the same transaction even if it existed in syncedData
527+
if (
528+
insertingIntoExistingSynced &&
529+
!hasPendingDeleteForKey &&
530+
!isTruncateTransaction
531+
) {
532+
throw new DuplicateKeySyncError(key, this.id)
533+
}
526534
}
527-
}
528535

529-
const message: ChangeMessage<TOutput> = {
530-
...messageWithoutKey,
531-
key,
532-
}
533-
pendingTransaction.operations.push(message)
536+
const message: ChangeMessage<TOutput> = {
537+
...messageWithoutKey,
538+
key,
539+
}
540+
pendingTransaction.operations.push(message)
534541

535-
if (messageWithoutKey.type === `delete`) {
536-
pendingTransaction.deletedKeys.add(key)
537-
}
538-
},
539-
commit: () => {
540-
const pendingTransaction =
541-
this.pendingSyncedTransactions[
542-
this.pendingSyncedTransactions.length - 1
543-
]
544-
if (!pendingTransaction) {
545-
throw new NoPendingSyncTransactionCommitError()
546-
}
547-
if (pendingTransaction.committed) {
548-
throw new SyncTransactionAlreadyCommittedError()
549-
}
542+
if (messageWithoutKey.type === `delete`) {
543+
pendingTransaction.deletedKeys.add(key)
544+
}
545+
},
546+
commit: () => {
547+
const pendingTransaction =
548+
this.pendingSyncedTransactions[
549+
this.pendingSyncedTransactions.length - 1
550+
]
551+
if (!pendingTransaction) {
552+
throw new NoPendingSyncTransactionCommitError()
553+
}
554+
if (pendingTransaction.committed) {
555+
throw new SyncTransactionAlreadyCommittedError()
556+
}
550557

551-
pendingTransaction.committed = true
558+
pendingTransaction.committed = true
552559

553-
// Update status to initialCommit when transitioning from loading
554-
// This indicates we're in the process of committing the first transaction
555-
if (this._status === `loading`) {
556-
this.setStatus(`initialCommit`)
557-
}
558-
559-
this.commitPendingTransactions()
560-
},
561-
markReady: () => {
562-
this.markReady()
563-
},
564-
truncate: () => {
565-
const pendingTransaction =
566-
this.pendingSyncedTransactions[
567-
this.pendingSyncedTransactions.length - 1
568-
]
569-
if (!pendingTransaction) {
570-
throw new NoPendingSyncTransactionWriteError()
571-
}
572-
if (pendingTransaction.committed) {
573-
throw new SyncTransactionAlreadyCommittedWriteError()
574-
}
560+
// Update status to initialCommit when transitioning from loading
561+
// This indicates we're in the process of committing the first transaction
562+
if (this._status === `loading`) {
563+
this.setStatus(`initialCommit`)
564+
}
575565

576-
// Clear all operations from the current transaction
577-
pendingTransaction.operations = []
578-
pendingTransaction.deletedKeys.clear()
566+
this.commitPendingTransactions()
567+
},
568+
markReady: () => {
569+
this.markReady()
570+
},
571+
truncate: () => {
572+
const pendingTransaction =
573+
this.pendingSyncedTransactions[
574+
this.pendingSyncedTransactions.length - 1
575+
]
576+
if (!pendingTransaction) {
577+
throw new NoPendingSyncTransactionWriteError()
578+
}
579+
if (pendingTransaction.committed) {
580+
throw new SyncTransactionAlreadyCommittedWriteError()
581+
}
579582

580-
// Mark the transaction as a truncate operation. During commit, this triggers:
581-
// - Delete events for all previously synced keys (excluding optimistic-deleted keys)
582-
// - Clearing of syncedData/syncedMetadata
583-
// - Subsequent synced ops applied on the fresh base
584-
// - Finally, optimistic mutations re-applied on top (single batch)
585-
pendingTransaction.truncate = true
586-
},
587-
})
583+
// Clear all operations from the current transaction
584+
pendingTransaction.operations = []
585+
pendingTransaction.deletedKeys.clear()
586+
587+
// Mark the transaction as a truncate operation. During commit, this triggers:
588+
// - Delete events for all previously synced keys (excluding optimistic-deleted keys)
589+
// - Clearing of syncedData/syncedMetadata
590+
// - Subsequent synced ops applied on the fresh base
591+
// - Finally, optimistic mutations re-applied on top (single batch)
592+
pendingTransaction.truncate = true
593+
},
594+
})
595+
)
588596

589597
// Store cleanup function if provided
590-
this.syncCleanupFn = typeof cleanupFn === `function` ? cleanupFn : null
598+
this.syncCleanupFn = syncRes?.cleanup ?? null
599+
600+
// Store onLoadMore function if provided
601+
this.syncOnLoadMoreFn = syncRes?.onLoadMore ?? null
591602
} catch (error) {
592603
this.setStatus(`error`)
593604
throw error
@@ -633,6 +644,18 @@ export class CollectionImpl<
633644
return this.preloadPromise
634645
}
635646

647+
/**
648+
* Requests the sync layer to load more data.
649+
* @param options Options to control what data is being loaded
650+
* @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded.
651+
* If data loading is synchronous, the data is loaded when the method returns.
652+
*/
653+
public syncMore(options: OnLoadMoreOptions): void | Promise<void> {
654+
if (this.syncOnLoadMoreFn) {
655+
return this.syncOnLoadMoreFn(options)
656+
}
657+
}
658+
636659
/**
637660
* Clean up the collection by stopping sync and clearing data
638661
* This can be called manually or automatically by garbage collection
@@ -2478,3 +2501,15 @@ export class CollectionImpl<
24782501
return this.events.waitFor(event, timeout)
24792502
}
24802503
}
2504+
2505+
function normalizeSyncFnResult(result: void | CleanupFn | SyncConfigRes) {
2506+
if (typeof result === `function`) {
2507+
return { cleanup: result }
2508+
}
2509+
2510+
if (typeof result === `object`) {
2511+
return result
2512+
}
2513+
2514+
return undefined
2515+
}

packages/db/src/query/compiler/order-by.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@ import { compileExpression } from "./evaluators.js"
77
import { replaceAggregatesByRefs } from "./group-by.js"
88
import { followRef } from "./index.js"
99
import type { CompiledSingleRowExpression } from "./evaluators.js"
10-
import type { OrderByClause, QueryIR, Select } from "../ir.js"
10+
import type { OrderBy, OrderByClause, QueryIR, Select } from "../ir.js"
1111
import type { NamespacedAndKeyedStream, NamespacedRow } from "../../types.js"
1212
import type { IStreamBuilder, KeyValue } from "@tanstack/db-ivm"
1313
import type { BaseIndex } from "../../indexes/base-index.js"
1414
import type { Collection } from "../../collection.js"
1515

1616
export type OrderByOptimizationInfo = {
17+
orderBy: OrderBy
1718
offset: number
1819
limit: number
1920
comparator: (
@@ -164,6 +165,7 @@ export function processOrderBy(
164165
comparator,
165166
valueExtractorForRawRow,
166167
index,
168+
orderBy: orderByClause,
167169
}
168170

169171
optimizableOrderByCollections[followRefCollection.id] =

0 commit comments

Comments
 (0)