Skip to content

Commit 0cb7699

Browse files
authored
Modify topK and orderBy to track by key instead of by object reference (#405)
1 parent 1d98a77 commit 0cb7699

File tree

8 files changed

+479
-226
lines changed

8 files changed

+479
-226
lines changed

.changeset/polite-islands-kiss.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@tanstack/db-ivm": patch
3+
"@tanstack/db": patch
4+
---
5+
6+
Fix a bug in orderBy that caused query results to have fewer rows than the configured limit.

packages/db-ivm/src/operators/orderBy.ts

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ export function orderByWithFractionalIndexBase<
138138
) => Ve,
139139
options?: OrderByOptions<Ve>
140140
) {
141+
type KeyType = T extends KeyValue<infer K, unknown> ? K : never
142+
type ValueType = T extends KeyValue<unknown, infer V> ? V : never
143+
141144
const limit = options?.limit ?? Infinity
142145
const offset = options?.offset ?? 0
143146
const comparator =
@@ -151,37 +154,16 @@ export function orderByWithFractionalIndexBase<
151154

152155
return (
153156
stream: IStreamBuilder<T>
154-
): IStreamBuilder<
155-
KeyValue<
156-
T extends KeyValue<infer K, unknown> ? K : never,
157-
[T extends KeyValue<unknown, infer V> ? V : never, string]
158-
>
159-
> => {
160-
type KeyType = T extends KeyValue<infer K, unknown> ? K : never
161-
type ValueType = T extends KeyValue<unknown, infer V> ? V : never
162-
157+
): IStreamBuilder<[KeyType, [ValueType, string]]> => {
163158
return stream.pipe(
164-
map(
165-
([key, value]) =>
166-
[
167-
null,
168-
[
169-
key,
170-
valueExtractor(
171-
value as T extends KeyValue<unknown, infer V> ? V : never
172-
),
173-
],
174-
] as KeyValue<null, [KeyType, Ve]>
159+
topKFunction(
160+
(a: ValueType, b: ValueType) =>
161+
comparator(valueExtractor(a), valueExtractor(b)),
162+
{
163+
limit,
164+
offset,
165+
}
175166
),
176-
topKFunction((a, b) => comparator(a[1], b[1]), {
177-
limit,
178-
offset,
179-
}),
180-
map(([_, [[key], index]]) => [key, index] as KeyValue<KeyType, string>),
181-
innerJoin(stream),
182-
map(([key, [index, value]]) => {
183-
return [key, [value, index]] as KeyValue<KeyType, [ValueType, string]>
184-
}),
185167
consolidate()
186168
)
187169
}

packages/db-ivm/src/operators/topKWithFractionalIndex.ts

Lines changed: 69 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@ import { generateKeyBetween } from "fractional-indexing"
22
import { DifferenceStreamWriter, UnaryOperator } from "../graph.js"
33
import { StreamBuilder } from "../d2.js"
44
import { MultiSet } from "../multiset.js"
5-
import { Index } from "../indexes.js"
65
import { binarySearch, globalObjectIdGenerator } from "../utils.js"
76
import type { DifferenceStreamReader } from "../graph.js"
8-
import type { IStreamBuilder, KeyValue, PipedOperator } from "../types.js"
7+
import type { IStreamBuilder, PipedOperator } from "../types.js"
98

109
export interface TopKWithFractionalIndexOptions {
1110
limit?: number
@@ -158,31 +157,34 @@ class TopKArray<V> implements TopK<V> {
158157
* This operator maintains fractional indices for sorted elements
159158
* and only updates indices when elements move position
160159
*/
161-
export class TopKWithFractionalIndexOperator<K, V1> extends UnaryOperator<
162-
[K, V1],
163-
[K, IndexedValue<V1>]
160+
export class TopKWithFractionalIndexOperator<K, T> extends UnaryOperator<
161+
[K, T],
162+
[K, IndexedValue<T>]
164163
> {
165-
#index = new Index<K, V1>()
164+
#index: Map<K, number> = new Map() // maps keys to their multiplicity
166165

167166
/**
168167
* topK data structure that supports insertions and deletions
169168
* and returns changes to the topK.
170169
*/
171-
#topK: TopK<TaggedValue<V1>>
170+
#topK: TopK<TaggedValue<K, T>>
172171

173172
constructor(
174173
id: number,
175-
inputA: DifferenceStreamReader<[K, V1]>,
176-
output: DifferenceStreamWriter<[K, [V1, string]]>,
177-
comparator: (a: V1, b: V1) => number,
174+
inputA: DifferenceStreamReader<[K, T]>,
175+
output: DifferenceStreamWriter<[K, IndexedValue<T>]>,
176+
comparator: (a: T, b: T) => number,
178177
options: TopKWithFractionalIndexOptions
179178
) {
180179
super(id, inputA, output)
181180
const limit = options.limit ?? Infinity
182181
const offset = options.offset ?? 0
183-
const compareTaggedValues = (a: TaggedValue<V1>, b: TaggedValue<V1>) => {
182+
const compareTaggedValues = (
183+
a: TaggedValue<K, T>,
184+
b: TaggedValue<K, T>
185+
) => {
184186
// First compare on the value
185-
const valueComparison = comparator(untagValue(a), untagValue(b))
187+
const valueComparison = comparator(getVal(a), getVal(b))
186188
if (valueComparison !== 0) {
187189
return valueComparison
188190
}
@@ -197,13 +199,13 @@ export class TopKWithFractionalIndexOperator<K, V1> extends UnaryOperator<
197199
protected createTopK(
198200
offset: number,
199201
limit: number,
200-
comparator: (a: TaggedValue<V1>, b: TaggedValue<V1>) => number
201-
): TopK<TaggedValue<V1>> {
202+
comparator: (a: TaggedValue<K, T>, b: TaggedValue<K, T>) => number
203+
): TopK<TaggedValue<K, T>> {
202204
return new TopKArray(offset, limit, comparator)
203205
}
204206

205207
run(): void {
206-
const result: Array<[[K, [V1, string]], number]> = []
208+
const result: Array<[[K, IndexedValue<T>], number]> = []
207209
for (const message of this.inputMessages()) {
208210
for (const [item, multiplicity] of message.getInner()) {
209211
const [key, value] = item
@@ -218,27 +220,25 @@ export class TopKWithFractionalIndexOperator<K, V1> extends UnaryOperator<
218220

219221
processElement(
220222
key: K,
221-
value: V1,
223+
value: T,
222224
multiplicity: number,
223-
result: Array<[[K, [V1, string]], number]>
225+
result: Array<[[K, IndexedValue<T>], number]>
224226
): void {
225-
const oldMultiplicity = this.#index.getMultiplicity(key, value)
226-
this.#index.addValue(key, [value, multiplicity])
227-
const newMultiplicity = this.#index.getMultiplicity(key, value)
227+
const { oldMultiplicity, newMultiplicity } = this.addKey(key, multiplicity)
228228

229-
let res: TopKChanges<TaggedValue<V1>> = {
229+
let res: TopKChanges<TaggedValue<K, T>> = {
230230
moveIn: null,
231231
moveOut: null,
232232
}
233233
if (oldMultiplicity <= 0 && newMultiplicity > 0) {
234234
// The value was invisible but should now be visible
235235
// Need to insert it into the array of sorted values
236-
const taggedValue = tagValue(value)
236+
const taggedValue = tagValue(key, value)
237237
res = this.#topK.insert(taggedValue)
238238
} else if (oldMultiplicity > 0 && newMultiplicity <= 0) {
239239
// The value was visible but should now be invisible
240240
// Need to remove it from the array of sorted values
241-
const taggedValue = tagValue(value)
241+
const taggedValue = tagValue(key, value)
242242
res = this.#topK.delete(taggedValue)
243243
} else {
244244
// The value was invisible and it remains invisible
@@ -247,26 +247,45 @@ export class TopKWithFractionalIndexOperator<K, V1> extends UnaryOperator<
247247
}
248248

249249
if (res.moveIn) {
250-
const valueWithoutTieBreaker = mapValue(res.moveIn, untagValue)
251-
result.push([[key, valueWithoutTieBreaker], 1])
250+
const index = getIndex(res.moveIn)
251+
const taggedValue = getValue(res.moveIn)
252+
const k = getKey(taggedValue)
253+
const val = getVal(taggedValue)
254+
result.push([[k, [val, index]], 1])
252255
}
253256

254257
if (res.moveOut) {
255-
const valueWithoutTieBreaker = mapValue(res.moveOut, untagValue)
256-
result.push([[key, valueWithoutTieBreaker], -1])
258+
const index = getIndex(res.moveOut)
259+
const taggedValue = getValue(res.moveOut)
260+
const k = getKey(taggedValue)
261+
const val = getVal(taggedValue)
262+
result.push([[k, [val, index]], -1])
257263
}
258264

259265
return
260266
}
267+
268+
private getMultiplicity(key: K): number {
269+
return this.#index.get(key) ?? 0
270+
}
271+
272+
private addKey(
273+
key: K,
274+
multiplicity: number
275+
): { oldMultiplicity: number; newMultiplicity: number } {
276+
const oldMultiplicity = this.getMultiplicity(key)
277+
const newMultiplicity = oldMultiplicity + multiplicity
278+
if (newMultiplicity === 0) {
279+
this.#index.delete(key)
280+
} else {
281+
this.#index.set(key, newMultiplicity)
282+
}
283+
return { oldMultiplicity, newMultiplicity }
284+
}
261285
}
262286

263287
/**
264288
* Limits the number of results based on a comparator, with optional offset.
265-
* This works on a keyed stream, where the key is the first element of the tuple.
266-
* The ordering is within a key group, i.e. elements are sorted within a key group
267-
* and the limit + offset is applied to that sorted group.
268-
* To order the entire stream, key by the same value for all elements such as null.
269-
*
270289
* Uses fractional indexing to minimize the number of changes when elements move positions.
271290
* Each element is assigned a fractional index that is lexicographically sortable.
272291
* When elements move, only the indices of the moved elements are updated, not all elements.
@@ -275,26 +294,22 @@ export class TopKWithFractionalIndexOperator<K, V1> extends UnaryOperator<
275294
* @param options - An optional object containing limit and offset properties
276295
* @returns A piped operator that orders the elements and limits the number of results
277296
*/
278-
export function topKWithFractionalIndex<
279-
KType extends T extends KeyValue<infer K, infer _V> ? K : never,
280-
V1Type extends T extends KeyValue<KType, infer V> ? V : never,
281-
T,
282-
>(
283-
comparator: (a: V1Type, b: V1Type) => number,
297+
export function topKWithFractionalIndex<KType, T>(
298+
comparator: (a: T, b: T) => number,
284299
options?: TopKWithFractionalIndexOptions
285-
): PipedOperator<T, KeyValue<KType, [V1Type, string]>> {
300+
): PipedOperator<[KType, T], [KType, IndexedValue<T>]> {
286301
const opts = options || {}
287302

288303
return (
289-
stream: IStreamBuilder<T>
290-
): IStreamBuilder<KeyValue<KType, [V1Type, string]>> => {
291-
const output = new StreamBuilder<KeyValue<KType, [V1Type, string]>>(
304+
stream: IStreamBuilder<[KType, T]>
305+
): IStreamBuilder<[KType, IndexedValue<T>]> => {
306+
const output = new StreamBuilder<[KType, IndexedValue<T>]>(
292307
stream.graph,
293-
new DifferenceStreamWriter<KeyValue<KType, [V1Type, string]>>()
308+
new DifferenceStreamWriter<[KType, IndexedValue<T>]>()
294309
)
295-
const operator = new TopKWithFractionalIndexOperator<KType, V1Type>(
310+
const operator = new TopKWithFractionalIndexOperator<KType, T>(
296311
stream.graph.getNextOperatorId(),
297-
stream.connectReader() as DifferenceStreamReader<KeyValue<KType, V1Type>>,
312+
stream.connectReader(),
298313
output.writer,
299314
comparator,
300315
opts
@@ -324,24 +339,21 @@ export function getIndex<V>(indexedVal: IndexedValue<V>): FractionalIndex {
324339
return indexedVal[1]
325340
}
326341

327-
function mapValue<V, W>(
328-
indexedVal: IndexedValue<V>,
329-
f: (value: V) => W
330-
): IndexedValue<W> {
331-
return [f(getValue(indexedVal)), getIndex(indexedVal)]
332-
}
333-
334342
export type Tag = number
335-
export type TaggedValue<V> = [V, Tag]
343+
export type TaggedValue<K, V> = [K, V, Tag]
336344

337-
function tagValue<V>(value: V): TaggedValue<V> {
338-
return [value, globalObjectIdGenerator.getId(value)]
345+
function tagValue<K, V>(key: K, value: V): TaggedValue<K, V> {
346+
return [key, value, globalObjectIdGenerator.getId(key)]
339347
}
340348

341-
function untagValue<V>(tieBreakerTaggedValue: TaggedValue<V>): V {
349+
function getKey<K, V>(tieBreakerTaggedValue: TaggedValue<K, V>): K {
342350
return tieBreakerTaggedValue[0]
343351
}
344352

345-
function getTag<V>(tieBreakerTaggedValue: TaggedValue<V>): Tag {
353+
function getVal<K, V>(tieBreakerTaggedValue: TaggedValue<K, V>): V {
346354
return tieBreakerTaggedValue[1]
347355
}
356+
357+
function getTag<K, V>(tieBreakerTaggedValue: TaggedValue<K, V>): Tag {
358+
return tieBreakerTaggedValue[2]
359+
}

packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ import {
77
getValue,
88
indexedValue,
99
} from "./topKWithFractionalIndex.js"
10-
import type { IStreamBuilder, KeyValue, PipedOperator } from "../types.js"
11-
import type { DifferenceStreamReader } from "../graph.js"
10+
import type { IStreamBuilder, PipedOperator } from "../types.js"
1211
import type {
1312
IndexedValue,
1413
TaggedValue,
@@ -238,13 +237,13 @@ class TopKTree<V> implements TopK<V> {
238237
*/
239238
export class TopKWithFractionalIndexBTreeOperator<
240239
K,
241-
V1,
242-
> extends TopKWithFractionalIndexOperator<K, V1> {
240+
T,
241+
> extends TopKWithFractionalIndexOperator<K, T> {
243242
protected override createTopK(
244243
offset: number,
245244
limit: number,
246-
comparator: (a: TaggedValue<V1>, b: TaggedValue<V1>) => number
247-
): TopK<TaggedValue<V1>> {
245+
comparator: (a: TaggedValue<K, T>, b: TaggedValue<K, T>) => number
246+
): TopK<TaggedValue<K, T>> {
248247
if (BTree === undefined) {
249248
throw new Error(
250249
`B+ tree not loaded. You need to call loadBTree() before using TopKWithFractionalIndexBTreeOperator.`
@@ -269,14 +268,10 @@ export class TopKWithFractionalIndexBTreeOperator<
269268
* @param options - An optional object containing limit and offset properties
270269
* @returns A piped operator that orders the elements and limits the number of results
271270
*/
272-
export function topKWithFractionalIndexBTree<
273-
KType extends T extends KeyValue<infer K, infer _V> ? K : never,
274-
V1Type extends T extends KeyValue<KType, infer V> ? V : never,
275-
T,
276-
>(
277-
comparator: (a: V1Type, b: V1Type) => number,
271+
export function topKWithFractionalIndexBTree<KType, T>(
272+
comparator: (a: T, b: T) => number,
278273
options?: TopKWithFractionalIndexOptions
279-
): PipedOperator<T, KeyValue<KType, [V1Type, string]>> {
274+
): PipedOperator<[KType, T], [KType, IndexedValue<T>]> {
280275
const opts = options || {}
281276

282277
if (BTree === undefined) {
@@ -286,15 +281,15 @@ export function topKWithFractionalIndexBTree<
286281
}
287282

288283
return (
289-
stream: IStreamBuilder<T>
290-
): IStreamBuilder<KeyValue<KType, [V1Type, string]>> => {
291-
const output = new StreamBuilder<KeyValue<KType, [V1Type, string]>>(
284+
stream: IStreamBuilder<[KType, T]>
285+
): IStreamBuilder<[KType, IndexedValue<T>]> => {
286+
const output = new StreamBuilder<[KType, IndexedValue<T>]>(
292287
stream.graph,
293-
new DifferenceStreamWriter<KeyValue<KType, [V1Type, string]>>()
288+
new DifferenceStreamWriter<[KType, IndexedValue<T>]>()
294289
)
295-
const operator = new TopKWithFractionalIndexOperator<KType, V1Type>(
290+
const operator = new TopKWithFractionalIndexBTreeOperator<KType, T>(
296291
stream.graph.getNextOperatorId(),
297-
stream.connectReader() as DifferenceStreamReader<KeyValue<KType, V1Type>>,
292+
stream.connectReader(),
298293
output.writer,
299294
comparator,
300295
opts

0 commit comments

Comments
 (0)