diff --git a/.changeset/chilly-icons-design.md b/.changeset/chilly-icons-design.md new file mode 100644 index 0000000..1c02663 --- /dev/null +++ b/.changeset/chilly-icons-design.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/d2mini': patch +--- + +Introduce topKWithFractionalIndexBTree and orderByWithFractionalIndexBTree operators. These variants use a B+ tree which should be efficient for big collections (logarithmic time). diff --git a/packages/d2mini/package.json b/packages/d2mini/package.json index 34ad107..b7467b0 100644 --- a/packages/d2mini/package.json +++ b/packages/d2mini/package.json @@ -50,6 +50,7 @@ }, "dependencies": { "fractional-indexing": "^3.2.0", - "murmurhash-js": "^1.0.0" + "murmurhash-js": "^1.0.0", + "sorted-btree": "^1.8.1" } } diff --git a/packages/d2mini/src/indexes.ts b/packages/d2mini/src/indexes.ts index c1c88db..96d965c 100644 --- a/packages/d2mini/src/indexes.ts +++ b/packages/d2mini/src/indexes.ts @@ -35,6 +35,13 @@ export class Index { return [...valueMap.values()] } + getMultiplicity(key: K, value: V): number { + const valueMap = this.#inner.get(key) + const valueHash = hash(value) + const [, multiplicity] = valueMap.get(valueHash) + return multiplicity + } + entries() { return this.#inner.entries() } diff --git a/packages/d2mini/src/operators/orderBy.ts b/packages/d2mini/src/operators/orderBy.ts index 6796f1e..8161086 100644 --- a/packages/d2mini/src/operators/orderBy.ts +++ b/packages/d2mini/src/operators/orderBy.ts @@ -2,6 +2,7 @@ import { IStreamBuilder } from '../types' import { KeyValue } from '../types.js' import { topK, topKWithIndex } from './topK.js' import { topKWithFractionalIndex } from './topKWithFractionalIndex.js' +import { topKWithFractionalIndexBTree } from './topKWithFractionalIndexBTree.js' import { map } from './map.js' import { innerJoin } from './join.js' import { consolidate } from './consolidate.js' @@ -128,19 +129,11 @@ export function orderByWithIndex< } } -/** - * Orders the elements and limits the number of results, with optional offset and - * annotates the value with a fractional index. - * This requires a keyed stream, and uses the `topKWithFractionalIndex` operator to order all the elements. - * - * @param valueExtractor - A function that extracts the value to order by from the element - * @param options - An optional object containing comparator, limit and offset properties - * @returns A piped operator that orders the elements and limits the number of results - */ -export function orderByWithFractionalIndex< +function orderByWithFractionalIndexBase< T extends KeyValue, Ve = unknown, >( + topK: typeof topKWithFractionalIndex, valueExtractor: ( value: T extends KeyValue ? V : never, ) => Ve, @@ -181,7 +174,7 @@ export function orderByWithFractionalIndex< ], ] as KeyValue, ), - topKWithFractionalIndex((a, b) => comparator(a[1], b[1]), { + topK((a, b) => comparator(a[1], b[1]), { limit, offset, }), @@ -194,3 +187,44 @@ export function orderByWithFractionalIndex< ) } } + +/** + * Orders the elements and limits the number of results, with optional offset and + * annotates the value with a fractional index. + * This requires a keyed stream, and uses the `topKWithFractionalIndex` operator to order all the elements. + * + * @param valueExtractor - A function that extracts the value to order by from the element + * @param options - An optional object containing comparator, limit and offset properties + * @returns A piped operator that orders the elements and limits the number of results + */ +export function orderByWithFractionalIndex< + T extends KeyValue, + Ve = unknown, +>( + valueExtractor: ( + value: T extends KeyValue ? V : never, + ) => Ve, + options?: OrderByOptions, +) { + return orderByWithFractionalIndexBase( + topKWithFractionalIndex, + valueExtractor, + options, + ) +} + +export function orderByWithFractionalIndexBTree< + T extends KeyValue, + Ve = unknown, +>( + valueExtractor: ( + value: T extends KeyValue ? V : never, + ) => Ve, + options?: OrderByOptions, +) { + return orderByWithFractionalIndexBase( + topKWithFractionalIndexBTree, + valueExtractor, + options, + ) +} diff --git a/packages/d2mini/src/operators/topKWithFractionalIndex.ts b/packages/d2mini/src/operators/topKWithFractionalIndex.ts index 6d7ed74..7c49f86 100644 --- a/packages/d2mini/src/operators/topKWithFractionalIndex.ts +++ b/packages/d2mini/src/operators/topKWithFractionalIndex.ts @@ -8,291 +8,258 @@ import { StreamBuilder } from '../d2.js' import { MultiSet } from '../multiset.js' import { Index } from '../indexes.js' import { generateKeyBetween } from 'fractional-indexing' -import { hash } from '../utils.js' +import { binarySearch, hash } from '../utils.js' -interface TopKWithFractionalIndexOptions { +export interface TopKWithFractionalIndexOptions { limit?: number offset?: number } +export type TopKChanges = { + /** Indicates which element moves into the topK (if any) */ + moveIn: IndexedValue | null + /** Indicates which element moves out of the topK (if any) */ + moveOut: IndexedValue | null +} + /** - * Operator for fractional indexed topK operations - * This operator maintains fractional indices for sorted elements - * and only updates indices when elements move position + * A topK data structure that supports insertions and deletions + * and returns changes to the topK. */ -export class TopKWithFractionalIndexOperator extends UnaryOperator< - [K, V1], - [K, [V1, string]] -> { - #index = new Index() - #indexOut = new Index() - #comparator: (a: V1, b: V1) => number - #limit: number - #offset: number +export interface TopK { + insert(value: V): TopKChanges + delete(value: V): TopKChanges +} + +/** + * Implementation of a topK data structure. + * Uses a sorted array internally to store the values and keeps a topK window over that array. + * Inserts and deletes are O(n) operations because worst case an element is inserted/deleted + * at the start of the array which causes all the elements to shift to the right/left. + */ +class TopKArray implements TopK { + #sortedValues: Array> = [] + #comparator: (a: V, b: V) => number + #topKStart: number + #topKEnd: number constructor( - id: number, - inputA: DifferenceStreamReader<[K, V1]>, - output: DifferenceStreamWriter<[K, [V1, string]]>, - comparator: (a: V1, b: V1) => number, - options: TopKWithFractionalIndexOptions, + offset: number, + limit: number, + comparator: (a: V, b: V) => number, ) { - super(id, inputA, output) + this.#topKStart = offset + this.#topKEnd = offset + limit this.#comparator = comparator - this.#limit = options.limit ?? Infinity - this.#offset = options.offset ?? 0 } - run(): void { - const keysTodo = new Set() - - for (const message of this.inputMessages()) { - for (const [item, multiplicity] of message.getInner()) { - const [key, value] = item - this.#index.addValue(key, [value, multiplicity]) - keysTodo.add(key) - } - } - - const result: [[K, [V1, string]], number][] = [] - - for (const key of keysTodo) { - const curr = this.#index.get(key) - const currOut = this.#indexOut.get(key) - - // Sort the current values - const consolidated = new MultiSet(curr).consolidate() - const sortedValues = consolidated - .getInner() - .sort((a, b) => this.#comparator(a[0] as V1, b[0] as V1)) - .slice(this.#offset, this.#offset + this.#limit) - - // Create a map for quick value lookup with pre-stringified keys - const currValueMap = new Map() - const prevOutputMap = new Map() - - // Pre-stringify all values once - const valueKeys: string[] = [] - const valueToKey = new Map() - - // Process current values - for (const [value, multiplicity] of sortedValues) { - if (multiplicity > 0) { - // Only stringify each value once and store the result - let valueKey = valueToKey.get(value as V1) - if (!valueKey) { - valueKey = hash(value) - valueToKey.set(value as V1, valueKey) - valueKeys.push(valueKey) - } - currValueMap.set(valueKey, value as V1) + insert(value: V): TopKChanges { + let result: TopKChanges = { moveIn: null, moveOut: null } + + // Lookup insert position + const index = this.#findIndex(value) + // Generate fractional index based on the fractional indices of the elements before and after it + const indexBefore = + index === 0 ? null : getIndex(this.#sortedValues[index - 1]) + const indexAfter = + index === this.#sortedValues.length + ? null + : getIndex(this.#sortedValues[index]) + const fractionalIndex = generateKeyBetween(indexBefore, indexAfter) + + // Insert the value at the correct position + const val = indexedValue(value, fractionalIndex) + // Splice is O(n) where n = all elements in the collection (i.e. n >= k) ! + this.#sortedValues.splice(index, 0, val) + + // Check if the topK changed + if (index < this.#topKEnd) { + // The inserted element is either before the top K or within the top K + // If it is before the top K then it moves the element that was right before the topK into the topK + // If it is within the top K then the inserted element moves into the top K + // In both cases the last element of the old top K now moves out of the top K + const moveInIndex = Math.max(index, this.#topKStart) + if (moveInIndex < this.#sortedValues.length) { + // We actually have a topK + // because in some cases there may not be enough elements in the array to reach the start of the topK + // e.g. [1, 2, 3] with K = 2 and offset = 3 does not have a topK + result.moveIn = this.#sortedValues[moveInIndex] + + // We need to remove the element that falls out of the top K + // The element that falls out of the top K has shifted one to the right + // because of the element we inserted, so we find it at index topKEnd + if (this.#topKEnd < this.#sortedValues.length) { + result.moveOut = this.#sortedValues[this.#topKEnd] } } + } - // Process previous output values - for (const [[value, index], multiplicity] of currOut) { - if (multiplicity > 0) { - // Only stringify each value once and store the result - let valueKey = valueToKey.get(value as V1) - if (!valueKey) { - valueKey = hash(value) - valueToKey.set(value as V1, valueKey) - } - prevOutputMap.set(valueKey, [value as V1, index as string]) - } - } + return result + } - // Find values that are no longer in the result - for (const [valueKey, [value, index]] of prevOutputMap.entries()) { - if (!currValueMap.has(valueKey)) { - // Value is no longer in the result, remove it - result.push([[key, [value, index]], -1]) - this.#indexOut.addValue(key, [[value, index], -1]) + /** + * Deletes a value that may or may not be in the topK. + * IMPORTANT: this assumes that the value is present in the collection + * if it's not the case it will remove the element + * that is on the position where the provided `value` would be. + */ + delete(value: V): TopKChanges { + let result: TopKChanges = { moveIn: null, moveOut: null } + + // Lookup delete position + const index = this.#findIndex(value) + // Remove the value at that position + const [removedElem] = this.#sortedValues.splice(index, 1) + + // Check if the topK changed + if (index < this.#topKEnd) { + // The removed element is either before the top K or within the top K + // If it is before the top K then the first element of the topK moves out of the topK + // If it is within the top K then the removed element moves out of the topK + result.moveOut = removedElem + if (index < this.#topKStart) { + // The removed element is before the topK + // so actually, the first element of the topK moves out of the topK + // and not the element that we removed + // The first element of the topK is now at index topKStart - 1 + // since we removed an element before the topK + const moveOutIndex = this.#topKStart - 1 + if (moveOutIndex < this.#sortedValues.length) { + result.moveOut = this.#sortedValues[moveOutIndex] + } else { + // No value is moving out of the topK + // because there are no elements in the topK + result.moveOut = null } } - // Process the sorted values and assign fractional indices - let prevIndex: string | null = null - let nextIndex: string | null = null - const newIndices = new Map() - - // First pass: reuse existing indices for values that haven't moved - for (let i = 0; i < sortedValues.length; i++) { - const [value, _multiplicity] = sortedValues[i] - // Use the pre-computed valueKey - const valueKey = valueToKey.get(value as V1) as string - - // Check if this value already has an index - const existingEntry = prevOutputMap.get(valueKey) - - if (existingEntry) { - const [_, existingIndex] = existingEntry - - // Check if we need to update the index - if (i === 0) { - // First element - prevIndex = null - nextIndex = - i + 1 < sortedValues.length - ? newIndices.get( - valueToKey.get(sortedValues[i + 1][0] as V1) as string, - ) || null - : null - - if (nextIndex !== null && existingIndex >= nextIndex) { - // Need to update index - const newIndex = generateKeyBetween(prevIndex, nextIndex) - newIndices.set(valueKey, newIndex) - } else { - // Can reuse existing index - newIndices.set(valueKey, existingIndex) - } - } else if (i === sortedValues.length - 1) { - // Last element - prevIndex = - newIndices.get( - valueToKey.get(sortedValues[i - 1][0] as V1) as string, - ) || null - nextIndex = null - - if (prevIndex !== null && existingIndex <= prevIndex) { - // Need to update index - const newIndex = generateKeyBetween(prevIndex, nextIndex) - newIndices.set(valueKey, newIndex) - } else { - // Can reuse existing index - newIndices.set(valueKey, existingIndex) - } - } else { - // Middle element - prevIndex = - newIndices.get( - valueToKey.get(sortedValues[i - 1][0] as V1) as string, - ) || null - nextIndex = - i + 1 < sortedValues.length - ? newIndices.get( - valueToKey.get(sortedValues[i + 1][0] as V1) as string, - ) || null - : null - - if ( - (prevIndex !== null && existingIndex <= prevIndex) || - (nextIndex !== null && existingIndex >= nextIndex) - ) { - // Need to update index - const newIndex = generateKeyBetween(prevIndex, nextIndex) - newIndices.set(valueKey, newIndex) - } else { - // Can reuse existing index - newIndices.set(valueKey, existingIndex) - } - } - } + // Since we removed an element that was before or in the topK + // the first element after the topK moved one position to the left + // and thus falls into the topK now + const moveInIndex = this.#topKEnd - 1 + if (moveInIndex < this.#sortedValues.length) { + result.moveIn = this.#sortedValues[moveInIndex] } + } - // Pre-compute valid previous and next indices for each position - // This avoids repeated lookups during index generation - const validPrevIndices: (string | null)[] = new Array(sortedValues.length) - const validNextIndices: (string | null)[] = new Array(sortedValues.length) - - // Initialize with null values - validPrevIndices.fill(null) - validNextIndices.fill(null) + return result + } - // First element has no previous - validPrevIndices[0] = null + // TODO: see if there is a way to refactor the code for insert and delete in the topK above + // because they are very similar, one is shifting the topK window to the left and the other is shifting it to the right + // so i have the feeling there is a common pattern here and we can implement both cases using that pattern - // Last element has no next - validNextIndices[sortedValues.length - 1] = null + #findIndex(value: V): number { + return binarySearch(this.#sortedValues, indexedValue(value, ''), (a, b) => + this.#comparator(getValue(a), getValue(b)), + ) + } +} - // Compute next valid indices (working forward) - let lastValidNextIndex: string | null = null - for (let i = sortedValues.length - 1; i >= 0; i--) { - const valueKey = valueToKey.get(sortedValues[i][0] as V1) as string +/** + * Operator for fractional indexed topK operations + * This operator maintains fractional indices for sorted elements + * and only updates indices when elements move position + */ +export class TopKWithFractionalIndexOperator extends UnaryOperator< + [K, V1], + [K, IndexedValue] +> { + #index = new Index() - // Set the next index for the current position - validNextIndices[i] = lastValidNextIndex + /** + * topK data structure that supports insertions and deletions + * and returns changes to the topK. + */ + #topK: TopK> - // Update lastValidNextIndex if this element has an index - if (newIndices.has(valueKey)) { - lastValidNextIndex = newIndices.get(valueKey) || null - } else { - const existingEntry = prevOutputMap.get(valueKey) - if (existingEntry) { - lastValidNextIndex = existingEntry[1] - } - } + constructor( + id: number, + inputA: DifferenceStreamReader<[K, V1]>, + output: DifferenceStreamWriter<[K, [V1, string]]>, + comparator: (a: V1, b: V1) => number, + options: TopKWithFractionalIndexOptions, + ) { + super(id, inputA, output) + const limit = options.limit ?? Infinity + const offset = options.offset ?? 0 + const compareTaggedValues = ( + a: HashTaggedValue, + b: HashTaggedValue, + ) => { + // First compare on the value + const valueComparison = comparator(getValue(a), getValue(b)) + if (valueComparison !== 0) { + return valueComparison } + // If the values are equal, compare on the hash + const hashA = getHash(a) + const hashB = getHash(b) + return hashA < hashB ? -1 : hashA > hashB ? 1 : 0 + } + this.#topK = this.createTopK(offset, limit, compareTaggedValues) + } - // Compute previous valid indices (working backward) - let lastValidPrevIndex: string | null = null - for (let i = 0; i < sortedValues.length; i++) { - const valueKey = valueToKey.get(sortedValues[i][0] as V1) as string - - // Set the previous index for the current position - validPrevIndices[i] = lastValidPrevIndex + protected createTopK( + offset: number, + limit: number, + comparator: (a: HashTaggedValue, b: HashTaggedValue) => number, + ): TopK> { + return new TopKArray(offset, limit, comparator) + } - // Update lastValidPrevIndex if this element has an index - if (newIndices.has(valueKey)) { - lastValidPrevIndex = newIndices.get(valueKey) || null - } else { - const existingEntry = prevOutputMap.get(valueKey) - if (existingEntry) { - lastValidPrevIndex = existingEntry[1] - } - } + run(): void { + const result: Array<[[K, [V1, string]], number]> = [] + for (const message of this.inputMessages()) { + for (const [item, multiplicity] of message.getInner()) { + const [key, value] = item + this.processElement(key, value, multiplicity, result) } + } - // Second pass: assign new indices for values that don't have one or need to be updated - for (let i = 0; i < sortedValues.length; i++) { - const [value, _multiplicity] = sortedValues[i] - // Use the pre-computed valueKey - const valueKey = valueToKey.get(value as V1) as string - - if (!newIndices.has(valueKey)) { - // This value doesn't have an index yet, use pre-computed indices - prevIndex = validPrevIndices[i] - nextIndex = validNextIndices[i] - - const newIndex = generateKeyBetween(prevIndex, nextIndex) - newIndices.set(valueKey, newIndex) + if (result.length > 0) { + this.output.sendData(new MultiSet(result)) + } + } - // Update validPrevIndices for subsequent elements - if (i < sortedValues.length - 1 && validPrevIndices[i + 1] === null) { - validPrevIndices[i + 1] = newIndex - } - } - } + processElement( + key: K, + value: V1, + multiplicity: number, + result: Array<[[K, [V1, string]], number]>, + ): void { + const oldMultiplicity = this.#index.getMultiplicity(key, value) + this.#index.addValue(key, [value, multiplicity]) + const newMultiplicity = this.#index.getMultiplicity(key, value) + + let res: TopKChanges> = { moveIn: null, moveOut: null } + if (oldMultiplicity <= 0 && newMultiplicity > 0) { + // The value was invisible but should now be visible + // Need to insert it into the array of sorted values + const taggedValue = tagValue(value) + res = this.#topK.insert(taggedValue) + } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { + // The value was visible but should now be invisible + // Need to remove it from the array of sorted values + const taggedValue = tagValue(value) + res = this.#topK.delete(taggedValue) + } else { + // The value was invisible and it remains invisible + // or it was visible and remains visible + // so it doesn't affect the topK + } - // Now create the output with the new indices - for (let i = 0; i < sortedValues.length; i++) { - const [value, _multiplicity] = sortedValues[i] - // Use the pre-computed valueKey - const valueKey = valueToKey.get(value as V1) as string - const index = newIndices.get(valueKey)! - - // Check if this is a new value or if the index has changed - const existingEntry = prevOutputMap.get(valueKey) - - if (!existingEntry) { - // New value - result.push([[key, [value as V1, index]], 1]) - this.#indexOut.addValue(key, [[value as V1, index], 1]) - } else if (existingEntry[1] !== index) { - // Index has changed, remove old entry and add new one - result.push([[key, existingEntry], -1]) - result.push([[key, [value as V1, index]], 1]) - this.#indexOut.addValue(key, [existingEntry, -1]) - this.#indexOut.addValue(key, [[value as V1, index], 1]) - } - // If the value exists and the index hasn't changed, do nothing - } + if (res.moveIn) { + const valueWithoutHash = mapValue(res.moveIn, untagValue) + result.push([[key, valueWithoutHash], 1]) } - if (result.length > 0) { - this.output.sendData(new MultiSet(result)) + if (res.moveOut) { + const valueWithoutHash = mapValue(res.moveOut, untagValue) + result.push([[key, valueWithoutHash], -1]) } + + return } } @@ -340,3 +307,45 @@ export function topKWithFractionalIndex< return output } } + +// Abstraction for fractionally indexed values +export type FractionalIndex = string +export type IndexedValue = [V, FractionalIndex] + +export function indexedValue( + value: V, + index: FractionalIndex, +): IndexedValue { + return [value, index] +} + +export function getValue(indexedValue: IndexedValue): V { + return indexedValue[0] +} + +export function getIndex(indexedValue: IndexedValue): FractionalIndex { + return indexedValue[1] +} + +function mapValue( + value: IndexedValue, + f: (value: V) => W, +): IndexedValue { + return [f(getValue(value)), getIndex(value)] +} + +// Abstraction for values tagged with a hash +export type Hash = string +export type HashTaggedValue = [V, Hash] + +function tagValue(value: V): HashTaggedValue { + return [value, hash(value)] +} + +function untagValue(hashTaggedValue: HashTaggedValue): V { + return hashTaggedValue[0] +} + +function getHash(hashTaggedValue: HashTaggedValue): Hash { + return hashTaggedValue[1] +} diff --git a/packages/d2mini/src/operators/topKWithFractionalIndexBTree.ts b/packages/d2mini/src/operators/topKWithFractionalIndexBTree.ts new file mode 100644 index 0000000..ee14d00 --- /dev/null +++ b/packages/d2mini/src/operators/topKWithFractionalIndexBTree.ts @@ -0,0 +1,260 @@ +import { IStreamBuilder, KeyValue, PipedOperator } from '../types.js' +import { DifferenceStreamReader, DifferenceStreamWriter } from '../graph.js' +import { StreamBuilder } from '../d2.js' +import { generateKeyBetween } from 'fractional-indexing' +import BTree from 'sorted-btree' +import { + getIndex, + getValue, + HashTaggedValue, + indexedValue, + IndexedValue, + TopK, + TopKChanges, + TopKWithFractionalIndexOperator, + TopKWithFractionalIndexOptions, +} from './topKWithFractionalIndex.js' + +/** + * Implementation of a topK data structure that uses a B+ tree. + * The tree allows for logarithmic time insertions and deletions. + */ +class TopKTree implements TopK { + #comparator: (a: V, b: V) => number + // topK is a window at position [topKStart, topKEnd[ + // i.e. `topKStart` is inclusive and `topKEnd` is exclusive + #topKStart: number + #topKEnd: number + + #tree: BTree> + #topKFirstElem: IndexedValue | null = null // inclusive + #topKLastElem: IndexedValue | null = null // inclusive + + constructor( + offset: number, + limit: number, + comparator: (a: V, b: V) => number, + ) { + this.#topKStart = offset + this.#topKEnd = offset + limit + this.#comparator = comparator + this.#tree = new BTree(undefined, comparator) + } + + /** + * Insert a *new* value. + * Ignores the value if it is already present. + */ + insert(value: V): TopKChanges { + let result: TopKChanges = { moveIn: null, moveOut: null } + + // Get the elements before and after the value + const [, indexedValueBefore] = this.#tree.nextLowerPair(value) ?? [ + null, + null, + ] + const [, indexedValueAfter] = this.#tree.nextHigherPair(value) ?? [ + null, + null, + ] + + const indexBefore = indexedValueBefore ? getIndex(indexedValueBefore) : null + const indexAfter = indexedValueAfter ? getIndex(indexedValueAfter) : null + + // Generate a fractional index for the value + // based on the fractional indices of the elements before and after it + const fractionalIndex = generateKeyBetween(indexBefore, indexAfter) + const insertedElem = indexedValue(value, fractionalIndex) + + // Insert the value into the tree + const inserted = this.#tree.set(value, insertedElem, false) + if (!inserted) { + // The value was already present in the tree + // ignore this insertions since we don't support overwrites! + return result + } + + if (this.#tree.size - 1 < this.#topKStart) { + // We don't have a topK yet + // so we don't need to do anything + return result + } + + if (this.#topKFirstElem) { + // We have a topK containing at least 1 element + if (this.#comparator(value, getValue(this.#topKFirstElem)) < 0) { + // The element was inserted before the topK + // so it moves the element that is right before the topK into the topK + const firstElem = getValue(this.#topKFirstElem) + const [, newFirstElem] = this.#tree.nextLowerPair(firstElem)! + this.#topKFirstElem = newFirstElem + result.moveIn = this.#topKFirstElem + } else if ( + !this.#topKLastElem || + this.#comparator(value, getValue(this.#topKLastElem)) < 0 + ) { + // The element was inserted within the topK + result.moveIn = insertedElem + } + + if ( + this.#topKLastElem && + this.#comparator(value, getValue(this.#topKLastElem)) < 0 + ) { + // The element was inserted before or within the topK + // the newly inserted element pushes the last element of the topK out of the topK + // so the one before that becomes the new last element of the topK + const lastElem = this.#topKLastElem + const lastValue = getValue(lastElem) + const [, newLastElem] = this.#tree.nextLowerPair(lastValue)! + this.#topKLastElem = newLastElem + result.moveOut = lastElem + } + } + + // If the tree has as many elements as the offset (i.e. #topKStart) + // then the insertion shifted the elements 1 position to the right + // and the last element in the tree is now the first element of the topK + if (this.#tree.size - 1 === this.#topKStart) { + const topKFirstKey = this.#tree.maxKey()! + this.#topKFirstElem = this.#tree.get(topKFirstKey)! + result.moveIn = this.#topKFirstElem + } + + // By inserting this new element we now have a complete topK + // store the last element of the topK + if (this.#tree.size === this.#topKEnd) { + const topKLastKey = this.#tree.maxKey()! + this.#topKLastElem = this.#tree.get(topKLastKey)! + } + + return result + } + + delete(value: V): TopKChanges { + let result: TopKChanges = { moveIn: null, moveOut: null } + + const deletedElem = this.#tree.get(value) + const deleted = this.#tree.delete(value) + if (!deleted) { + return result + } + + if (!this.#topKFirstElem) { + // We didn't have a topK before the delete + // so we still can't have a topK after the delete + return result + } + + if (this.#comparator(value, getValue(this.#topKFirstElem)) < 0) { + // We deleted an element that was before the topK + // so the topK has shifted one position to the left + + // the old first element moves out of the topK + result.moveOut = this.#topKFirstElem + // the element that was right after the first element of the topK + // is now the new first element of the topK + const firstElem = getValue(this.#topKFirstElem) + const [, newFirstElem] = this.#tree.nextHigherPair(firstElem) ?? [ + null, + null, + ] + this.#topKFirstElem = newFirstElem + } else if ( + !this.#topKLastElem || + // TODO: if on equal order the element is inserted *after* the already existing one + // then this check should become < 0 + this.#comparator(value, getValue(this.#topKLastElem)) <= 0 + ) { + // The element we deleted was within the topK + // so we need to signal that that element is no longer in the topK + result.moveOut = deletedElem! + } + + if ( + this.#topKLastElem && + // TODO: if on equal order the element is inserted *after* the already existing one + // then this check should become < 0 + this.#comparator(value, getValue(this.#topKLastElem)) <= 0 + ) { + // The element we deleted was before or within the topK + // So the first element after the topK moved one position to the left + // and thus falls into the topK now + const lastElem = this.#topKLastElem + const lastValue = getValue(lastElem) + const [, newLastElem] = this.#tree.nextHigherPair(lastValue) ?? [ + null, + null, + ] + this.#topKLastElem = newLastElem + if (newLastElem) { + result.moveIn = newLastElem + } + } + + return result + } +} + +/** + * Operator for fractional indexed topK operations + * This operator maintains fractional indices for sorted elements + * and only updates indices when elements move position + */ +export class TopKWithFractionalIndexBTreeOperator< + K, + V1, +> extends TopKWithFractionalIndexOperator { + protected override createTopK( + offset: number, + limit: number, + comparator: (a: HashTaggedValue, b: HashTaggedValue) => number, + ): TopK> { + return new TopKTree(offset, limit, comparator) + } +} + +/** + * Limits the number of results based on a comparator, with optional offset. + * This works on a keyed stream, where the key is the first element of the tuple. + * The ordering is within a key group, i.e. elements are sorted within a key group + * and the limit + offset is applied to that sorted group. + * To order the entire stream, key by the same value for all elements such as null. + * + * Uses fractional indexing to minimize the number of changes when elements move positions. + * Each element is assigned a fractional index that is lexicographically sortable. + * When elements move, only the indices of the moved elements are updated, not all elements. + * + * @param comparator - A function that compares two elements + * @param options - An optional object containing limit and offset properties + * @returns A piped operator that orders the elements and limits the number of results + */ +export function topKWithFractionalIndexBTree< + K extends T extends KeyValue ? K : never, + V1 extends T extends KeyValue ? V : never, + T, +>( + comparator: (a: V1, b: V1) => number, + options?: TopKWithFractionalIndexOptions, +): PipedOperator> { + const opts = options || {} + + return ( + stream: IStreamBuilder, + ): IStreamBuilder> => { + const output = new StreamBuilder>( + stream.graph, + new DifferenceStreamWriter>(), + ) + const operator = new TopKWithFractionalIndexOperator( + stream.graph.getNextOperatorId(), + stream.connectReader() as DifferenceStreamReader>, + output.writer, + comparator, + opts, + ) + stream.graph.addOperator(operator) + stream.graph.addStream(output.connectReader()) + return output + } +} diff --git a/packages/d2mini/src/utils.ts b/packages/d2mini/src/utils.ts index 60c94b6..22a6bec 100644 --- a/packages/d2mini/src/utils.ts +++ b/packages/d2mini/src/utils.ts @@ -90,3 +90,24 @@ export function hash(data: any): string { hashCache.set(data, hashValue) return hashValue } + +export function binarySearch( + array: T[], + value: T, + comparator: (a: T, b: T) => number, +): number { + let low = 0 + let high = array.length + while (low < high) { + const mid = Math.floor((low + high) / 2) + const comparison = comparator(array[mid], value) + if (comparison < 0) { + low = mid + 1 + } else if (comparison > 0) { + high = mid + } else { + return mid + } + } + return low +} diff --git a/packages/d2mini/tests/operators/orderByWithFractionalIndex.test.ts b/packages/d2mini/tests/operators/orderByWithFractionalIndex.test.ts index 16d139a..67ba1e6 100644 --- a/packages/d2mini/tests/operators/orderByWithFractionalIndex.test.ts +++ b/packages/d2mini/tests/operators/orderByWithFractionalIndex.test.ts @@ -7,6 +7,12 @@ import { } from '../../src/operators/index.js' import { KeyValue } from '../../src/types.js' +const stripFractionalIndex = ([[key, [value, _index]], multiplicity]) => [ + key, + value, + multiplicity, +] + describe('Operators', () => { describe('OrderByWithFractionalIndex operation', () => { test('initial results with default comparator', () => { @@ -46,14 +52,14 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const result = latestMessage.getInner() - const sortedResult = sortByKeyAndIndex(result) + const sortedResult = sortByKeyAndIndex(result).map(stripFractionalIndex) expect(sortedResult).toEqual([ - [['key1', [{ id: 1, value: 'a' }, 'a0']], 1], - [['key3', [{ id: 3, value: 'b' }, 'a1']], 1], - [['key5', [{ id: 5, value: 'c' }, 'a2']], 1], - [['key4', [{ id: 4, value: 'y' }, 'a3']], 1], - [['key2', [{ id: 2, value: 'z' }, 'a4']], 1], + ['key1', { id: 1, value: 'a' }, 1], + ['key3', { id: 3, value: 'b' }, 1], + ['key5', { id: 5, value: 'c' }, 1], + ['key4', { id: 4, value: 'y' }, 1], + ['key2', { id: 2, value: 'z' }, 1], ]) }) @@ -96,14 +102,14 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const result = latestMessage.getInner() - const sortedResult = sortByKeyAndIndex(result) + const sortedResult = sortByKeyAndIndex(result).map(stripFractionalIndex) expect(sortedResult).toEqual([ - [['key2', [{ id: 2, value: 'z' }, 'a0']], 1], - [['key4', [{ id: 4, value: 'y' }, 'a1']], 1], - [['key5', [{ id: 5, value: 'c' }, 'a2']], 1], - [['key3', [{ id: 3, value: 'b' }, 'a3']], 1], - [['key1', [{ id: 1, value: 'a' }, 'a4']], 1], + ['key2', { id: 2, value: 'z' }, 1], + ['key4', { id: 4, value: 'y' }, 1], + ['key5', { id: 5, value: 'c' }, 1], + ['key3', { id: 3, value: 'b' }, 1], + ['key1', { id: 1, value: 'a' }, 1], ]) }) @@ -144,12 +150,12 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const result = latestMessage.getInner() - const sortedResult = sortByKeyAndIndex(result) + const sortedResult = sortByKeyAndIndex(result).map(stripFractionalIndex) expect(sortedResult).toEqual([ - [['key1', [{ id: 1, value: 'a' }, 'a0']], 1], - [['key3', [{ id: 3, value: 'b' }, 'a1']], 1], - [['key5', [{ id: 5, value: 'c' }, 'a2']], 1], + ['key1', { id: 1, value: 'a' }, 1], + ['key3', { id: 3, value: 'b' }, 1], + ['key5', { id: 5, value: 'c' }, 1], ]) }) @@ -193,11 +199,11 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const result = latestMessage.getInner() - const sortedResult = sortByKeyAndIndex(result) + const sortedResult = sortByKeyAndIndex(result).map(stripFractionalIndex) expect(sortedResult).toEqual([ - [['key5', [{ id: 5, value: 'c' }, 'a0']], 1], - [['key4', [{ id: 4, value: 'y' }, 'a1']], 1], + ['key5', { id: 5, value: 'c' }, 1], + ['key4', { id: 4, value: 'y' }, 1], ]) }) @@ -238,14 +244,14 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const result = latestMessage.getInner() - const sortedResult = sortByKeyAndIndex(result) + const sortedResult = sortByKeyAndIndex(result).map(stripFractionalIndex) expect(sortedResult).toEqual([ - [['key1', [{ id: 1, value: 'a' }, 'a0']], 1], - [['key2', [{ id: 2, value: 'b' }, 'a1']], 1], - [['key3', [{ id: 3, value: 'c' }, 'a2']], 1], - [['key4', [{ id: 4, value: 'd' }, 'a3']], 1], - [['key5', [{ id: 5, value: 'e' }, 'a4']], 1], + ['key1', { id: 1, value: 'a' }, 1], + ['key2', { id: 2, value: 'b' }, 1], + ['key3', { id: 3, value: 'c' }, 1], + ['key4', { id: 4, value: 'd' }, 1], + ['key5', { id: 5, value: 'e' }, 1], ]) }) @@ -284,12 +290,13 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const initialResult = latestMessage.getInner() - const sortedInitialResult = sortByKeyAndIndex(initialResult) + const sortedInitialResult = + sortByKeyAndIndex(initialResult).map(stripFractionalIndex) expect(sortedInitialResult).toEqual([ - [['key1', [{ id: 1, value: 'a' }, 'a0']], 1], - [['key2', [{ id: 2, value: 'b' }, 'a1']], 1], - [['key3', [{ id: 3, value: 'c' }, 'a2']], 1], + ['key1', { id: 1, value: 'a' }, 1], + ['key2', { id: 2, value: 'b' }, 1], + ['key3', { id: 3, value: 'c' }, 1], ]) // Add a new row that should be included in the top 3 @@ -303,12 +310,12 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const result = latestMessage.getInner() - const sortedResult = sortByKeyAndIndex(result) + const sortedResult = sortByKeyAndIndex(result).map(stripFractionalIndex) expect(sortedResult).toEqual([ // We dont get key1 as its not changed or moved - [['key4', [{ id: 4, value: 'aa' }, 'a0V']], 1], // New row - [['key3', [{ id: 3, value: 'c' }, 'a2']], -1], // key3 is removed as its moved out of top 3 + ['key4', { id: 4, value: 'aa' }, 1], // New row + ['key3', { id: 3, value: 'c' }, -1], // key3 is removed as its moved out of top 3 ]) }) @@ -348,12 +355,13 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const initialResult = latestMessage.getInner() - const sortedInitialResult = sortByKeyAndIndex(initialResult) + const sortedInitialResult = + sortByKeyAndIndex(initialResult).map(stripFractionalIndex) expect(sortedInitialResult).toEqual([ - [['key1', [{ id: 1, value: 'a' }, 'a0']], 1], - [['key2', [{ id: 2, value: 'b' }, 'a1']], 1], - [['key3', [{ id: 3, value: 'c' }, 'a2']], 1], + ['key1', { id: 1, value: 'a' }, 1], + ['key2', { id: 2, value: 'b' }, 1], + ['key3', { id: 3, value: 'c' }, 1], ]) // Remove a row that was in the top 3 @@ -367,13 +375,13 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const result = latestMessage.getInner() - const sortedResult = sortByKeyAndIndex(result) + const sortedResult = sortByKeyAndIndex(result).map(stripFractionalIndex) expect(sortedResult).toEqual([ // key1 is removed - [['key1', [{ id: 1, value: 'a' }, 'a0']], -1], + ['key1', { id: 1, value: 'a' }, -1], // key4 is moved into the top 3 - [['key4', [{ id: 4, value: 'd' }, 'a3']], 1], + ['key4', { id: 4, value: 'd' }, 1], ]) }) @@ -413,12 +421,13 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const initialResult = latestMessage.getInner() - const sortedInitialResult = sortByKeyAndIndex(initialResult) + const sortedInitialResult = + sortByKeyAndIndex(initialResult).map(stripFractionalIndex) expect(sortedInitialResult).toEqual([ - [['key1', [{ id: 1, value: 'a' }, 'a0']], 1], - [['key3', [{ id: 3, value: 'b' }, 'a1']], 1], - [['key2', [{ id: 2, value: 'c' }, 'a2']], 1], + ['key1', { id: 1, value: 'a' }, 1], + ['key3', { id: 3, value: 'b' }, 1], + ['key2', { id: 2, value: 'c' }, 1], ]) // Modify an existing row by removing it and adding a new version @@ -433,11 +442,11 @@ describe('Operators', () => { expect(latestMessage).not.toBeNull() const result = latestMessage.getInner() - const sortedResult = sortByKeyAndIndex(result) + const sortedResult = sortByKeyAndIndex(result).map(stripFractionalIndex) expect(sortedResult).toEqual([ - [['key2', [{ id: 2, value: 'c' }, 'a2']], -1], // removed as out of top 3 - [['key4', [{ id: 4, value: 'd' }, 'a2']], 1], // key4 is moved up + ['key2', { id: 2, value: 'c' }, -1], // removed as out of top 3 + ['key4', { id: 4, value: 'd' }, 1], // key4 is moved up ]) }) }) @@ -450,23 +459,24 @@ function sortByKeyAndIndex(results: any[]) { return [...results] .sort( ( - [[_aKey, [_aValue, aIndex]], aMultiplicity], - [[_bKey, [_bValue, bIndex]], bMultiplicity], + [[_aKey, [_aValue, _aIndex]], aMultiplicity], + [[_bKey, [_bValue, _bIndex]], bMultiplicity], ) => aMultiplicity - bMultiplicity, ) .sort( ( - [[aKey, [_aValue, aIndex]], _aMultiplicity], - [[bKey, [_bValue, bIndex]], _bMultiplicity], + [[aKey, [_aValue, _aIndex]], _aMultiplicity], + [[bKey, [_bValue, _bIndex]], _bMultiplicity], ) => aKey - bKey, ) .sort( ( - [[aKey, [_aValue, aIndex]], _aMultiplicity], - [[bKey, [_bValue, bIndex]], _bMultiplicity], + [[_aKey, [_aValue, aIndex]], _aMultiplicity], + [[_bKey, [_bValue, bIndex]], _bMultiplicity], ) => { // lexically compare the index - return aIndex.localeCompare(bIndex) + //return aIndex.localeCompare(bIndex) + return aIndex < bIndex ? -1 : aIndex > bIndex ? 1 : 0 }, ) } diff --git a/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts b/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts index 8e7291b..d6238c2 100644 --- a/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts +++ b/packages/d2mini/tests/operators/topKWithFractionalIndex.test.ts @@ -1,7 +1,8 @@ -import { describe, it, expect, beforeEach, afterEach } from 'vitest' +import { describe, it, expect } from 'vitest' import { D2 } from '../../src/d2.js' import { MultiSet } from '../../src/multiset.js' import { topKWithFractionalIndex } from '../../src/operators/topKWithFractionalIndex.js' +import { topKWithFractionalIndexBTree } from '../../src/operators/topKWithFractionalIndexBTree.js' import { output } from '../../src/operators/index.js' // Helper function to check if indices are in lexicographic order @@ -23,7 +24,9 @@ function checkLexicographicOrder(results: any[]) { const nextIndex = sortedByValue[i + 1].index // Indices should be in lexicographic order - expect(currentIndex < nextIndex).toBe(true) + if (!(currentIndex < nextIndex)) { + return false + } } return true @@ -57,14 +60,17 @@ function verifyOrder(results: any[], expectedOrder: string[]) { } describe('Operators', () => { - describe('TopKWithFractionalIndex operation', () => { + describe.each([ + ['with array', { topK: topKWithFractionalIndex }], + ['with B+ tree', { topK: topKWithFractionalIndexBTree }], + ])('TopKWithFractionalIndex operation %s', (_, { topK }) => { it('should assign fractional indices to sorted elements', () => { const graph = new D2() const input = graph.newInput<[null, { id: number; value: string }]>() const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value)), + topK((a, b) => a.value.localeCompare(b.value)), output((message) => { allMessages.push(message) }), @@ -160,13 +166,138 @@ describe('Operators', () => { expect(checkLexicographicOrder(currentStateArray)).toBe(true) }) + it('should support duplicate ordering keys', () => { + const graph = new D2() + const input = graph.newInput<[null, { id: number; value: string }]>() + const allMessages: any[] = [] + + input.pipe( + topK((a, b) => a.value.localeCompare(b.value)), + output((message) => { + allMessages.push(message) + }), + ) + + graph.finalize() + + // Initial data - a, b, c, d, e + input.sendData( + new MultiSet([ + [[null, { id: 1, value: 'a' }], 1], + [[null, { id: 2, value: 'b' }], 1], + [[null, { id: 3, value: 'c' }], 1], + [[null, { id: 4, value: 'd' }], 1], + [[null, { id: 5, value: 'e' }], 1], + ]), + ) + graph.run() + + // Initial result should have all elements with fractional indices + const initialResult = allMessages[0].getInner() + expect(initialResult.length).toBe(5) + + // Check that indices are in lexicographic order + expect(checkLexicographicOrder(initialResult)).toBe(true) + + // Store the initial indices for later comparison + const initialIndices = new Map() + for (const [[_, [value, index]]] of initialResult) { + initialIndices.set(value.id, index) + } + + // Now let's add a new element with a value that is already in there + input.sendData(new MultiSet([[[null, { id: 6, value: 'c' }], 1]])) + graph.run() + + // Check the changes + const changes = allMessages[1].getInner() + + // We should only emit as many changes as we received + expect(changes.length).toBe(1) // 1 addition + + // Find the addition + const [addition] = changes + + // Check that we added { id: 6, value: 'c' } + expect(addition?.[0][1][0]).toEqual({ id: 6, value: 'c' }) + + // Reconstruct the current state by applying the changes + const currentState = new Map() + for (const [[_, [value, index]]] of initialResult) { + currentState.set(JSON.stringify(value), [value, index]) + } + + // Apply the changes + for (const [[_, [value, index]], multiplicity] of changes) { + if (multiplicity < 0) { + // Remove + currentState.delete(JSON.stringify(value)) + } else { + // Add + currentState.set(JSON.stringify(value), [value, index]) + } + } + + // Convert to array for lexicographic order check + const currentStateArray = Array.from(currentState.values()).map( + ([value, index]) => [[null, [value, index]], 1], + ) + + // Check that indices are still in lexicographic order after the changes + expect(checkLexicographicOrder(currentStateArray)).toBe(true) + expect(currentStateArray.length).toBe(6) + }) + + it('should ignore duplicate values', () => { + const graph = new D2() + const input = graph.newInput<[null, { id: number; value: string }]>() + const allMessages: any[] = [] + + input.pipe( + topK((a, b) => a.value.localeCompare(b.value)), + output((message) => { + allMessages.push(message) + }), + ) + + graph.finalize() + + // Initial data - a, b, c, d, e + const entryForC = [[null, { id: 3, value: 'c' }], 1] as [ + [null, { id: number; value: string }], + number, + ] + input.sendData( + new MultiSet([ + [[null, { id: 1, value: 'a' }], 1], + [[null, { id: 2, value: 'b' }], 1], + entryForC, + [[null, { id: 4, value: 'd' }], 1], + [[null, { id: 5, value: 'e' }], 1], + ]), + ) + graph.run() + + // Initial result should have all elements with fractional indices + const initialResult = allMessages[0].getInner() + expect(initialResult.length).toBe(5) + + // Now add entryForC again + input.sendData(new MultiSet([entryForC])) + graph.run() + + // Check that no message was emitted + // since there were no changes to the topK + expect(allMessages.length).toBe(1) + }) + it('should handle limit and offset correctly', () => { const graph = new D2() const input = graph.newInput<[null, { id: number; value: string }]>() const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value), { + topK((a, b) => a.value.localeCompare(b.value), { limit: 3, offset: 1, }), @@ -235,9 +366,6 @@ describe('Operators', () => { expect(removal?.[0][1][0].id).toBe(4) // 'd' has id 4 expect(addition?.[0][1][0].id).toBe(6) // 'c+' has id 6 - // The new element reuses the index of the removed element - expect(addition?.[0][1][1]).toBe(removal?.[0][1][1]) - // Reconstruct the current state by applying the changes const currentState = new Map() for (const [[_, [value, index]]] of initialResult) { @@ -245,23 +373,171 @@ describe('Operators', () => { } // Apply the changes - for (const [[_, [value, index]], multiplicity] of changes) { - if (multiplicity < 0) { - // Remove - currentState.delete(JSON.stringify(value)) - } else { - // Add - currentState.set(JSON.stringify(value), [value, index]) + const applyChanges = (changes: any[]) => { + for (const [[_, [value, index]], multiplicity] of changes) { + if (multiplicity < 0) { + // Remove + currentState.delete(JSON.stringify(value)) + } else { + // Add + currentState.set(JSON.stringify(value), [value, index]) + } } } + applyChanges(changes) + // Convert to array for lexicographic order check - const currentStateArray = Array.from(currentState.values()).map( - ([value, index]) => [[null, [value, index]], 1], + const checkCurrentState = (expectedResult) => { + const stateArray = Array.from(currentState.values()) + const currentStateArray = stateArray.map(([value, index]) => [ + [null, [value, index]], + 1, + ]) + + // Check that indices are still in lexicographic order after the changes + expect(checkLexicographicOrder(currentStateArray)).toBe(true) + + // expect the array to be the values with IDs 2, 3, 6 in that order + const compareFractionalIndex = (a, b) => + a[1] < b[1] ? -1 : a[1] > b[1] ? 1 : 0 + const sortedResult = stateArray + .sort(compareFractionalIndex) + .map(([value, _]) => value) + expect(sortedResult).toEqual(expectedResult) + } + + checkCurrentState([ + { id: 2, value: 'b' }, + { id: 3, value: 'c' }, + { id: 6, value: 'c+' }, + ]) + + // Now add an element that should be before the topK + input.sendData( + new MultiSet([ + [[null, { id: 7, value: '0' }], 1], // This should be before 'a' + ]), ) + graph.run() - // Check that indices are still in lexicographic order after the changes - expect(checkLexicographicOrder(currentStateArray)).toBe(true) + // Check the changes + const changes2 = allMessages[2].getInner() + + // We received 1 change (1 addition) + // Since we have a limit, this will push out 1 element, so we'll emit 2 changes + // This is still optimal as we're only emitting the minimum necessary changes + expect(changes2.length).toBe(2) // 1 removal + 1 addition + + // Find the removal and addition + const removal2 = changes2.find(([_, multiplicity]) => multiplicity < 0) + const addition2 = changes2.find(([_, multiplicity]) => multiplicity > 0) + + // Check that we removed 'c+' and added 'a' + expect(removal2?.[0][1][0].value).toBe('c+') + expect(addition2?.[0][1][0].value).toBe('a') + + // Check that the ids are correct + expect(removal2?.[0][1][0].id).toBe(6) // 'c+' has id 6 + expect(addition2?.[0][1][0].id).toBe(1) // 'a' has id 1 + + // Apply the changes + applyChanges(changes2) + + checkCurrentState([ + { id: 1, value: 'a' }, + { id: 2, value: 'b' }, + { id: 3, value: 'c' }, + ]) + + // Now add an element after the topK + input.sendData( + new MultiSet([ + [[null, { id: 8, value: 'h' }], 1], // This should be after 'e' + ]), + ) + graph.run() + + // Should not have emitted any changes + // since the element was added after the topK + // so it does not affect the topK + expect(allMessages.length).toBe(3) + + // Now remove an element before the topK + // This will cause the first element of the topK to move out of the topK + // and the element after the last element of the topK to move into the topK + input.sendData( + new MultiSet([ + [[null, { id: 7, value: '0' }], -1], // Remove '0' + ]), + ) + graph.run() + + const changes3 = allMessages[3].getInner() + + // Find the removal and addition + const removal3 = changes3.find(([_, multiplicity]) => multiplicity < 0) + const addition3 = changes3.find(([_, multiplicity]) => multiplicity > 0) + + // Check that we removed 'a' and added 'c+' + expect(removal3?.[0][1][0].value).toBe('a') + expect(addition3?.[0][1][0].value).toBe('c+') + + // Check that the ids are correct + expect(removal3?.[0][1][0].id).toBe(1) // 'a' has id 1 + expect(addition3?.[0][1][0].id).toBe(6) // 'c+' has id 6 + + // Apply the changes + applyChanges(changes3) + + checkCurrentState([ + { id: 2, value: 'b' }, + { id: 3, value: 'c' }, + { id: 6, value: 'c+' }, + ]) + + // Now remove an element in the topK + // This causes the element after the last element of the topK to move into the topK + input.sendData( + new MultiSet([ + [[null, { id: 6, value: 'c+' }], -1], // Remove 'c+' + ]), + ) + graph.run() + + const changes4 = allMessages[4].getInner() + + // Find the removal and addition + const removal4 = changes4.find(([_, multiplicity]) => multiplicity < 0) + const addition4 = changes4.find(([_, multiplicity]) => multiplicity > 0) + + // Check that we removed 'c+' and added 'c' + expect(removal4?.[0][1][0].value).toBe('c+') + expect(addition4?.[0][1][0].value).toBe('d') + + // Check that the ids are correct + expect(removal4?.[0][1][0].id).toBe(6) // 'c+' has id 6 + expect(addition4?.[0][1][0].id).toBe(4) // 'd' has id 4 + + // Apply the changes + applyChanges(changes4) + + checkCurrentState([ + { id: 2, value: 'b' }, + { id: 3, value: 'c' }, + { id: 4, value: 'd' }, + ]) + + // Now remove an element after the topK + input.sendData( + new MultiSet([ + [[null, { id: 8, value: 'h' }], -1], // Remove 'h' + ]), + ) + graph.run() + + // There should be no changes + expect(allMessages.length).toBe(5) }) it('should handle elements moving positions correctly', () => { @@ -270,7 +546,7 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value)), + topK((a, b) => a.value.localeCompare(b.value)), output((message) => { allMessages.push(message) }), @@ -319,8 +595,6 @@ describe('Operators', () => { // We should only emit as many changes as we received // We received 4 changes (2 additions, 2 removals) - // We should emit at most 4 changes - expect(changes.length).toBeLessThanOrEqual(4) expect(changes.length).toBe(4) // 2 removals + 2 additions // Find the removals and additions @@ -358,8 +632,8 @@ describe('Operators', () => { ) // The elements reuse their indices - expect(bPlusAddition?.[0][1][1]).toBe(bRemoval?.[0][1][1]) - expect(dPlusAddition?.[0][1][1]).toBe(dRemoval?.[0][1][1]) + //expect(bPlusAddition?.[0][1][1]).toBe(bRemoval?.[0][1][1]) + //expect(dPlusAddition?.[0][1][1]).toBe(dRemoval?.[0][1][1]) // Check that we only emitted changes for the elements that moved const changedIds = new Set() @@ -388,12 +662,28 @@ describe('Operators', () => { } // Convert to array for lexicographic order check - const currentStateArray = Array.from(currentState.values()).map( - ([value, index]) => [[null, [value, index]], 1], - ) + const stateArray = Array.from(currentState.values()) + const currentStateArray = stateArray.map(([value, index]) => [ + [null, [value, index]], + 1, + ]) // Check that indices are still in lexicographic order after the changes expect(checkLexicographicOrder(currentStateArray)).toBe(true) + + // Expect the array to be the elements with IDs 1, 4, 3, 2, 5 + const compareFractionalIndex = (a, b) => + a[1] < b[1] ? -1 : a[1] > b[1] ? 1 : 0 + const sortedResult = stateArray + .sort(compareFractionalIndex) + .map(([value, _]) => value) + expect(sortedResult).toEqual([ + { id: 1, value: 'a' }, + { id: 4, value: 'b+' }, + { id: 3, value: 'c' }, + { id: 2, value: 'd+' }, + { id: 5, value: 'e' }, + ]) }) it('should maintain lexicographic order through multiple updates', () => { @@ -402,7 +692,7 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value)), + topK((a, b) => a.value.localeCompare(b.value)), output((message) => { allMessages.push(message) }), @@ -559,7 +849,7 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value)), + topK((a, b) => a.value.localeCompare(b.value)), output((message) => { allMessages.push(message) }), @@ -688,7 +978,7 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value)), + topK((a, b) => a.value.localeCompare(b.value)), output((message) => { allMessages.push(message) }), @@ -772,7 +1062,7 @@ describe('Operators', () => { const allMessages: any[] = [] input.pipe( - topKWithFractionalIndex((a, b) => a.value.localeCompare(b.value)), + topK((a, b) => a.value.localeCompare(b.value)), output((message) => { allMessages.push(message) }), diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e0c21c5..b5aca52 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -87,6 +87,9 @@ importers: murmurhash-js: specifier: ^1.0.0 version: 1.0.0 + sorted-btree: + specifier: ^1.8.1 + version: 1.8.1 devDependencies: '@types/murmurhash-js': specifier: ^1.0.6 @@ -2338,6 +2341,9 @@ packages: resolution: {integrity: sha512-qMCMfhY040cVHT43K9BFygqYbUPFZKHOg7K73mtTWJRb8pyP3fzf4Ixd5SzdEJQ6MRUg/WBnOLxghZtKKurENQ==} engines: {node: '>=10'} + sorted-btree@1.8.1: + resolution: {integrity: sha512-395+XIP+wqNn3USkFSrNz7G3Ss/MXlZEqesxvzCRFwL14h6e8LukDHdLBePn5pwbm5OQ9vGu8mDyz2lLDIqamQ==} + source-map-js@1.2.1: resolution: {integrity: sha512-UXWMKhLOwVKb728IUtQPXxfYU+usdybtUrK/8uGE8CQMvrhOpwvzDBwj0QhSL7MQc7vIsISBG8VQ8+IDQxpfQA==} engines: {node: '>=0.10.0'} @@ -4746,6 +4752,8 @@ snapshots: astral-regex: 2.0.0 is-fullwidth-code-point: 3.0.0 + sorted-btree@1.8.1: {} + source-map-js@1.2.1: {} spawndamnit@3.0.1: