Skip to content

Commit 68538b4

Browse files
authored
Optimize joins to use index when possible (#335)
1 parent 6b51914 commit 68538b4

File tree

14 files changed

+1426
-100
lines changed

14 files changed

+1426
-100
lines changed

.changeset/sour-emus-count.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@tanstack/db-ivm": patch
3+
"@tanstack/db": patch
4+
---
5+
6+
Optimize joins to use index on the join key when available.

packages/db-ivm/src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
export * from "./pipe.js"
22
export * from "./map.js"
3+
export * from "./tap.js"
34
export * from "./filter.js"
45
export * from "./negate.js"
56
export * from "./concat.js"

packages/db-ivm/src/operators/tap.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { DifferenceStreamWriter, LinearUnaryOperator } from "../graph.js"
2+
import { StreamBuilder } from "../d2.js"
3+
import type { IStreamBuilder, PipedOperator } from "../types.js"
4+
import type { DifferenceStreamReader } from "../graph.js"
5+
import type { MultiSet } from "../multiset.js"
6+
7+
/**
8+
* Operator that applies a function to each element in the input stream
9+
*/
10+
export class TapOperator<T> extends LinearUnaryOperator<T, T> {
11+
#f: (data: T) => void
12+
13+
constructor(
14+
id: number,
15+
inputA: DifferenceStreamReader<T>,
16+
output: DifferenceStreamWriter<T>,
17+
f: (data: T) => void
18+
) {
19+
super(id, inputA, output)
20+
this.#f = f
21+
}
22+
23+
inner(collection: MultiSet<T>): MultiSet<T> {
24+
return collection.map((data) => {
25+
this.#f(data)
26+
return data
27+
})
28+
}
29+
}
30+
31+
/**
32+
* Invokes a function for each element in the input stream.
33+
* This operator doesn't modify the stream and is used to perform side effects.
34+
* @param f - The function to invoke on each element
35+
* @returns The input stream
36+
*/
37+
export function tap<T>(f: (data: T) => void): PipedOperator<T, T> {
38+
return (stream: IStreamBuilder<T>): IStreamBuilder<T> => {
39+
const output = new StreamBuilder<T>(
40+
stream.graph,
41+
new DifferenceStreamWriter<T>()
42+
)
43+
const operator = new TapOperator<T>(
44+
stream.graph.getNextOperatorId(),
45+
stream.connectReader(),
46+
output.writer,
47+
f
48+
)
49+
stream.graph.addOperator(operator)
50+
stream.graph.addStream(output.connectReader())
51+
return output
52+
}
53+
}

packages/db/src/collection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1397,8 +1397,8 @@ export class CollectionImpl<
13971397

13981398
/**
13991399
* Creates an index on a collection for faster queries.
1400-
* Indexes significantly improve query performance by allowing binary search
1401-
* and range queries instead of full scans.
1400+
* Indexes significantly improve query performance by allowing constant time lookups
1401+
* and logarithmic time range queries instead of full scans.
14021402
*
14031403
* @template TResolver - The type of the index resolver (constructor or async loader)
14041404
* @param indexCallback - Function that extracts the indexed value from each item

packages/db/src/errors.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,12 @@ export class UnknownFunctionError extends QueryCompilationError {
377377
}
378378
}
379379

380+
export class JoinCollectionNotFoundError extends QueryCompilationError {
381+
constructor(collectionId: string) {
382+
super(`Collection "${collectionId}" not found during compilation of join`)
383+
}
384+
}
385+
380386
// JOIN Operation Errors
381387
export class JoinError extends TanStackDBError {
382388
constructor(message: string) {

packages/db/src/indexes/auto-index.ts

Lines changed: 51 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,55 @@ export interface AutoIndexConfig {
66
autoIndex?: `off` | `eager`
77
}
88

9+
function shouldAutoIndex(collection: CollectionImpl<any, any, any, any, any>) {
10+
// Only proceed if auto-indexing is enabled
11+
if (collection.config.autoIndex !== `eager`) {
12+
return false
13+
}
14+
15+
// Don't auto-index during sync operations
16+
if (
17+
collection.status === `loading` ||
18+
collection.status === `initialCommit`
19+
) {
20+
return false
21+
}
22+
23+
return true
24+
}
25+
26+
export function ensureIndexForField<
27+
T extends Record<string, any>,
28+
TKey extends string | number,
29+
>(
30+
fieldName: string,
31+
fieldPath: Array<string>,
32+
collection: CollectionImpl<T, TKey, any, any, any>
33+
) {
34+
if (!shouldAutoIndex(collection)) {
35+
return
36+
}
37+
38+
// Check if we already have an index for this field
39+
const existingIndex = Array.from(collection.indexes.values()).find((index) =>
40+
index.matchesField(fieldPath)
41+
)
42+
43+
if (existingIndex) {
44+
return // Index already exists
45+
}
46+
47+
// Create a new index for this field using the collection's createIndex method
48+
try {
49+
collection.createIndex((row) => (row as any)[fieldName], {
50+
name: `auto_${fieldName}`,
51+
indexType: BTreeIndex,
52+
})
53+
} catch (error) {
54+
console.warn(`Failed to create auto-index for field "${fieldName}":`, error)
55+
}
56+
}
57+
958
/**
1059
* Analyzes a where expression and creates indexes for all simple operations on single fields
1160
*/
@@ -16,44 +65,15 @@ export function ensureIndexForExpression<
1665
expression: BasicExpression,
1766
collection: CollectionImpl<T, TKey, any, any, any>
1867
): void {
19-
// Only proceed if auto-indexing is enabled
20-
if (collection.config.autoIndex !== `eager`) {
21-
return
22-
}
23-
24-
// Don't auto-index during sync operations
25-
if (
26-
collection.status === `loading` ||
27-
collection.status === `initialCommit`
28-
) {
68+
if (!shouldAutoIndex(collection)) {
2969
return
3070
}
3171

3272
// Extract all indexable expressions and create indexes for them
3373
const indexableExpressions = extractIndexableExpressions(expression)
3474

3575
for (const { fieldName, fieldPath } of indexableExpressions) {
36-
// Check if we already have an index for this field
37-
const existingIndex = Array.from(collection.indexes.values()).find(
38-
(index) => index.matchesField(fieldPath)
39-
)
40-
41-
if (existingIndex) {
42-
continue // Index already exists
43-
}
44-
45-
// Create a new index for this field using the collection's createIndex method
46-
try {
47-
collection.createIndex((row) => (row as any)[fieldName], {
48-
name: `auto_${fieldName}`,
49-
indexType: BTreeIndex,
50-
})
51-
} catch (error) {
52-
console.warn(
53-
`Failed to create auto-index for field "${fieldName}":`,
54-
error
55-
)
56-
}
76+
ensureIndexForField(fieldName, fieldPath, collection)
5777
}
5878
}
5979

packages/db/src/query/compiler/index.ts

Lines changed: 103 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
LimitOffsetRequireOrderByError,
88
UnsupportedFromTypeError,
99
} from "../../errors.js"
10+
import { PropRef } from "../ir.js"
1011
import { compileExpression } from "./evaluators.js"
1112
import { processJoins } from "./joins.js"
1213
import { processGroupBy } from "./group-by.js"
@@ -18,6 +19,8 @@ import type {
1819
QueryIR,
1920
QueryRef,
2021
} from "../ir.js"
22+
import type { LazyCollectionCallbacks } from "./joins.js"
23+
import type { Collection } from "../../collection.js"
2124
import type {
2225
KeyedStream,
2326
NamespacedAndKeyedStream,
@@ -29,6 +32,8 @@ import type { QueryCache, QueryMapping } from "./types.js"
2932
* Result of query compilation including both the pipeline and collection-specific WHERE clauses
3033
*/
3134
export interface CompilationResult {
35+
/** The ID of the main collection */
36+
collectionId: string
3237
/** The compiled query pipeline */
3338
pipeline: ResultStream
3439
/** Map of collection aliases to their WHERE clauses for index optimization */
@@ -46,6 +51,9 @@ export interface CompilationResult {
4651
export function compileQuery(
4752
rawQuery: QueryIR,
4853
inputs: Record<string, KeyedStream>,
54+
collections: Record<string, Collection<any, any, any, any, any>>,
55+
callbacks: Record<string, LazyCollectionCallbacks>,
56+
lazyCollections: Set<string>,
4957
cache: QueryCache = new WeakMap(),
5058
queryMapping: QueryMapping = new WeakMap()
5159
): CompilationResult {
@@ -70,9 +78,16 @@ export function compileQuery(
7078
const tables: Record<string, KeyedStream> = {}
7179

7280
// Process the FROM clause to get the main table
73-
const { alias: mainTableAlias, input: mainInput } = processFrom(
81+
const {
82+
alias: mainTableAlias,
83+
input: mainInput,
84+
collectionId: mainCollectionId,
85+
} = processFrom(
7486
query.from,
7587
allInputs,
88+
collections,
89+
callbacks,
90+
lazyCollections,
7691
cache,
7792
queryMapping
7893
)
@@ -96,10 +111,15 @@ export function compileQuery(
96111
pipeline,
97112
query.join,
98113
tables,
114+
mainCollectionId,
99115
mainTableAlias,
100116
allInputs,
101117
cache,
102-
queryMapping
118+
queryMapping,
119+
collections,
120+
callbacks,
121+
lazyCollections,
122+
rawQuery
103123
)
104124
}
105125

@@ -249,6 +269,7 @@ export function compileQuery(
249269
const result = resultPipeline
250270
// Cache the result before returning (use original query as key)
251271
const compilationResult = {
272+
collectionId: mainCollectionId,
252273
pipeline: result,
253274
collectionWhereClauses,
254275
}
@@ -275,6 +296,7 @@ export function compileQuery(
275296
const result = resultPipeline
276297
// Cache the result before returning (use original query as key)
277298
const compilationResult = {
299+
collectionId: mainCollectionId,
278300
pipeline: result,
279301
collectionWhereClauses,
280302
}
@@ -289,16 +311,19 @@ export function compileQuery(
289311
function processFrom(
290312
from: CollectionRef | QueryRef,
291313
allInputs: Record<string, KeyedStream>,
314+
collections: Record<string, Collection>,
315+
callbacks: Record<string, LazyCollectionCallbacks>,
316+
lazyCollections: Set<string>,
292317
cache: QueryCache,
293318
queryMapping: QueryMapping
294-
): { alias: string; input: KeyedStream } {
319+
): { alias: string; input: KeyedStream; collectionId: string } {
295320
switch (from.type) {
296321
case `collectionRef`: {
297322
const input = allInputs[from.collection.id]
298323
if (!input) {
299324
throw new CollectionInputNotFoundError(from.collection.id)
300325
}
301-
return { alias: from.alias, input }
326+
return { alias: from.alias, input, collectionId: from.collection.id }
302327
}
303328
case `queryRef`: {
304329
// Find the original query for caching purposes
@@ -308,6 +333,9 @@ function processFrom(
308333
const subQueryResult = compileQuery(
309334
originalQuery,
310335
allInputs,
336+
collections,
337+
callbacks,
338+
lazyCollections,
311339
cache,
312340
queryMapping
313341
)
@@ -324,7 +352,11 @@ function processFrom(
324352
})
325353
)
326354

327-
return { alias: from.alias, input: extractedInput }
355+
return {
356+
alias: from.alias,
357+
input: extractedInput,
358+
collectionId: subQueryResult.collectionId,
359+
}
328360
}
329361
default:
330362
throw new UnsupportedFromTypeError((from as any).type)
@@ -380,3 +412,69 @@ function mapNestedQueries(
380412
}
381413
}
382414
}
415+
416+
function getRefFromAlias(
417+
query: QueryIR,
418+
alias: string
419+
): CollectionRef | QueryRef | void {
420+
if (query.from.alias === alias) {
421+
return query.from
422+
}
423+
424+
for (const join of query.join || []) {
425+
if (join.from.alias === alias) {
426+
return join.from
427+
}
428+
}
429+
}
430+
431+
/**
432+
* Follows the given reference in a query
433+
* until its finds the root field the reference points to.
434+
* @returns The collection, its alias, and the path to the root field in this collection
435+
*/
436+
export function followRef(
437+
query: QueryIR,
438+
ref: PropRef<any>,
439+
collection: Collection
440+
): { collection: Collection; path: Array<string> } | void {
441+
if (ref.path.length === 0) {
442+
return
443+
}
444+
445+
if (ref.path.length === 1) {
446+
// This field should be part of this collection
447+
const field = ref.path[0]!
448+
// is it part of the select clause?
449+
if (query.select) {
450+
const selectedField = query.select[field]
451+
if (selectedField && selectedField.type === `ref`) {
452+
return followRef(query, selectedField, collection)
453+
}
454+
}
455+
456+
// Either this field is not part of the select clause
457+
// and thus it must be part of the collection itself
458+
// or it is part of the select but is not a reference
459+
// so we can stop here and don't have to follow it
460+
return { collection, path: [field] }
461+
}
462+
463+
if (ref.path.length > 1) {
464+
// This is a nested field
465+
const [alias, ...rest] = ref.path
466+
const aliasRef = getRefFromAlias(query, alias!)
467+
if (!aliasRef) {
468+
return
469+
}
470+
471+
if (aliasRef.type === `queryRef`) {
472+
return followRef(aliasRef.query, new PropRef(rest), collection)
473+
} else {
474+
// This is a reference to a collection
475+
// we can't follow it further
476+
// so the field must be on the collection itself
477+
return { collection: aliasRef.collection, path: rest }
478+
}
479+
}
480+
}

0 commit comments

Comments
 (0)