Skip to content

Commit 360b0df

Browse files
authored
enable live queries to use indexes on collections for where clauses (#258)
1 parent dcfef51 commit 360b0df

File tree

15 files changed

+1502
-254
lines changed

15 files changed

+1502
-254
lines changed

.changeset/fast-kings-retire.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@tanstack/db": patch
3+
---
4+
5+
Enabled live queries to use the collection indexes
6+
7+
Live queries now use the collection indexes for many queries, using the optimized query pipeline to push where clauses to the collection, which is then able to use the index to filter the data.

packages/db/src/change-events.ts

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
import {
2+
createSingleRowRefProxy,
3+
toExpression,
4+
} from "./query/builder/ref-proxy"
5+
import { compileSingleRowExpression } from "./query/compiler/evaluators.js"
6+
import { optimizeExpressionWithIndexes } from "./utils/index-optimization.js"
7+
import type {
8+
ChangeMessage,
9+
CurrentStateAsChangesOptions,
10+
SubscribeChangesOptions,
11+
} from "./types"
12+
import type { Collection } from "./collection"
13+
import type { SingleRowRefProxy } from "./query/builder/ref-proxy"
14+
import type { BasicExpression } from "./query/ir.js"
15+
16+
/**
17+
* Interface for a collection-like object that provides the necessary methods
18+
* for the change events system to work
19+
*/
20+
export interface CollectionLike<
21+
T extends object = Record<string, unknown>,
22+
TKey extends string | number = string | number,
23+
> extends Pick<Collection<T, TKey>, `get` | `has` | `entries` | `indexes`> {}
24+
25+
/**
26+
* Returns the current state of the collection as an array of changes
27+
* @param collection - The collection to get changes from
28+
* @param options - Options including optional where filter
29+
* @returns An array of changes
30+
* @example
31+
* // Get all items as changes
32+
* const allChanges = currentStateAsChanges(collection)
33+
*
34+
* // Get only items matching a condition
35+
* const activeChanges = currentStateAsChanges(collection, {
36+
* where: (row) => row.status === 'active'
37+
* })
38+
*
39+
* // Get only items using a pre-compiled expression
40+
* const activeChanges = currentStateAsChanges(collection, {
41+
* whereExpression: eq(row.status, 'active')
42+
* })
43+
*/
44+
export function currentStateAsChanges<
45+
T extends object,
46+
TKey extends string | number,
47+
>(
48+
collection: CollectionLike<T, TKey>,
49+
options: CurrentStateAsChangesOptions<T> = {}
50+
): Array<ChangeMessage<T>> {
51+
// Helper function to collect filtered results
52+
const collectFilteredResults = (
53+
filterFn?: (value: T) => boolean
54+
): Array<ChangeMessage<T>> => {
55+
const result: Array<ChangeMessage<T>> = []
56+
for (const [key, value] of collection.entries()) {
57+
// If no filter function is provided, include all items
58+
if (filterFn?.(value) ?? true) {
59+
result.push({
60+
type: `insert`,
61+
key,
62+
value,
63+
})
64+
}
65+
}
66+
return result
67+
}
68+
69+
if (!options.where && !options.whereExpression) {
70+
// No filtering, return all items
71+
return collectFilteredResults()
72+
}
73+
74+
// There's a where clause, let's see if we can use an index
75+
try {
76+
let expression: BasicExpression<boolean>
77+
78+
if (options.whereExpression) {
79+
// Use the pre-compiled expression directly
80+
expression = options.whereExpression
81+
} else if (options.where) {
82+
// Create the single-row refProxy for the callback
83+
const singleRowRefProxy = createSingleRowRefProxy<T>()
84+
85+
// Execute the callback to get the expression
86+
const whereExpression = options.where(singleRowRefProxy)
87+
88+
// Convert the result to a BasicExpression
89+
expression = toExpression(whereExpression)
90+
} else {
91+
// This should never happen due to the check above, but TypeScript needs it
92+
return []
93+
}
94+
95+
// Try to optimize the query using indexes
96+
const optimizationResult = optimizeExpressionWithIndexes(
97+
expression,
98+
collection.indexes
99+
)
100+
101+
if (optimizationResult.canOptimize) {
102+
// Use index optimization
103+
const result: Array<ChangeMessage<T>> = []
104+
for (const key of optimizationResult.matchingKeys) {
105+
const value = collection.get(key)
106+
if (value !== undefined) {
107+
result.push({
108+
type: `insert`,
109+
key,
110+
value,
111+
})
112+
}
113+
}
114+
return result
115+
} else {
116+
// No index found or complex expression, fall back to full scan with filter
117+
const filterFn = options.where
118+
? createFilterFunction(options.where)
119+
: createFilterFunctionFromExpression(expression)
120+
121+
return collectFilteredResults(filterFn)
122+
}
123+
} catch (error) {
124+
// If anything goes wrong with the where clause, fall back to full scan
125+
console.warn(
126+
`Error processing where clause, falling back to full scan:`,
127+
error
128+
)
129+
130+
const filterFn = options.where
131+
? createFilterFunction(options.where)
132+
: createFilterFunctionFromExpression(options.whereExpression!)
133+
134+
return collectFilteredResults(filterFn)
135+
}
136+
}
137+
138+
/**
139+
* Creates a filter function from a where callback
140+
* @param whereCallback - The callback function that defines the filter condition
141+
* @returns A function that takes an item and returns true if it matches the filter
142+
*/
143+
export function createFilterFunction<T extends object>(
144+
whereCallback: (row: SingleRowRefProxy<T>) => any
145+
): (item: T) => boolean {
146+
return (item: T): boolean => {
147+
try {
148+
// First try the RefProxy approach for query builder functions
149+
const singleRowRefProxy = createSingleRowRefProxy<T>()
150+
const whereExpression = whereCallback(singleRowRefProxy)
151+
const expression = toExpression(whereExpression)
152+
const evaluator = compileSingleRowExpression(expression)
153+
const result = evaluator(item as Record<string, unknown>)
154+
// WHERE clauses should always evaluate to boolean predicates (Kevin's feedback)
155+
return result
156+
} catch {
157+
// If RefProxy approach fails (e.g., arithmetic operations), fall back to direct evaluation
158+
try {
159+
// Create a simple proxy that returns actual values for arithmetic operations
160+
const simpleProxy = new Proxy(item as any, {
161+
get(target, prop) {
162+
return target[prop]
163+
},
164+
}) as SingleRowRefProxy<T>
165+
166+
const result = whereCallback(simpleProxy)
167+
return result
168+
} catch {
169+
// If both approaches fail, exclude the item
170+
return false
171+
}
172+
}
173+
}
174+
}
175+
176+
/**
177+
* Creates a filter function from a pre-compiled expression
178+
* @param expression - The pre-compiled expression to evaluate
179+
* @returns A function that takes an item and returns true if it matches the filter
180+
*/
181+
export function createFilterFunctionFromExpression<T extends object>(
182+
expression: BasicExpression<boolean>
183+
): (item: T) => boolean {
184+
return (item: T): boolean => {
185+
try {
186+
const evaluator = compileSingleRowExpression(expression)
187+
const result = evaluator(item as Record<string, unknown>)
188+
return Boolean(result)
189+
} catch {
190+
// If evaluation fails, exclude the item
191+
return false
192+
}
193+
}
194+
}
195+
196+
/**
197+
* Creates a filtered callback that only calls the original callback with changes that match the where clause
198+
* @param originalCallback - The original callback to filter
199+
* @param options - The subscription options containing the where clause
200+
* @returns A filtered callback function
201+
*/
202+
export function createFilteredCallback<T extends object>(
203+
originalCallback: (changes: Array<ChangeMessage<T>>) => void,
204+
options: SubscribeChangesOptions<T>
205+
): (changes: Array<ChangeMessage<T>>) => void {
206+
const filterFn = options.whereExpression
207+
? createFilterFunctionFromExpression(options.whereExpression)
208+
: createFilterFunction(options.where!)
209+
210+
return (changes: Array<ChangeMessage<T>>) => {
211+
const filteredChanges: Array<ChangeMessage<T>> = []
212+
213+
for (const change of changes) {
214+
if (change.type === `insert`) {
215+
// For inserts, check if the new value matches the filter
216+
if (filterFn(change.value)) {
217+
filteredChanges.push(change)
218+
}
219+
} else if (change.type === `update`) {
220+
// For updates, we need to check both old and new values
221+
const newValueMatches = filterFn(change.value)
222+
const oldValueMatches = change.previousValue
223+
? filterFn(change.previousValue)
224+
: false
225+
226+
if (newValueMatches && oldValueMatches) {
227+
// Both old and new match: emit update
228+
filteredChanges.push(change)
229+
} else if (newValueMatches && !oldValueMatches) {
230+
// New matches but old didn't: emit insert
231+
filteredChanges.push({
232+
...change,
233+
type: `insert`,
234+
})
235+
} else if (!newValueMatches && oldValueMatches) {
236+
// Old matched but new doesn't: emit delete
237+
filteredChanges.push({
238+
...change,
239+
type: `delete`,
240+
value: change.previousValue!, // Use the previous value for the delete
241+
})
242+
}
243+
// If neither matches, don't emit anything
244+
} else {
245+
// For deletes, include if the previous value would have matched
246+
// (so subscribers know something they were tracking was deleted)
247+
if (filterFn(change.value)) {
248+
filteredChanges.push(change)
249+
}
250+
}
251+
}
252+
253+
if (filteredChanges.length > 0) {
254+
originalCallback(filteredChanges)
255+
}
256+
}
257+
}

0 commit comments

Comments
 (0)