Skip to content

Commit 2f87216

Browse files
kevin-dp and samwillis
authored
Move data loading concerns from join/order by to CollectionSubscription (#564)
Co-authored-by: Sam Willis <[email protected]>
1 parent 5e2932f commit 2f87216

37 files changed

+925
-681
lines changed

.changeset/afraid-camels-tickle.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/db": patch
3+
---
4+
5+
Optimise the live query graph execution by removing recursive calls to `graph.run`.

.changeset/fifty-ways-hang.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/db-ivm": patch
3+
---
4+
5+
Fix a bug with the distinct operator.

.changeset/plain-lights-end.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
"@tanstack/db": minor
3+
"@tanstack/angular-db": patch
4+
"@tanstack/svelte-db": patch
5+
"@tanstack/react-db": patch
6+
"@tanstack/solid-db": patch
7+
"@tanstack/vue-db": patch
8+
---
9+
10+
Let collection.subscribeChanges return a subscription object. Move all data loading code related to optimizations into that subscription object.

packages/angular-db/src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,11 +162,12 @@ export function injectLiveQuery(opts: any) {
162162
}
163163

164164
// Subscribe to changes
165-
unsub = currentCollection.subscribeChanges(
165+
const subscription = currentCollection.subscribeChanges(
166166
(_: Array<ChangeMessage<any>>) => {
167167
syncDataFromCollection(currentCollection)
168168
}
169169
)
170+
unsub = subscription.unsubscribe.bind(subscription)
170171

171172
// Handle ready state
172173
currentCollection.onFirstReady(() => {

packages/angular-db/tests/inject-live-query.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,9 @@ function createMockCollection<T extends object, K extends string | number>(
102102
size: () => map.size,
103103
subscribeChanges: (cb: (changes: Array<any>) => void) => {
104104
subs.add(cb)
105-
return () => subs.delete(cb)
105+
return {
106+
unsubscribe: () => subs.delete(cb),
107+
}
106108
},
107109
onFirstReady: (cb: () => void) => {
108110
if (status === `ready`) {

packages/db-ivm/src/operators/distinct.ts

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,25 @@ import { hash } from "../hashing/index.js"
44
import { MultiSet } from "../multiset.js"
55
import type { Hash } from "../hashing/index.js"
66
import type { DifferenceStreamReader } from "../graph.js"
7-
import type { IStreamBuilder } from "../types.js"
7+
import type { IStreamBuilder, KeyValue } from "../types.js"
88

99
type Multiplicity = number
1010

11+
type GetValue<T> = T extends KeyValue<any, infer V> ? V : never
12+
1113
/**
1214
* Operator that removes duplicates
1315
*/
14-
export class DistinctOperator<T> extends UnaryOperator<T> {
16+
export class DistinctOperator<
17+
T extends KeyValue<any, any>,
18+
> extends UnaryOperator<T, KeyValue<number, GetValue<T>>> {
1519
#by: (value: T) => any
1620
#values: Map<Hash, Multiplicity> // keeps track of the number of times each value has been seen
1721

1822
constructor(
1923
id: number,
2024
input: DifferenceStreamReader<T>,
21-
output: DifferenceStreamWriter<T>,
25+
output: DifferenceStreamWriter<KeyValue<number, GetValue<T>>>,
2226
by: (value: T) => any = (value: T) => value
2327
) {
2428
super(id, input, output)
@@ -39,12 +43,11 @@ export class DistinctOperator<T> extends UnaryOperator<T> {
3943
this.#values.get(hashedValue) ??
4044
0
4145
const newMultiplicity = oldMultiplicity + diff
42-
4346
updatedValues.set(hashedValue, [newMultiplicity, value])
4447
}
4548
}
4649

47-
const result: Array<[T, number]> = []
50+
const result: Array<[KeyValue<number, GetValue<T>>, number]> = []
4851

4952
// Check which values became visible or disappeared
5053
for (const [
@@ -62,11 +65,11 @@ export class DistinctOperator<T> extends UnaryOperator<T> {
6265
if (oldMultiplicity <= 0 && newMultiplicity > 0) {
6366
// The value wasn't present in the stream
6467
// but with this change it is now present in the stream
65-
result.push([value, 1])
68+
result.push([[hash(this.#by(value)), value[1]], 1])
6669
} else if (oldMultiplicity > 0 && newMultiplicity <= 0) {
6770
// The value was present in the stream
6871
// but with this change it is no longer present in the stream
69-
result.push([value, -1])
72+
result.push([[hash(this.#by(value)), value[1]], -1])
7073
}
7174
}
7275

@@ -79,7 +82,9 @@ export class DistinctOperator<T> extends UnaryOperator<T> {
7982
/**
8083
* Removes duplicate values
8184
*/
82-
export function distinct<T>(by: (value: T) => any = (value: T) => value) {
85+
export function distinct<T extends KeyValue<any, any>>(
86+
by: (value: T) => any = (value: T) => value
87+
) {
8388
return (stream: IStreamBuilder<T>): IStreamBuilder<T> => {
8489
const output = new StreamBuilder<T>(
8590
stream.graph,

packages/db-ivm/tests/operators/distinct.test.ts

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { MultiSet } from "../../src/multiset.js"
44
import { distinct } from "../../src/operators/distinct.js"
55
import { output } from "../../src/operators/output.js"
66
import { MessageTracker, assertResults } from "../test-utils.js"
7+
import { hash } from "../../src/hashing/index.js"
78

89
describe(`Operators`, () => {
910
describe(`Efficient distinct operation`, () => {
@@ -39,9 +40,9 @@ function testDistinct() {
3940

4041
expect(data).toEqual([
4142
[
42-
[[1, `a`], 1],
43-
[[2, `b`], 1],
44-
[[2, `c`], 1],
43+
[[hash([1, `a`]), `a`], 1],
44+
[[hash([2, `b`]), `b`], 1],
45+
[[hash([2, `c`]), `c`], 1],
4546
],
4647
])
4748
})
@@ -74,7 +75,7 @@ function testDistinct() {
7475

7576
graph.run()
7677

77-
const data = messages.map((m) => m.getInner())[0]
78+
const data = messages.map((m) => m.getInner())[0]!
7879
const countries = data
7980
.map(([[_, value], multiplicity]) => [value.country, multiplicity])
8081
.sort()
@@ -118,8 +119,8 @@ function testDistinct() {
118119
`distinct with updates - initial`,
119120
initialResult,
120121
[
121-
[1, `a`],
122-
[1, `b`],
122+
[hash([1, `a`]), `a`],
123+
[hash([1, `b`]), `b`],
123124
], // Should have both distinct values
124125
4 // Max expected messages
125126
)
@@ -140,7 +141,7 @@ function testDistinct() {
140141
assertResults(
141142
`distinct with updates - second batch`,
142143
secondResult,
143-
[[1, `c`]], // Should only have 'c' remaining
144+
[[hash([1, `c`]), `c`]], // Should only have 'c' remaining
144145
4 // Max expected messages
145146
)
146147

@@ -186,9 +187,9 @@ function testDistinct() {
186187

187188
expect(data).toEqual([
188189
[
189-
[[`key1`, 1], 1],
190-
[[`key1`, 2], 1],
191-
[[`key2`, 1], 1],
190+
[[hash([`key1`, 1]), 1], 1],
191+
[[hash([`key1`, 2]), 2], 1],
192+
[[hash([`key2`, 1]), 1], 1],
192193
],
193194
])
194195
})
@@ -224,8 +225,8 @@ function testDistinct() {
224225
`distinct with multiple batches that cancel out`,
225226
result,
226227
[
227-
[`key1`, 1], // Should remain (multiplicity 2 -> 1 in distinct)
228-
[`key2`, 1], // Should remain (multiplicity 2 -> 1 in distinct)
228+
[hash([`key1`, 1]), 1], // Should remain (multiplicity 2 -> 1 in distinct)
229+
[hash([`key2`, 1]), 1], // Should remain (multiplicity 2 -> 1 in distinct)
229230
],
230231
6 // Max expected messages (generous upper bound)
231232
)

packages/db/src/change-events.ts

Lines changed: 18 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ export function currentStateAsChanges<
4646
TKey extends string | number,
4747
>(
4848
collection: CollectionLike<T, TKey>,
49-
options: CurrentStateAsChangesOptions<T> = {}
50-
): Array<ChangeMessage<T>> {
49+
options: CurrentStateAsChangesOptions = {}
50+
): Array<ChangeMessage<T>> | void {
5151
// Helper function to collect filtered results
5252
const collectFilteredResults = (
5353
filterFn?: (value: T) => boolean
@@ -66,31 +66,17 @@ export function currentStateAsChanges<
6666
return result
6767
}
6868

69-
if (!options.where && !options.whereExpression) {
69+
// TODO: handle orderBy and limit options
70+
// by calling optimizeOrderedLimit
71+
72+
if (!options.where) {
7073
// No filtering, return all items
7174
return collectFilteredResults()
7275
}
7376

7477
// There's a where clause, let's see if we can use an index
7578
try {
76-
let expression: BasicExpression<boolean>
77-
78-
if (options.whereExpression) {
79-
// Use the pre-compiled expression directly
80-
expression = options.whereExpression
81-
} else if (options.where) {
82-
// Create the single-row refProxy for the callback
83-
const singleRowRefProxy = createSingleRowRefProxy<T>()
84-
85-
// Execute the callback to get the expression
86-
const whereExpression = options.where(singleRowRefProxy)
87-
88-
// Convert the result to a BasicExpression
89-
expression = toExpression(whereExpression)
90-
} else {
91-
// This should never happen due to the check above, but TypeScript needs it
92-
return []
93-
}
79+
const expression: BasicExpression<boolean> = options.where
9480

9581
// Try to optimize the query using indexes
9682
const optimizationResult = optimizeExpressionWithIndexes(
@@ -113,11 +99,11 @@ export function currentStateAsChanges<
11399
}
114100
return result
115101
} else {
116-
// No index found or complex expression, fall back to full scan with filter
117-
const filterFn = options.where
118-
? createFilterFunction(options.where)
119-
: createFilterFunctionFromExpression(expression)
102+
if (options.optimizedOnly) {
103+
return
104+
}
120105

106+
const filterFn = createFilterFunctionFromExpression(expression)
121107
return collectFilteredResults(filterFn)
122108
}
123109
} catch (error) {
@@ -127,9 +113,11 @@ export function currentStateAsChanges<
127113
error
128114
)
129115

130-
const filterFn = options.where
131-
? createFilterFunction(options.where)
132-
: createFilterFunctionFromExpression(options.whereExpression!)
116+
const filterFn = createFilterFunctionFromExpression(options.where)
117+
118+
if (options.optimizedOnly) {
119+
return
120+
}
133121

134122
return collectFilteredResults(filterFn)
135123
}
@@ -201,11 +189,9 @@ export function createFilterFunctionFromExpression<T extends object>(
201189
*/
202190
export function createFilteredCallback<T extends object>(
203191
originalCallback: (changes: Array<ChangeMessage<T>>) => void,
204-
options: SubscribeChangesOptions<T>
192+
options: SubscribeChangesOptions
205193
): (changes: Array<ChangeMessage<T>>) => void {
206-
const filterFn = options.whereExpression
207-
? createFilterFunctionFromExpression(options.whereExpression)
208-
: createFilterFunction(options.where!)
194+
const filterFn = createFilterFunctionFromExpression(options.whereExpression!)
209195

210196
return (changes: Array<ChangeMessage<T>>) => {
211197
const filteredChanges: Array<ChangeMessage<T>> = []

0 commit comments

Comments
 (0)