Skip to content

Commit a053969

Browse files
committed
Fixes to subscription
1 parent bcba60b commit a053969

File tree

3 files changed

+60
-14
lines changed

3 files changed

+60
-14
lines changed

packages/db/src/collection/subscription.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import { ensureIndexForExpression } from "../indexes/auto-index.js"
2-
import { and } from "../query/builder/functions.js"
2+
import { and, gt, lt } from "../query/index.js"
33
import {
44
createFilterFunctionFromExpression,
55
createFilteredCallback,
66
} from "./change-events.js"
77
import type { IndexInterface } from "../indexes/base-index.js"
8-
import type { BasicExpression, OrderBy } from "../query/ir.js"
8+
import { Value, type BasicExpression, type OrderBy } from "../query/ir.js"
99
import type { ChangeMessage } from "../types.js"
1010
import type { CollectionImpl } from "./index.js"
1111

@@ -153,6 +153,7 @@ export class CollectionSubscription {
153153
limit,
154154
minValue,
155155
}: RequestLimitedSnapshotOptions) {
156+
console.log("in requestLimitedSnapshot")
156157
if (!limit) throw new Error(`limit is required`)
157158

158159
if (!this.orderByIndex) {
@@ -203,10 +204,19 @@ export class CollectionSubscription {
203204

204205
this.callback(changes)
205206

207+
let whereWithValueFilter = where
208+
if (typeof minValue !== `undefined`) {
209+
// Only request data that we haven't seen yet (i.e. is bigger than the minValue)
210+
const { expression, compareOptions } = orderBy[0]!
211+
const operator = compareOptions.direction === `asc` ? gt : lt
212+
const valueFilter = operator(expression, new Value(minValue))
213+
whereWithValueFilter = where ? and(where, valueFilter) : valueFilter
214+
}
215+
206216
// Request the sync layer to load more data
207217
// don't await it, we will load the data into the collection when it comes in
208218
this.collection.syncMore({
209-
where,
219+
where: whereWithValueFilter,
210220
limit,
211221
orderBy,
212222
})

packages/db/src/query/compiler/expressions.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Func, PropRef, Value } from "../ir.js"
2-
import type { BasicExpression } from "../ir.js"
2+
import type { BasicExpression, OrderBy } from "../ir.js"
33

44
/**
55
* Functions supported by the collection index system.
@@ -90,3 +90,26 @@ export function convertToBasicExpression(
9090
return new Func(whereClause.name, args)
9191
}
9292
}
93+
94+
export function convertOrderByToBasicExpression(
95+
orderBy: OrderBy,
96+
collectionAlias: string
97+
): OrderBy {
98+
const normalizedOrderBy = orderBy.map((clause) => {
99+
const basicExp = convertToBasicExpression(
100+
clause.expression,
101+
collectionAlias
102+
)
103+
104+
if (!basicExp) {
105+
throw new Error(`Failed to convert orderBy expression to a basic expression: ${clause.expression}`)
106+
}
107+
108+
return {
109+
...clause,
110+
expression: basicExp,
111+
}
112+
})
113+
114+
return normalizedOrderBy
115+
}

packages/db/src/query/live/collection-subscriber.ts

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { MultiSet } from "@tanstack/db-ivm"
2-
import { convertToBasicExpression } from "../compiler/expressions.js"
2+
import { convertOrderByToBasicExpression, convertToBasicExpression } from "../compiler/expressions.js"
33
import type { FullSyncState } from "./types.js"
44
import type { MultiSetArray, RootStreamBuilder } from "@tanstack/db-ivm"
55
import type { Collection } from "../../collection/index.js"
@@ -16,26 +16,29 @@ export class CollectionSubscriber<
1616
// Keep track of the biggest value we've sent so far (needed for orderBy optimization)
1717
private biggest: any = undefined
1818

19+
private collectionAlias: string
20+
1921
constructor(
2022
private collectionId: string,
2123
private collection: Collection,
2224
private config: Parameters<SyncConfig<TResult>[`sync`]>[0],
2325
private syncState: FullSyncState,
2426
private collectionConfigBuilder: CollectionConfigBuilder<TContext, TResult>
25-
) {}
26-
27-
subscribe(): CollectionSubscription {
28-
const collectionAlias = findCollectionAlias(
27+
) {
28+
this.collectionAlias = findCollectionAlias(
2929
this.collectionId,
3030
this.collectionConfigBuilder.query
31-
)
32-
const whereClause = this.getWhereClauseFromAlias(collectionAlias)
31+
)!
32+
}
33+
34+
subscribe(): CollectionSubscription {
35+
const whereClause = this.getWhereClauseFromAlias(this.collectionAlias)
3336

3437
if (whereClause) {
3538
// Convert WHERE clause to BasicExpression format for collection subscription
3639
const whereExpression = convertToBasicExpression(
3740
whereClause,
38-
collectionAlias!
41+
this.collectionAlias
3942
)
4043

4144
if (whereExpression) {
@@ -164,11 +167,14 @@ export class CollectionSubscriber<
164167

165168
subscription.setOrderByIndex(index)
166169

170+
// Normalize the orderBy clauses such that the references are relative to the collection
171+
const normalizedOrderBy = convertOrderByToBasicExpression(orderBy, this.collectionAlias)
172+
167173
// Load the first `offset + limit` values from the index
168174
// i.e. the K items from the collection that fall into the requested range: [offset, offset + limit[
169175
subscription.requestLimitedSnapshot({
170176
limit: offset + limit,
171-
orderBy,
177+
orderBy: normalizedOrderBy,
172178
})
173179

174180
return subscription
@@ -234,9 +240,16 @@ export class CollectionSubscriber<
234240
const biggestSentValue = biggestSentRow
235241
? valueExtractorForRawRow(biggestSentRow)
236242
: biggestSentRow
243+
244+
// Normalize the orderBy clauses such that the references are relative to the collection
245+
const normalizedOrderBy = convertOrderByToBasicExpression(
246+
orderBy,
247+
this.collectionAlias
248+
)
249+
237250
// Take the `n` items after the biggest sent value
238251
subscription.requestLimitedSnapshot({
239-
orderBy,
252+
orderBy: normalizedOrderBy,
240253
limit: n,
241254
minValue: biggestSentValue,
242255
})

0 commit comments

Comments
 (0)