Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/floppy-wings-mix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

Push predicates down to sync layer
13 changes: 13 additions & 0 deletions packages/db/src/collection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import type {
InferSchemaOutput,
InsertConfig,
NonSingleResult,
OnLoadMoreOptions,
OperationConfig,
SingleResult,
SubscribeChangesOptions,
Expand Down Expand Up @@ -362,6 +363,18 @@ export class CollectionImpl<
this._sync.startSync()
}

/**
* Requests the sync layer to load more data.
* @param options Options to control what data is being loaded
* @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded.
* If data loading is synchronous, the data is loaded when the method returns.
*/
public syncMore(options: OnLoadMoreOptions): void | Promise<void> {
if (this._sync.syncOnLoadMoreFn) {
return this._sync.syncOnLoadMoreFn(options)
}
}

/**
* Preload the collection data by starting sync if not already started
* Multiple concurrent calls will share the same promise
Expand Down
38 changes: 34 additions & 4 deletions packages/db/src/collection/subscription.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { ensureIndexForExpression } from "../indexes/auto-index.js"
import { and } from "../query/builder/functions.js"
import { and, gt, lt } from "../query/builder/functions.js"
import { Value } from "../query/ir.js"
import {
createFilterFunctionFromExpression,
createFilteredCallback,
} from "./change-events.js"
import type { BasicExpression } from "../query/ir.js"
import type { BasicExpression, OrderBy } from "../query/ir.js"
import type { IndexInterface } from "../indexes/base-index.js"
import type { ChangeMessage } from "../types.js"
import type { CollectionImpl } from "./index.js"
Expand All @@ -15,8 +16,9 @@ type RequestSnapshotOptions = {
}

type RequestLimitedSnapshotOptions = {
minValue?: any
orderBy: OrderBy
limit: number
minValue?: any
}

type CollectionSubscriptionOptions = {
Expand Down Expand Up @@ -117,6 +119,13 @@ export class CollectionSubscription {
this.loadedInitialState = true
}

// Request the sync layer to load more data
// don't await it, we will load the data into the collection when it comes in
this.collection.syncMore({
where: stateOpts.where,
})

// Also load data immediately from the collection
const snapshot = this.collection.currentStateAsChanges(stateOpts)

if (snapshot === undefined) {
Expand All @@ -140,7 +149,11 @@ export class CollectionSubscription {
* It uses that range index to load the items in the order of the index.
* Note: it does not send keys that have already been sent before.
*/
requestLimitedSnapshot({ limit, minValue }: RequestLimitedSnapshotOptions) {
requestLimitedSnapshot({
orderBy,
limit,
minValue,
}: RequestLimitedSnapshotOptions) {
if (!limit) throw new Error(`limit is required`)

if (!this.orderByIndex) {
Expand Down Expand Up @@ -190,6 +203,23 @@ export class CollectionSubscription {
}

this.callback(changes)

let whereWithValueFilter = where
if (typeof minValue !== `undefined`) {
// Only request data that we haven't seen yet (i.e. is bigger than the minValue)
const { expression, compareOptions } = orderBy[0]!
const operator = compareOptions.direction === `asc` ? gt : lt
const valueFilter = operator(expression, new Value(minValue))
whereWithValueFilter = where ? and(where, valueFilter) : valueFilter
}

// Request the sync layer to load more data
// don't await it, we will load the data into the collection when it comes in
this.collection.syncMore({
where: whereWithValueFilter,
limit,
orderBy,
})
}

/**
Expand Down
Loading
Loading