Skip to content

Commit 92febbf

Browse files
authored
optimise key loading into query graph (#526)
1 parent b487430 commit 92febbf

File tree

6 files changed

+34
-37
lines changed

6 files changed

+34
-37
lines changed

.changeset/afraid-coats-type.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@tanstack/db-ivm": patch
3+
"@tanstack/db": patch
4+
---
5+
6+
optimise key loading into query graph

packages/db-ivm/src/operators/tap.ts

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,34 @@ import type { DifferenceStreamReader } from "../graph.js"
55
import type { MultiSet } from "../multiset.js"
66

77
/**
8-
* Operator that applies a function to each element in the input stream
8+
* Operator that applies a function to each multi-set in the input stream
99
*/
1010
export class TapOperator<T> extends LinearUnaryOperator<T, T> {
11-
#f: (data: T) => void
11+
#f: (data: MultiSet<T>) => void
1212

1313
constructor(
1414
id: number,
1515
inputA: DifferenceStreamReader<T>,
1616
output: DifferenceStreamWriter<T>,
17-
f: (data: T) => void
17+
f: (data: MultiSet<T>) => void
1818
) {
1919
super(id, inputA, output)
2020
this.#f = f
2121
}
2222

2323
inner(collection: MultiSet<T>): MultiSet<T> {
24-
return collection.map((data) => {
25-
this.#f(data)
26-
return data
27-
})
24+
this.#f(collection)
25+
return collection
2826
}
2927
}
3028

3129
/**
32-
* Invokes a function for each element in the input stream.
30+
* Invokes a function for each multi-set in the input stream.
3331
* This operator doesn't modify the stream and is used to perform side effects.
34-
* @param f - The function to invoke on each element
32+
* @param f - The function to invoke on each multi-set
3533
* @returns The input stream
3634
*/
37-
export function tap<T>(f: (data: T) => void): PipedOperator<T, T> {
35+
export function tap<T>(f: (data: MultiSet<T>) => void): PipedOperator<T, T> {
3836
return (stream: IStreamBuilder<T>): IStreamBuilder<T> => {
3937
const output = new StreamBuilder<T>(
4038
stream.graph,

packages/db/src/query/compiler/joins.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ function processJoin(
243243
const activePipelineWithLoading: IStreamBuilder<
244244
[key: unknown, [originalKey: string, namespacedRow: NamespacedRow]]
245245
> = activePipeline.pipe(
246-
tap(([joinKey, _]) => {
246+
tap((data) => {
247247
if (deoptimized) {
248248
return
249249
}
@@ -270,10 +270,11 @@ function processJoin(
270270

271271
const { loadKeys, loadInitialState } = collectionCallbacks
272272

273-
if (index && index.supports(`eq`)) {
273+
if (index && index.supports(`in`)) {
274274
// Use the index to fetch the PKs of the rows in the lazy collection
275275
// that match this row from the active collection based on the value of the joinKey
276-
const matchingKeys = index.lookup(`eq`, joinKey)
276+
const joinKeys = data.getInner().map(([[joinKey]]) => joinKey)
277+
const matchingKeys = index.lookup(`in`, joinKeys)
277278
// Inform the lazy collection that those keys need to be loaded
278279
loadKeys(matchingKeys)
279280
} else {

packages/db/src/query/live/collection-subscriber.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,16 +138,20 @@ export class CollectionSubscriber<
138138
keys: Iterable<string | number>,
139139
filterFn: (item: object) => boolean
140140
) {
141+
const changes: Array<ChangeMessage<any, string | number>> = []
141142
for (const key of keys) {
142143
// Only load the key once
143144
if (this.sentKeys.has(key)) continue
144145

145146
const value = this.collection.get(key)
146147
if (value !== undefined && filterFn(value)) {
147148
this.sentKeys.add(key)
148-
this.sendChangesToPipeline([{ type: `insert`, key, value }])
149+
changes.push({ type: `insert`, key, value })
149150
}
150151
}
152+
if (changes.length > 0) {
153+
this.sendChangesToPipeline(changes)
154+
}
151155
}
152156

153157
private subscribeToAllChanges(

packages/db/tests/collection-auto-index.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -532,9 +532,9 @@ describe(`Collection Auto-Indexing`, () => {
532532
expect(tracker.stats.queriesExecuted).toEqual([
533533
{
534534
type: `index`,
535-
operation: `eq`,
535+
operation: `in`,
536536
field: `id2`,
537-
value: `other2`,
537+
value: [`other2`],
538538
},
539539
])
540540

@@ -651,9 +651,9 @@ describe(`Collection Auto-Indexing`, () => {
651651
expect(tracker.stats.queriesExecuted).toEqual([
652652
{
653653
type: `index`,
654-
operation: `eq`,
654+
operation: `in`,
655655
field: `id2`,
656-
value: `other2`,
656+
value: [`other2`],
657657
},
658658
])
659659

packages/db/tests/query/indexes.test.ts

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -780,9 +780,9 @@ describe(`Query Index Optimization`, () => {
780780
expect(tracker1.stats.queriesExecuted).toEqual([
781781
{
782782
type: `index`,
783-
operation: `eq`,
783+
operation: `in`,
784784
field: `id`,
785-
value: `1`,
785+
value: [`1`],
786786
},
787787
])
788788
} finally {
@@ -983,28 +983,16 @@ describe(`Query Index Optimization`, () => {
983983
expect(tracker2.stats.queriesExecuted).toEqual([
984984
{
985985
type: `index`,
986-
operation: `eq`,
987-
field: `id2`,
988-
value: `1`,
989-
},
990-
{
991-
type: `index`,
992-
operation: `eq`,
993-
field: `id2`,
994-
value: `3`,
995-
},
996-
{
997-
type: `index`,
998-
operation: `eq`,
986+
operation: `in`,
999987
field: `id2`,
1000-
value: `5`,
988+
value: [`1`, `3`, `5`],
1001989
},
1002990
])
1003991

1004992
expectIndexUsage(combinedStats, {
1005993
shouldUseIndex: true,
1006994
shouldUseFullScan: false,
1007-
indexCallCount: 4,
995+
indexCallCount: 2,
1008996
fullScanCallCount: 0,
1009997
})
1010998
} finally {
@@ -1187,9 +1175,9 @@ describe(`Query Index Optimization`, () => {
11871175
expect(tracker1.stats.queriesExecuted).toEqual([
11881176
{
11891177
type: `index`,
1190-
operation: `eq`,
1178+
operation: `in`,
11911179
field: `id`,
1192-
value: `1`,
1180+
value: [`1`],
11931181
},
11941182
])
11951183
} finally {

0 commit comments

Comments
 (0)