Skip to content

Commit dd6cdf7

Browse files
authored
Predicate pushdown to sync (#617)
* Add onLoadMore callback and call it from the requestSnapshot methods * Export IR * Fixes to subscription * Remove debug logging in ts DB * Fix cleanup function returned from utility test * Linting * Fix state reference * Update lockfile * Changeset * Break import cycle * Update lockfile * Fix vite config
1 parent 7275cbf commit dd6cdf7

File tree

14 files changed

+2060
-1677
lines changed

14 files changed

+2060
-1677
lines changed

.changeset/floppy-wings-mix.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/db": patch
3+
---
4+
5+
Push predicates down to sync layer

packages/db/src/collection/index.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import type {
2525
InferSchemaOutput,
2626
InsertConfig,
2727
NonSingleResult,
28+
OnLoadMoreOptions,
2829
OperationConfig,
2930
SingleResult,
3031
SubscribeChangesOptions,
@@ -362,6 +363,18 @@ export class CollectionImpl<
362363
this._sync.startSync()
363364
}
364365

366+
/**
367+
* Requests the sync layer to load more data.
368+
* @param options Options to control what data is being loaded
369+
* @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded.
370+
* If data loading is synchronous, the data is loaded when the method returns.
371+
*/
372+
public syncMore(options: OnLoadMoreOptions): void | Promise<void> {
373+
if (this._sync.syncOnLoadMoreFn) {
374+
return this._sync.syncOnLoadMoreFn(options)
375+
}
376+
}
377+
365378
/**
366379
* Preload the collection data by starting sync if not already started
367380
* Multiple concurrent calls will share the same promise

packages/db/src/collection/subscription.ts

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import { ensureIndexForExpression } from "../indexes/auto-index.js"
2-
import { and } from "../query/builder/functions.js"
2+
import { and, gt, lt } from "../query/builder/functions.js"
3+
import { Value } from "../query/ir.js"
34
import {
45
createFilterFunctionFromExpression,
56
createFilteredCallback,
67
} from "./change-events.js"
7-
import type { BasicExpression } from "../query/ir.js"
8+
import type { BasicExpression, OrderBy } from "../query/ir.js"
89
import type { IndexInterface } from "../indexes/base-index.js"
910
import type { ChangeMessage } from "../types.js"
1011
import type { CollectionImpl } from "./index.js"
@@ -15,8 +16,9 @@ type RequestSnapshotOptions = {
1516
}
1617

1718
type RequestLimitedSnapshotOptions = {
18-
minValue?: any
19+
orderBy: OrderBy
1920
limit: number
21+
minValue?: any
2022
}
2123

2224
type CollectionSubscriptionOptions = {
@@ -117,6 +119,13 @@ export class CollectionSubscription {
117119
this.loadedInitialState = true
118120
}
119121

122+
// Request the sync layer to load more data
123+
// don't await it, we will load the data into the collection when it comes in
124+
this.collection.syncMore({
125+
where: stateOpts.where,
126+
})
127+
128+
// Also load data immediately from the collection
120129
const snapshot = this.collection.currentStateAsChanges(stateOpts)
121130

122131
if (snapshot === undefined) {
@@ -140,7 +149,11 @@ export class CollectionSubscription {
140149
* It uses that range index to load the items in the order of the index.
141150
* Note: it does not send keys that have already been sent before.
142151
*/
143-
requestLimitedSnapshot({ limit, minValue }: RequestLimitedSnapshotOptions) {
152+
requestLimitedSnapshot({
153+
orderBy,
154+
limit,
155+
minValue,
156+
}: RequestLimitedSnapshotOptions) {
144157
if (!limit) throw new Error(`limit is required`)
145158

146159
if (!this.orderByIndex) {
@@ -190,6 +203,23 @@ export class CollectionSubscription {
190203
}
191204

192205
this.callback(changes)
206+
207+
let whereWithValueFilter = where
208+
if (typeof minValue !== `undefined`) {
209+
// Only request data that we haven't seen yet (i.e. is bigger than the minValue)
210+
const { expression, compareOptions } = orderBy[0]!
211+
const operator = compareOptions.direction === `asc` ? gt : lt
212+
const valueFilter = operator(expression, new Value(minValue))
213+
whereWithValueFilter = where ? and(where, valueFilter) : valueFilter
214+
}
215+
216+
// Request the sync layer to load more data
217+
// don't await it, we will load the data into the collection when it comes in
218+
this.collection.syncMore({
219+
where: whereWithValueFilter,
220+
limit,
221+
orderBy,
222+
})
193223
}
194224

195225
/**

0 commit comments

Comments
 (0)