Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions packages/db/src/collection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import type {
InferSchemaInput,
InferSchemaOutput,
InsertConfig,
OnLoadMoreOptions,
OperationConfig,
SubscribeChangesOptions,
Transaction as TransactionType,
Expand Down Expand Up @@ -332,6 +333,18 @@ export class CollectionImpl<
this._sync.startSync()
}

/**
* Requests the sync layer to load more data.
* @param options Options to control what data is being loaded
* @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded.
* If data loading is synchronous, the data is loaded when the method returns.
*/
public syncMore(options: OnLoadMoreOptions): void | Promise<void> {
if (this._sync.syncOnLoadMoreFn) {
return this._sync.syncOnLoadMoreFn(options)
}
}

/**
* Preload the collection data by starting sync if not already started
* Multiple concurrent calls will share the same promise
Expand Down
38 changes: 34 additions & 4 deletions packages/db/src/collection/subscription.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { ensureIndexForExpression } from "../indexes/auto-index.js"
import { and } from "../query/builder/functions.js"
import { and, gt, lt } from "../query/index.js"
import { Value } from "../query/ir.js"
import {
createFilterFunctionFromExpression,
createFilteredCallback,
} from "./change-events.js"
import type { BasicExpression } from "../query/ir.js"
import type { BasicExpression, OrderBy } from "../query/ir.js"
import type { BaseIndex } from "../indexes/base-index.js"
import type { ChangeMessage } from "../types.js"
import type { CollectionImpl } from "./index.js"
Expand All @@ -15,8 +16,9 @@ type RequestSnapshotOptions = {
}

type RequestLimitedSnapshotOptions = {
minValue?: any
orderBy: OrderBy
limit: number
minValue?: any
}

type CollectionSubscriptionOptions = {
Expand Down Expand Up @@ -117,6 +119,13 @@ export class CollectionSubscription {
this.loadedInitialState = true
}

// Request the sync layer to load more data
// don't await it, we will load the data into the collection when it comes in
this.collection.syncMore({
where: stateOpts.where,
})

// Also load data immediately from the collection
const snapshot = this.collection.currentStateAsChanges(stateOpts)

if (snapshot === undefined) {
Expand All @@ -140,7 +149,11 @@ export class CollectionSubscription {
* It uses that range index to load the items in the order of the index.
* Note: it does not send keys that have already been sent before.
*/
requestLimitedSnapshot({ limit, minValue }: RequestLimitedSnapshotOptions) {
requestLimitedSnapshot({
orderBy,
limit,
minValue,
}: RequestLimitedSnapshotOptions) {
if (!limit) throw new Error(`limit is required`)

if (!this.orderByIndex) {
Expand Down Expand Up @@ -190,6 +203,23 @@ export class CollectionSubscription {
}

this.callback(changes)

let whereWithValueFilter = where
if (typeof minValue !== `undefined`) {
// Only request data that we haven't seen yet (i.e. is bigger than the minValue)
const { expression, compareOptions } = orderBy[0]!
const operator = compareOptions.direction === `asc` ? gt : lt
const valueFilter = operator(expression, new Value(minValue))
whereWithValueFilter = where ? and(where, valueFilter) : valueFilter
}

// Request the sync layer to load more data
// don't await it, we will load the data into the collection when it comes in
this.collection.syncMore({
where: whereWithValueFilter,
limit,
orderBy,
})
}

/**
Expand Down
235 changes: 136 additions & 99 deletions packages/db/src/collection/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@ import {
SyncTransactionAlreadyCommittedWriteError,
} from "../errors"
import type { StandardSchemaV1 } from "@standard-schema/spec"
import type { ChangeMessage, CollectionConfig } from "../types"
import type {
ChangeMessage,
CleanupFn,
CollectionConfig,
OnLoadMoreOptions,
SyncConfigRes,
} from "../types"
import type { CollectionImpl } from "./index.js"
import type { CollectionStateManager } from "./state"
import type { CollectionLifecycleManager } from "./lifecycle"
Expand All @@ -27,6 +33,9 @@ export class CollectionSyncManager<

public preloadPromise: Promise<void> | null = null
public syncCleanupFn: (() => void) | null = null
public syncOnLoadMoreFn:
| ((options: OnLoadMoreOptions) => void | Promise<void>)
| null = null

/**
* Creates a new CollectionSyncManager instance
Expand All @@ -51,7 +60,6 @@ export class CollectionSyncManager<
* This is called when the collection is first accessed or preloaded
*/
public startSync(): void {
const state = this.state
if (
this.lifecycle.status !== `idle` &&
this.lifecycle.status !== `cleaned-up`
Expand All @@ -62,106 +70,111 @@ export class CollectionSyncManager<
this.lifecycle.setStatus(`loading`)

try {
const cleanupFn = this.config.sync.sync({
collection: this.collection,
begin: () => {
state.pendingSyncedTransactions.push({
committed: false,
operations: [],
deletedKeys: new Set(),
})
},
write: (messageWithoutKey: Omit<ChangeMessage<TOutput>, `key`>) => {
const pendingTransaction =
state.pendingSyncedTransactions[
state.pendingSyncedTransactions.length - 1
]
if (!pendingTransaction) {
throw new NoPendingSyncTransactionWriteError()
}
if (pendingTransaction.committed) {
throw new SyncTransactionAlreadyCommittedWriteError()
}
const key = this.config.getKey(messageWithoutKey.value)

// Check if an item with this key already exists when inserting
if (messageWithoutKey.type === `insert`) {
const insertingIntoExistingSynced = state.syncedData.has(key)
const hasPendingDeleteForKey =
pendingTransaction.deletedKeys.has(key)
const isTruncateTransaction = pendingTransaction.truncate === true
// Allow insert after truncate in the same transaction even if it existed in syncedData
if (
insertingIntoExistingSynced &&
!hasPendingDeleteForKey &&
!isTruncateTransaction
) {
throw new DuplicateKeySyncError(key, this.id)
const syncRes = normalizeSyncFnResult(
this.config.sync.sync({
collection: this.collection,
begin: () => {
this.state.pendingSyncedTransactions.push({
committed: false,
operations: [],
deletedKeys: new Set(),
})
},
write: (messageWithoutKey: Omit<ChangeMessage<TOutput>, `key`>) => {
const pendingTransaction =
this.state.pendingSyncedTransactions[
this.state.pendingSyncedTransactions.length - 1
]
if (!pendingTransaction) {
throw new NoPendingSyncTransactionWriteError()
}
}

const message: ChangeMessage<TOutput> = {
...messageWithoutKey,
key,
}
pendingTransaction.operations.push(message)

if (messageWithoutKey.type === `delete`) {
pendingTransaction.deletedKeys.add(key)
}
},
commit: () => {
const pendingTransaction =
state.pendingSyncedTransactions[
state.pendingSyncedTransactions.length - 1
]
if (!pendingTransaction) {
throw new NoPendingSyncTransactionCommitError()
}
if (pendingTransaction.committed) {
throw new SyncTransactionAlreadyCommittedError()
}

pendingTransaction.committed = true

// Update status to initialCommit when transitioning from loading
// This indicates we're in the process of committing the first transaction
if (this.lifecycle.status === `loading`) {
this.lifecycle.setStatus(`initialCommit`)
}

state.commitPendingTransactions()
},
markReady: () => {
this.lifecycle.markReady()
},
truncate: () => {
const pendingTransaction =
state.pendingSyncedTransactions[
state.pendingSyncedTransactions.length - 1
]
if (!pendingTransaction) {
throw new NoPendingSyncTransactionWriteError()
}
if (pendingTransaction.committed) {
throw new SyncTransactionAlreadyCommittedWriteError()
}

// Clear all operations from the current transaction
pendingTransaction.operations = []
pendingTransaction.deletedKeys.clear()

// Mark the transaction as a truncate operation. During commit, this triggers:
// - Delete events for all previously synced keys (excluding optimistic-deleted keys)
// - Clearing of syncedData/syncedMetadata
// - Subsequent synced ops applied on the fresh base
// - Finally, optimistic mutations re-applied on top (single batch)
pendingTransaction.truncate = true
},
})
if (pendingTransaction.committed) {
throw new SyncTransactionAlreadyCommittedWriteError()
}
const key = this.config.getKey(messageWithoutKey.value)

// Check if an item with this key already exists when inserting
if (messageWithoutKey.type === `insert`) {
const insertingIntoExistingSynced = this.state.syncedData.has(key)
const hasPendingDeleteForKey =
pendingTransaction.deletedKeys.has(key)
const isTruncateTransaction = pendingTransaction.truncate === true
// Allow insert after truncate in the same transaction even if it existed in syncedData
if (
insertingIntoExistingSynced &&
!hasPendingDeleteForKey &&
!isTruncateTransaction
) {
throw new DuplicateKeySyncError(key, this.id)
}
}

const message: ChangeMessage<TOutput> = {
...messageWithoutKey,
key,
}
pendingTransaction.operations.push(message)

if (messageWithoutKey.type === `delete`) {
pendingTransaction.deletedKeys.add(key)
}
},
commit: () => {
const pendingTransaction =
this.state.pendingSyncedTransactions[
this.state.pendingSyncedTransactions.length - 1
]
if (!pendingTransaction) {
throw new NoPendingSyncTransactionCommitError()
}
if (pendingTransaction.committed) {
throw new SyncTransactionAlreadyCommittedError()
}

pendingTransaction.committed = true

// Update status to initialCommit when transitioning from loading
// This indicates we're in the process of committing the first transaction
if (this.lifecycle.status === `loading`) {
this.lifecycle.setStatus(`initialCommit`)
}

this.state.commitPendingTransactions()
},
markReady: () => {
this.lifecycle.markReady()
},
truncate: () => {
const pendingTransaction =
this.state.pendingSyncedTransactions[
this.state.pendingSyncedTransactions.length - 1
]
if (!pendingTransaction) {
throw new NoPendingSyncTransactionWriteError()
}
if (pendingTransaction.committed) {
throw new SyncTransactionAlreadyCommittedWriteError()
}

// Clear all operations from the current transaction
pendingTransaction.operations = []
pendingTransaction.deletedKeys.clear()

// Mark the transaction as a truncate operation. During commit, this triggers:
// - Delete events for all previously synced keys (excluding optimistic-deleted keys)
// - Clearing of syncedData/syncedMetadata
// - Subsequent synced ops applied on the fresh base
// - Finally, optimistic mutations re-applied on top (single batch)
pendingTransaction.truncate = true
},
})
)

// Store cleanup function if provided
this.syncCleanupFn = typeof cleanupFn === `function` ? cleanupFn : null
this.syncCleanupFn = syncRes?.cleanup ?? null

// Store onLoadMore function if provided
this.syncOnLoadMoreFn = syncRes?.onLoadMore ?? null
} catch (error) {
this.lifecycle.setStatus(`error`)
throw error
Expand Down Expand Up @@ -210,6 +223,18 @@ export class CollectionSyncManager<
return this.preloadPromise
}

/**
* Requests the sync layer to load more data.
* @param options Options to control what data is being loaded
* @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded.
* If data loading is synchronous, the data is loaded when the method returns.
*/
public syncMore(options: OnLoadMoreOptions): void | Promise<void> {
if (this.syncOnLoadMoreFn) {
return this.syncOnLoadMoreFn(options)
}
}

public cleanup(): void {
try {
if (this.syncCleanupFn) {
Expand All @@ -233,3 +258,15 @@ export class CollectionSyncManager<
this.preloadPromise = null
}
}

function normalizeSyncFnResult(result: void | CleanupFn | SyncConfigRes) {
if (typeof result === `function`) {
return { cleanup: result }
}

if (typeof result === `object`) {
return result
}

return undefined
}
5 changes: 5 additions & 0 deletions packages/db/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
// Re-export all public APIs
// Re-export IR types under their own namespace
// because custom collections need access to the IR types
import * as IR from "./query/ir.js"

export * from "./collection/index.js"
export * from "./SortedMap"
export * from "./transactions"
Expand All @@ -18,3 +22,4 @@ export { type IndexOptions } from "./indexes/index-options.js"

// Re-export some stuff explicitly to ensure the type & value is exported
export type { Collection } from "./collection/index.js"
export { IR }
Loading
Loading