Skip to content

Commit 73c02c9

Browse files
committed
Add onLoadMore callback and call it from the requestSnapshot methods
1 parent c807114 commit 73c02c9

File tree

6 files changed

+191
-105
lines changed

6 files changed

+191
-105
lines changed

packages/db/src/collection/index.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import type {
2525
InferSchemaOutput,
2626
InsertConfig,
2727
NonSingleResult,
28+
OnLoadMoreOptions,
2829
OperationConfig,
2930
SingleResult,
3031
SubscribeChangesOptions,
@@ -362,6 +363,18 @@ export class CollectionImpl<
362363
this._sync.startSync()
363364
}
364365

366+
/**
367+
* Requests the sync layer to load more data.
368+
* @param options Options to control what data is being loaded
369+
* @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded.
370+
* If data loading is synchronous, the data is loaded when the method returns.
371+
*/
372+
public syncMore(options: OnLoadMoreOptions): void | Promise<void> {
373+
if (this._sync.syncOnLoadMoreFn) {
374+
return this._sync.syncOnLoadMoreFn(options)
375+
}
376+
}
377+
365378
/**
366379
* Preload the collection data by starting sync if not already started
367380
* Multiple concurrent calls will share the same promise

packages/db/src/collection/subscription.ts

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import {
44
createFilterFunctionFromExpression,
55
createFilteredCallback,
66
} from "./change-events.js"
7-
import type { BasicExpression } from "../query/ir.js"
87
import type { IndexInterface } from "../indexes/base-index.js"
8+
import type { BasicExpression, OrderBy } from "../query/ir.js"
99
import type { ChangeMessage } from "../types.js"
1010
import type { CollectionImpl } from "./index.js"
1111

@@ -15,8 +15,9 @@ type RequestSnapshotOptions = {
1515
}
1616

1717
type RequestLimitedSnapshotOptions = {
18-
minValue?: any
18+
orderBy: OrderBy
1919
limit: number
20+
minValue?: any
2021
}
2122

2223
type CollectionSubscriptionOptions = {
@@ -117,6 +118,13 @@ export class CollectionSubscription {
117118
this.loadedInitialState = true
118119
}
119120

121+
// Request the sync layer to load more data
122+
// don't await it, we will load the data into the collection when it comes in
123+
this.collection.syncMore({
124+
where: stateOpts.where,
125+
})
126+
127+
// Also load data immediately from the collection
120128
const snapshot = this.collection.currentStateAsChanges(stateOpts)
121129

122130
if (snapshot === undefined) {
@@ -140,7 +148,11 @@ export class CollectionSubscription {
140148
* It uses that range index to load the items in the order of the index.
141149
* Note: it does not send keys that have already been sent before.
142150
*/
143-
requestLimitedSnapshot({ limit, minValue }: RequestLimitedSnapshotOptions) {
151+
requestLimitedSnapshot({
152+
orderBy,
153+
limit,
154+
minValue,
155+
}: RequestLimitedSnapshotOptions) {
144156
if (!limit) throw new Error(`limit is required`)
145157

146158
if (!this.orderByIndex) {
@@ -190,6 +202,14 @@ export class CollectionSubscription {
190202
}
191203

192204
this.callback(changes)
205+
206+
// Request the sync layer to load more data
207+
// don't await it, we will load the data into the collection when it comes in
208+
this.collection.syncMore({
209+
where,
210+
limit,
211+
orderBy,
212+
})
193213
}
194214

195215
/**

packages/db/src/collection/sync.ts

Lines changed: 135 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,13 @@ import {
99
} from "../errors"
1010
import { deepEquals } from "../utils"
1111
import type { StandardSchemaV1 } from "@standard-schema/spec"
12-
import type { ChangeMessage, CollectionConfig } from "../types"
12+
import type {
13+
ChangeMessage,
14+
CollectionConfig,
15+
CleanupFn,
16+
OnLoadMoreOptions,
17+
SyncConfigRes,
18+
} from "../types"
1319
import type { CollectionImpl } from "./index.js"
1420
import type { CollectionStateManager } from "./state"
1521
import type { CollectionLifecycleManager } from "./lifecycle"
@@ -28,6 +34,9 @@ export class CollectionSyncManager<
2834

2935
public preloadPromise: Promise<void> | null = null
3036
public syncCleanupFn: (() => void) | null = null
37+
public syncOnLoadMoreFn:
38+
| ((options: OnLoadMoreOptions) => void | Promise<void>)
39+
| null = null
3140

3241
/**
3342
* Creates a new CollectionSyncManager instance
@@ -52,7 +61,6 @@ export class CollectionSyncManager<
5261
* This is called when the collection is first accessed or preloaded
5362
*/
5463
public startSync(): void {
55-
const state = this.state
5664
if (
5765
this.lifecycle.status !== `idle` &&
5866
this.lifecycle.status !== `cleaned-up`
@@ -63,43 +71,44 @@ export class CollectionSyncManager<
6371
this.lifecycle.setStatus(`loading`)
6472

6573
try {
66-
const cleanupFn = this.config.sync.sync({
67-
collection: this.collection,
68-
begin: () => {
69-
state.pendingSyncedTransactions.push({
70-
committed: false,
71-
operations: [],
72-
deletedKeys: new Set(),
73-
})
74-
},
75-
write: (messageWithoutKey: Omit<ChangeMessage<TOutput>, `key`>) => {
76-
const pendingTransaction =
77-
state.pendingSyncedTransactions[
78-
state.pendingSyncedTransactions.length - 1
79-
]
80-
if (!pendingTransaction) {
81-
throw new NoPendingSyncTransactionWriteError()
82-
}
83-
if (pendingTransaction.committed) {
84-
throw new SyncTransactionAlreadyCommittedWriteError()
85-
}
86-
const key = this.config.getKey(messageWithoutKey.value)
74+
const syncRes = normalizeSyncFnResult(
75+
this.config.sync.sync({
76+
collection: this.collection,
77+
begin: () => {
78+
this.state.pendingSyncedTransactions.push({
79+
committed: false,
80+
operations: [],
81+
deletedKeys: new Set(),
82+
})
83+
},
84+
write: (messageWithoutKey: Omit<ChangeMessage<TOutput>, `key`>) => {
85+
const pendingTransaction =
86+
this.state.pendingSyncedTransactions[
87+
this.state.pendingSyncedTransactions.length - 1
88+
]
89+
if (!pendingTransaction) {
90+
throw new NoPendingSyncTransactionWriteError()
91+
}
92+
if (pendingTransaction.committed) {
93+
throw new SyncTransactionAlreadyCommittedWriteError()
94+
}
95+
const key = this.config.getKey(messageWithoutKey.value)
8796

8897
let messageType = messageWithoutKey.type
8998

90-
// Check if an item with this key already exists when inserting
91-
if (messageWithoutKey.type === `insert`) {
92-
const insertingIntoExistingSynced = state.syncedData.has(key)
93-
const hasPendingDeleteForKey =
94-
pendingTransaction.deletedKeys.has(key)
95-
const isTruncateTransaction = pendingTransaction.truncate === true
96-
// Allow insert after truncate in the same transaction even if it existed in syncedData
97-
if (
98-
insertingIntoExistingSynced &&
99-
!hasPendingDeleteForKey &&
100-
!isTruncateTransaction
101-
) {
102-
const existingValue = state.syncedData.get(key)
99+
// Check if an item with this key already exists when inserting
100+
if (messageWithoutKey.type === `insert`) {
101+
const insertingIntoExistingSynced = this.state.syncedData.has(key)
102+
const hasPendingDeleteForKey =
103+
pendingTransaction.deletedKeys.has(key)
104+
const isTruncateTransaction = pendingTransaction.truncate === true
105+
// Allow insert after truncate in the same transaction even if it existed in syncedData
106+
if (
107+
insertingIntoExistingSynced &&
108+
!hasPendingDeleteForKey &&
109+
!isTruncateTransaction
110+
) {
111+
const existingValue = state.syncedData.get(key)
103112
if (
104113
existingValue !== undefined &&
105114
deepEquals(existingValue, messageWithoutKey.value)
@@ -110,73 +119,77 @@ export class CollectionSyncManager<
110119
messageType = `update`
111120
} else {
112121
throw new DuplicateKeySyncError(key, this.id)
113-
}
122+
}
123+
}
114124
}
115-
}
116125

117-
const message: ChangeMessage<TOutput> = {
118-
...messageWithoutKey,
119-
type: messageType,
126+
const message: ChangeMessage<TOutput> = {
127+
...messageWithoutKey,
128+
type: messageType,
120129
key,
121-
}
122-
pendingTransaction.operations.push(message)
123-
124-
if (messageType === `delete`) {
125-
pendingTransaction.deletedKeys.add(key)
126-
}
127-
},
128-
commit: () => {
129-
const pendingTransaction =
130-
state.pendingSyncedTransactions[
131-
state.pendingSyncedTransactions.length - 1
132-
]
133-
if (!pendingTransaction) {
134-
throw new NoPendingSyncTransactionCommitError()
135-
}
136-
if (pendingTransaction.committed) {
137-
throw new SyncTransactionAlreadyCommittedError()
138-
}
139-
140-
pendingTransaction.committed = true
141-
142-
// Update status to initialCommit when transitioning from loading
143-
// This indicates we're in the process of committing the first transaction
144-
if (this.lifecycle.status === `loading`) {
145-
this.lifecycle.setStatus(`initialCommit`)
146-
}
147-
148-
state.commitPendingTransactions()
149-
},
150-
markReady: () => {
151-
this.lifecycle.markReady()
152-
},
153-
truncate: () => {
154-
const pendingTransaction =
155-
state.pendingSyncedTransactions[
156-
state.pendingSyncedTransactions.length - 1
157-
]
158-
if (!pendingTransaction) {
159-
throw new NoPendingSyncTransactionWriteError()
160-
}
161-
if (pendingTransaction.committed) {
162-
throw new SyncTransactionAlreadyCommittedWriteError()
163-
}
164-
165-
// Clear all operations from the current transaction
166-
pendingTransaction.operations = []
167-
pendingTransaction.deletedKeys.clear()
168-
169-
// Mark the transaction as a truncate operation. During commit, this triggers:
170-
// - Delete events for all previously synced keys (excluding optimistic-deleted keys)
171-
// - Clearing of syncedData/syncedMetadata
172-
// - Subsequent synced ops applied on the fresh base
173-
// - Finally, optimistic mutations re-applied on top (single batch)
174-
pendingTransaction.truncate = true
175-
},
176-
})
130+
}
131+
pendingTransaction.operations.push(message)
132+
133+
if (messageType === `delete`) {
134+
pendingTransaction.deletedKeys.add(key)
135+
}
136+
},
137+
commit: () => {
138+
const pendingTransaction =
139+
this.state.pendingSyncedTransactions[
140+
this.state.pendingSyncedTransactions.length - 1
141+
]
142+
if (!pendingTransaction) {
143+
throw new NoPendingSyncTransactionCommitError()
144+
}
145+
if (pendingTransaction.committed) {
146+
throw new SyncTransactionAlreadyCommittedError()
147+
}
148+
149+
pendingTransaction.committed = true
150+
151+
// Update status to initialCommit when transitioning from loading
152+
// This indicates we're in the process of committing the first transaction
153+
if (this.lifecycle.status === `loading`) {
154+
this.lifecycle.setStatus(`initialCommit`)
155+
}
156+
157+
this.state.commitPendingTransactions()
158+
},
159+
markReady: () => {
160+
this.lifecycle.markReady()
161+
},
162+
truncate: () => {
163+
const pendingTransaction =
164+
this.state.pendingSyncedTransactions[
165+
this.state.pendingSyncedTransactions.length - 1
166+
]
167+
if (!pendingTransaction) {
168+
throw new NoPendingSyncTransactionWriteError()
169+
}
170+
if (pendingTransaction.committed) {
171+
throw new SyncTransactionAlreadyCommittedWriteError()
172+
}
173+
174+
// Clear all operations from the current transaction
175+
pendingTransaction.operations = []
176+
pendingTransaction.deletedKeys.clear()
177+
178+
// Mark the transaction as a truncate operation. During commit, this triggers:
179+
// - Delete events for all previously synced keys (excluding optimistic-deleted keys)
180+
// - Clearing of syncedData/syncedMetadata
181+
// - Subsequent synced ops applied on the fresh base
182+
// - Finally, optimistic mutations re-applied on top (single batch)
183+
pendingTransaction.truncate = true
184+
},
185+
})
186+
)
177187

178188
// Store cleanup function if provided
179-
this.syncCleanupFn = typeof cleanupFn === `function` ? cleanupFn : null
189+
this.syncCleanupFn = syncRes?.cleanup ?? null
190+
191+
// Store onLoadMore function if provided
192+
this.syncOnLoadMoreFn = syncRes?.onLoadMore ?? null
180193
} catch (error) {
181194
this.lifecycle.setStatus(`error`)
182195
throw error
@@ -225,6 +238,18 @@ export class CollectionSyncManager<
225238
return this.preloadPromise
226239
}
227240

241+
/**
242+
* Requests the sync layer to load more data.
243+
* @param options Options to control what data is being loaded
244+
* @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded.
245+
* If data loading is synchronous, the data is loaded when the method returns.
246+
*/
247+
public syncMore(options: OnLoadMoreOptions): void | Promise<void> {
248+
if (this.syncOnLoadMoreFn) {
249+
return this.syncOnLoadMoreFn(options)
250+
}
251+
}
252+
228253
public cleanup(): void {
229254
try {
230255
if (this.syncCleanupFn) {
@@ -248,3 +273,15 @@ export class CollectionSyncManager<
248273
this.preloadPromise = null
249274
}
250275
}
276+
277+
function normalizeSyncFnResult(result: void | CleanupFn | SyncConfigRes) {
278+
if (typeof result === `function`) {
279+
return { cleanup: result }
280+
}
281+
282+
if (typeof result === `object`) {
283+
return result
284+
}
285+
286+
return undefined
287+
}

0 commit comments

Comments
 (0)