diff --git a/.changeset/lucky-toys-fail.md b/.changeset/lucky-toys-fail.md
new file mode 100644
index 0000000..c069534
--- /dev/null
+++ b/.changeset/lucky-toys-fail.md
@@ -0,0 +1,5 @@
+---
+'@electric-sql/d2mini': patch
+---
+
+fix a bug where `reduce` would not emit a message for deleted keys
diff --git a/.changeset/odd-garlics-chew.md b/.changeset/odd-garlics-chew.md
new file mode 100644
index 0000000..60b7db3
--- /dev/null
+++ b/.changeset/odd-garlics-chew.md
@@ -0,0 +1,5 @@
+---
+'@electric-sql/d2mini': patch
+---
+
+fix a bug where `groupBy` would not remove a group if its key was completely removed from the stream
diff --git a/packages/d2mini/src/indexes.ts b/packages/d2mini/src/indexes.ts
index c1c88db..5108b00 100644
--- a/packages/d2mini/src/indexes.ts
+++ b/packages/d2mini/src/indexes.ts
@@ -7,32 +7,24 @@ import { DefaultMap, hash } from './utils.js'
  * exploit the key-value structure of the data to run efficiently.
  */
 export class Index<K, V> {
-  #inner: DefaultMap<K, DefaultMap<string, [V, number]>>
+  #inner: DefaultMap<K, [V, number][]>
+  #changedKeys: Set<K>
 
   constructor() {
-    this.#inner = new DefaultMap<K, DefaultMap<string, [V, number]>>(
-      () =>
-        new DefaultMap<string, [V, number]>(() => [undefined as any as V, 0]),
-    )
-    // #inner is as map of:
-    // {
-    //   [key]: {
-    //     [hash(value)]: [value, multiplicity]
-    //   }
-    // }
+    this.#inner = new DefaultMap<K, [V, number][]>(() => [])
+    this.#changedKeys = new Set()
   }
 
   toString(indent = false): string {
     return `Index(${JSON.stringify(
-      [...this.#inner].map(([k, valueMap]) => [k, [...valueMap]]),
+      [...this.#inner],
       undefined,
       indent ? ' ' : undefined,
     )})`
   }
 
   get(key: K): [V, number][] {
-    const valueMap = this.#inner.get(key)
-    return [...valueMap.values()]
+    return this.#inner.get(key)
   }
 
   entries() {
@@ -44,54 +36,78 @@
   }
 
   has(key: K): boolean {
-    return this.#inner.has(key)
+    return this.#inner.has(key) && this.#inner.get(key).length > 0
   }
 
   get size(): number {
-    return this.#inner.size
+    let count = 0
+    for (const [, values] of this.#inner.entries()) {
+      if (values.length > 0) {
+        count++
+      }
+    }
+    return count
   }
 
   addValue(key: K, value: [V, number]): void {
-    const [val, multiplicity] = value
-    const valueMap = this.#inner.get(key)
-    const valueHash = hash(val)
-    const [, existingMultiplicity] = valueMap.get(valueHash)
-    if (existingMultiplicity !== 0) {
-      const newMultiplicity = existingMultiplicity + multiplicity
-      if (newMultiplicity === 0) {
-        valueMap.delete(valueHash)
-      } else {
-        valueMap.set(valueHash, [val, newMultiplicity])
+    const values = this.#inner.get(key)
+    values.push(value)
+    this.#changedKeys.add(key)
+  }
+
+  append(other: Index<K, V>): void {
+    for (const [key, otherValues] of other.entries()) {
+      const thisValues = this.#inner.get(key)
+      for (const value of otherValues) {
+        thisValues.push(value)
+      }
+      this.#changedKeys.add(key)
+    }
+  }
+
+  compact(keys: K[] = []): void {
+    // If no keys specified, use the changed keys
+    const keysToProcess = keys.length === 0 ? [...this.#changedKeys] : keys
+
+    for (const key of keysToProcess) {
+      if (!this.#inner.has(key)) continue
+
+      const values = this.#inner.get(key)
+      const consolidated = this.consolidateValues(values)
+
+      // Remove the key entirely and re-add only if there are non-zero values
+      this.#inner.delete(key)
+      if (consolidated.length > 0) {
+        this.#inner.get(key).push(...consolidated)
       }
+    }
+
+    // Clear the changed keys after compaction
+    if (keys.length === 0) {
+      this.#changedKeys.clear()
     } else {
-      if (multiplicity !== 0) {
-        valueMap.set(valueHash, [val, multiplicity])
+      // Only remove the keys that were explicitly compacted
+      for (const key of keys) {
+        this.#changedKeys.delete(key)
       }
     }
   }
 
-  append(other: Index<K, V>): void {
-    for (const [key, otherValueMap] of other.entries()) {
-      const thisValueMap = this.#inner.get(key)
-      for (const [
-        valueHash,
-        [value, multiplicity],
-      ] of otherValueMap.entries()) {
-        const [, existingMultiplicity] = thisValueMap.get(valueHash)
-        if (existingMultiplicity !== 0) {
-          const newMultiplicity = existingMultiplicity + multiplicity
-          if (newMultiplicity === 0) {
-            thisValueMap.delete(valueHash)
-          } else {
-            thisValueMap.set(valueHash, [value, newMultiplicity])
-          }
-        } else {
-          if (multiplicity !== 0) {
-            thisValueMap.set(valueHash, [value, multiplicity])
-          }
-        }
+  private consolidateValues(values: [V, number][]): [V, number][] {
+    const consolidated = new Map<string, { value: V; multiplicity: number }>()
+
+    for (const [value, multiplicity] of values) {
+      const valueHash = hash(value)
+      if (consolidated.has(valueHash)) {
+        consolidated.get(valueHash)!.multiplicity += multiplicity
+      } else {
+        consolidated.set(valueHash, { value, multiplicity })
       }
     }
+
+    return [...consolidated.values()]
+      .filter(({ multiplicity }) => multiplicity !== 0)
+      .map(({ value, multiplicity }) => [value, multiplicity])
   }
 
   join<V2>(other: Index<K, V2>): MultiSet<[K, [V, V2]]> {
@@ -100,11 +116,11 @@
     const result = new MultiSet<[K, [V, V2]]>()
     // We want to iterate over the smaller of the two indexes to reduce the
     // number of operations we need to do.
if (this.size <= other.size) { - for (const [key, valueMap] of this.entries()) { + for (const [key, values1] of this.entries()) { if (!other.has(key)) continue - const otherValues = other.get(key) - for (const [val1, mul1] of valueMap.values()) { - for (const [val2, mul2] of otherValues) { + const values2 = other.get(key) + for (const [val1, mul1] of values1) { + for (const [val2, mul2] of values2) { if (mul1 !== 0 && mul2 !== 0) { result.push([[key, [val1, val2]], mul1 * mul2]) } @@ -112,11 +128,11 @@ export class Index { } } } else { - for (const [key, otherValueMap] of other.entries()) { + for (const [key, values2] of other.entries()) { if (!this.has(key)) continue - const values = this.get(key) - for (const [val2, mul2] of otherValueMap.values()) { - for (const [val1, mul1] of values) { + const values1 = this.get(key) + for (const [val2, mul2] of values2) { + for (const [val1, mul1] of values1) { if (mul1 !== 0 && mul2 !== 0) { result.push([[key, [val1, val2]], mul1 * mul2]) } diff --git a/packages/d2mini/src/operators/groupBy.ts b/packages/d2mini/src/operators/groupBy.ts index 8904106..172b66e 100644 --- a/packages/d2mini/src/operators/groupBy.ts +++ b/packages/d2mini/src/operators/groupBy.ts @@ -87,6 +87,17 @@ export function groupBy< // Then reduce to compute aggregates const reduced = withKeysAndValues.pipe( reduce((values) => { + // Calculate total multiplicity to check if the group should exist + let totalMultiplicity = 0 + for (const [_, multiplicity] of values) { + totalMultiplicity += multiplicity + } + + // If total multiplicity is 0 or negative, the group should be removed completely + if (totalMultiplicity <= 0) { + return [] + } + const result: Record = {} // Get the original key from first value in group diff --git a/packages/d2mini/src/operators/join.ts b/packages/d2mini/src/operators/join.ts index 521429a..ac50551 100644 --- a/packages/d2mini/src/operators/join.ts +++ b/packages/d2mini/src/operators/join.ts @@ -77,6 +77,13 @@ export class JoinOperator extends BinaryOperator< // Append deltaB to indexB this.#indexB.append(deltaB) + + // Compact both indexes to consolidate values and remove zero-multiplicity entries + // Only compact changed keys for efficiency + deltaA.compact() + deltaB.compact() + this.#indexA.compact() + this.#indexB.compact() } } diff --git a/packages/d2mini/src/operators/reduce.ts b/packages/d2mini/src/operators/reduce.ts index 887dd21..1188ec9 100644 --- a/packages/d2mini/src/operators/reduce.ts +++ b/packages/d2mini/src/operators/reduce.ts @@ -15,7 +15,6 @@ import { hash } from '../utils.js' export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { #index = new Index() #indexOut = new Index() - #keysTodo = new Set() #f: (values: [V1, number][]) => [V2, number][] constructor( @@ -30,41 +29,71 @@ export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { run(): void { // Collect all input messages and update the index + const keysTodo = new Set() for (const message of this.inputMessages()) { for (const [item, multiplicity] of message.getInner()) { const [key, value] = item this.#index.addValue(key, [value, multiplicity]) - this.#keysTodo.add(key) + keysTodo.add(key) } } // For each key, compute the reduction and delta const result: [[K, V2], number][] = [] - for (const key of this.#keysTodo) { + for (const key of keysTodo) { const curr = this.#index.get(key) const currOut = this.#indexOut.get(key) const out = this.#f(curr) - // Calculate delta between current and previous output - const delta = new Map() - const values = new 
Map() + // Create maps for current and previous outputs + const newOutputMap = new Map< + string, + { value: V2; multiplicity: number } + >() + const oldOutputMap = new Map< + string, + { value: V2; multiplicity: number } + >() + + // Process new output for (const [value, multiplicity] of out) { const valueKey = hash(value) - values.set(valueKey, value) - delta.set(valueKey, (delta.get(valueKey) || 0) + multiplicity) + if (newOutputMap.has(valueKey)) { + newOutputMap.get(valueKey)!.multiplicity += multiplicity + } else { + newOutputMap.set(valueKey, { value, multiplicity }) + } } + + // Process previous output for (const [value, multiplicity] of currOut) { const valueKey = hash(value) - values.set(valueKey, value) - delta.set(valueKey, (delta.get(valueKey) || 0) - multiplicity) + if (oldOutputMap.has(valueKey)) { + oldOutputMap.get(valueKey)!.multiplicity += multiplicity + } else { + oldOutputMap.set(valueKey, { value, multiplicity }) + } } - // Add non-zero deltas to result - for (const [valueKey, multiplicity] of delta) { - const value = values.get(valueKey)! - if (multiplicity !== 0) { - result.push([[key, value], multiplicity]) - this.#indexOut.addValue(key, [value, multiplicity]) + // First, emit removals for old values that are no longer present or have changed + for (const [valueKey, { value, multiplicity }] of oldOutputMap) { + const newEntry = newOutputMap.get(valueKey) + if (!newEntry || newEntry.multiplicity !== multiplicity) { + // Remove the old value entirely + result.push([[key, value], -multiplicity]) + this.#indexOut.addValue(key, [value, -multiplicity]) + } + } + + // Then, emit additions for new values that are not present in old or have changed + for (const [valueKey, { value, multiplicity }] of newOutputMap) { + const oldEntry = oldOutputMap.get(valueKey) + if (!oldEntry || oldEntry.multiplicity !== multiplicity) { + // Add the new value only if it has non-zero multiplicity + if (multiplicity !== 0) { + result.push([[key, value], multiplicity]) + this.#indexOut.addValue(key, [value, multiplicity]) + } } } } @@ -72,7 +101,11 @@ export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { if (result.length > 0) { this.output.sendData(new MultiSet(result)) } - this.#keysTodo.clear() + + // Compact both indexes to consolidate values and remove zero-multiplicity entries + // Only compact changed keys for efficiency + this.#index.compact() + this.#indexOut.compact() } } diff --git a/packages/d2mini/src/operators/topKWithFractionalIndex.ts b/packages/d2mini/src/operators/topKWithFractionalIndex.ts index 6d7ed74..a6d173d 100644 --- a/packages/d2mini/src/operators/topKWithFractionalIndex.ts +++ b/packages/d2mini/src/operators/topKWithFractionalIndex.ts @@ -293,6 +293,11 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< if (result.length > 0) { this.output.sendData(new MultiSet(result)) } + + // Compact both indexes to consolidate values and remove zero-multiplicity entries + // Only compact changed keys for efficiency + this.#index.compact() + this.#indexOut.compact() } } diff --git a/packages/d2mini/tests/indexes.test.ts b/packages/d2mini/tests/indexes.test.ts index ce0cfbd..58c39cc 100644 --- a/packages/d2mini/tests/indexes.test.ts +++ b/packages/d2mini/tests/indexes.test.ts @@ -31,8 +31,17 @@ describe('Index', () => { index.addValue('key1', [10, 1]) index.addValue('key1', [10, -1]) + // Before compaction, values are stored as-is const result = index.get('key1') - expect(result).toEqual([]) + expect(result).toEqual([ + [10, 1], + [10, -1], + ]) + 
+ // After compaction, zero-multiplicity values are removed + index.compact(['key1']) + const compactedResult = index.get('key1') + expect(compactedResult).toEqual([]) }) }) @@ -61,6 +70,14 @@ describe('Index', () => { index.append(other) + // Before compaction, values are stored separately + expect(index.get('key1')).toEqual([ + [10, 2], + [10, 3], + ]) + + // After compaction, multiplicities are combined + index.compact(['key1']) expect(index.get('key1')).toEqual([[10, 5]]) }) }) @@ -130,4 +147,68 @@ describe('Index', () => { expect(result.getInner()).toEqual([]) }) }) + + test('should track and compact only changed keys', () => { + // Add values to different keys + index.addValue('key1', [10, 1]) + index.addValue('key1', [10, -1]) // This should cancel out when compacted + index.addValue('key2', [20, 1]) + index.addValue('key3', [30, 1]) + + // Create another index that we'll append from + const other = createIndex('other') + other.addValue('key4', [40, 1]) + other.addValue('key4', [40, -1]) // This should cancel out when compacted + other.addValue('key5', [50, 1]) + + // Append should also track changed keys + index.append(other) + + // Before compaction, all values should be stored as-is + expect(index.get('key1')).toEqual([ + [10, 1], + [10, -1], + ]) + expect(index.get('key2')).toEqual([[20, 1]]) + expect(index.get('key3')).toEqual([[30, 1]]) + expect(index.get('key4')).toEqual([ + [40, 1], + [40, -1], + ]) + expect(index.get('key5')).toEqual([[50, 1]]) + + // Compact without arguments should only compact changed keys + index.compact() + + // After compaction, values should be consolidated and zero-multiplicity entries removed + expect(index.get('key1')).toEqual([]) // Cancelled out + expect(index.get('key2')).toEqual([[20, 1]]) + expect(index.get('key3')).toEqual([[30, 1]]) + expect(index.get('key4')).toEqual([]) // Cancelled out + expect(index.get('key5')).toEqual([[50, 1]]) + + // Add more values after compaction + index.addValue('key2', [25, 1]) + index.addValue('key6', [60, 1]) + + // Only key2 and key6 should have new uncompacted values + expect(index.get('key2')).toEqual([ + [20, 1], + [25, 1], + ]) + expect(index.get('key6')).toEqual([[60, 1]]) + + // Compact again - should only affect key2 and key6 + index.compact() + + expect(index.get('key2')).toEqual([ + [20, 1], + [25, 1], + ]) // Both values preserved + expect(index.get('key6')).toEqual([[60, 1]]) + + // Other keys should remain unchanged + expect(index.get('key3')).toEqual([[30, 1]]) + expect(index.get('key5')).toEqual([[50, 1]]) + }) }) diff --git a/packages/d2mini/tests/operators/count.test.ts b/packages/d2mini/tests/operators/count.test.ts index 7b87afc..50c066c 100644 --- a/packages/d2mini/tests/operators/count.test.ts +++ b/packages/d2mini/tests/operators/count.test.ts @@ -111,8 +111,8 @@ function testCount() { expect(data).toEqual([ [[['one', 2], 1]], [ - [['one', 3], 1], [['one', 2], -1], // <-- old count of 'one' removed + [['one', 3], 1], [['two', 1], 1], ], ]) diff --git a/packages/d2mini/tests/operators/distinct.test.ts b/packages/d2mini/tests/operators/distinct.test.ts index 12b9034..94fbfd6 100644 --- a/packages/d2mini/tests/operators/distinct.test.ts +++ b/packages/d2mini/tests/operators/distinct.test.ts @@ -83,8 +83,8 @@ function testDistinct() { [[1, 'b'], 1], ], [ - [[1, 'c'], 1], [[1, 'b'], -1], + [[1, 'c'], 1], ], ]) }) diff --git a/packages/d2mini/tests/operators/groupBy.test.ts b/packages/d2mini/tests/operators/groupBy.test.ts index c147620..deaba21 100644 --- 
a/packages/d2mini/tests/operators/groupBy.test.ts +++ b/packages/d2mini/tests/operators/groupBy.test.ts @@ -232,11 +232,11 @@ describe('Operators', () => { { category: 'A', region: 'East', - total: 45, - count: 3, + total: 30, + count: 2, }, ], - 1, + -1, ], [ [ @@ -244,11 +244,11 @@ describe('Operators', () => { { category: 'A', region: 'East', - total: 30, - count: 2, + total: 45, + count: 3, }, ], - -1, + 1, ], [ [ @@ -281,11 +281,11 @@ describe('Operators', () => { { category: 'A', region: 'East', - total: 25, - count: 2, + total: 45, + count: 3, }, ], - 1, + -1, ], [ [ @@ -293,11 +293,11 @@ describe('Operators', () => { { category: 'A', region: 'East', - total: 45, - count: 3, + total: 25, + count: 2, }, ], - -1, + 1, ], ] @@ -381,22 +381,22 @@ describe('Operators', () => { '{"category":"A"}', { category: 'A', - average: 20, - count: 3, + average: 15, + count: 2, }, ], - 1, + -1, ], [ [ '{"category":"A"}', { category: 'A', - average: 15, - count: 2, + average: 20, + count: 3, }, ], - -1, + 1, ], [ [ @@ -427,22 +427,22 @@ describe('Operators', () => { '{"category":"A"}', { category: 'A', - average: 25, - count: 2, + average: 20, + count: 3, }, ], - 1, + -1, ], [ [ '{"category":"A"}', { category: 'A', - average: 20, - count: 3, + average: 25, + count: 2, }, ], - -1, + 1, ], ] @@ -584,5 +584,477 @@ describe('Operators', () => { expect(latestMessage.getInner()).toEqual(expectedResult) }) + + test('complete group removal with sum aggregate', () => { + const graph = new D2() + const input = graph.newInput<{ + category: string + amount: number + }>() + let latestMessage: any = null + + input.pipe( + groupBy((data) => ({ category: data.category }), { + total: sum((data) => data.amount), + }), + output((message) => { + latestMessage = message + }), + ) + + graph.finalize() + + // Initial data + input.sendData( + new MultiSet([ + [{ category: 'A', amount: 10 }, 1], + [{ category: 'A', amount: 20 }, 1], + [{ category: 'B', amount: 30 }, 1], + [{ category: 'C', amount: 40 }, 1], + ]), + ) + graph.run() + + // Verify initial state + expect(latestMessage).not.toBeNull() + let result = latestMessage.getInner() + expect(result).toHaveLength(3) // Should have 3 groups + + // Find the group for category A + const categoryAGroup = result.find( + ([key]) => key[0] === '{"category":"A"}', + ) + expect(categoryAGroup).toBeDefined() + expect(categoryAGroup[0][1].total).toBe(30) // Sum of 10 + 20 + + // Now remove ALL records from category A + input.sendData( + new MultiSet([ + [{ category: 'A', amount: 10 }, -1], + [{ category: 'A', amount: 20 }, -1], + ]), + ) + graph.run() + + // After removing all A records, the group should be completely removed + // NOT return a group with total: 0 + result = latestMessage.getInner() + + // The result should contain the removal of the old group + // but NOT the creation of a new group with total: 0 + const expectedResult = [ + [ + [ + '{"category":"A"}', + { + category: 'A', + total: 30, + }, + ], + -1, // This should be removed + ], + ] + + expect(result).toEqual(expectedResult) + + // Verify no new group with total: 0 was created by checking that + // we don't have any positive weight entries for category A + const positiveCategoryAEntries = result.filter( + ([key, , weight]) => key[0] === '{"category":"A"}' && weight > 0, + ) + expect(positiveCategoryAEntries).toHaveLength(0) + }) + + test('complete group removal with multiple aggregates', () => { + const graph = new D2() + const input = graph.newInput<{ + category: string + region: string + amount: number + }>() + 
let latestMessage: any = null + + input.pipe( + groupBy( + (data) => ({ + category: data.category, + region: data.region, + }), + { + total: sum((data) => data.amount), + count: count(), + average: avg((data) => data.amount), + }, + ), + output((message) => { + latestMessage = message + }), + ) + + graph.finalize() + + // Initial data + input.sendData( + new MultiSet([ + [{ category: 'A', region: 'East', amount: 10 }, 1], + [{ category: 'A', region: 'East', amount: 20 }, 1], + [{ category: 'A', region: 'West', amount: 30 }, 1], + [{ category: 'B', region: 'East', amount: 40 }, 1], + ]), + ) + graph.run() + + // Verify initial state + expect(latestMessage).not.toBeNull() + let result = latestMessage.getInner() + expect(result).toHaveLength(3) // Should have 3 groups + + // Find the group for category A, region East + const categoryAEastGroup = result.find( + ([key]) => key[0] === '{"category":"A","region":"East"}', + ) + expect(categoryAEastGroup).toBeDefined() + expect(categoryAEastGroup[0][1]).toEqual({ + category: 'A', + region: 'East', + total: 30, // 10 + 20 + count: 2, + average: 15, // 30 / 2 + }) + + // Now remove ALL records from category A, region East + input.sendData( + new MultiSet([ + [{ category: 'A', region: 'East', amount: 10 }, -1], + [{ category: 'A', region: 'East', amount: 20 }, -1], + ]), + ) + graph.run() + + // After removing all A/East records, that group should be completely removed + // NOT return a group with total: 0, count: 0, average: 0 (or NaN) + result = latestMessage.getInner() + + // The result should contain the removal of the old group + const expectedResult = [ + [ + [ + '{"category":"A","region":"East"}', + { + category: 'A', + region: 'East', + total: 30, + count: 2, + average: 15, + }, + ], + -1, // This should be removed + ], + ] + + expect(result).toEqual(expectedResult) + + // Verify no new group with zero/empty values was created + const positiveCategoryAEastEntries = result.filter( + ([key, , weight]) => + key[0] === '{"category":"A","region":"East"}' && weight > 0, + ) + expect(positiveCategoryAEastEntries).toHaveLength(0) + }) + + test('group removal and re-addition with sum aggregate', () => { + const graph = new D2() + const input = graph.newInput<{ + category: string + amount: number + }>() + let latestMessage: any = null + + input.pipe( + groupBy((data) => ({ category: data.category }), { + total: sum((data) => data.amount), + }), + output((message) => { + latestMessage = message + }), + ) + + graph.finalize() + + // Step 1: Initial data + input.sendData( + new MultiSet([ + [{ category: 'A', amount: 10 }, 1], + [{ category: 'A', amount: 20 }, 1], + [{ category: 'B', amount: 30 }, 1], + ]), + ) + graph.run() + + // Verify initial state + expect(latestMessage).not.toBeNull() + let result = latestMessage.getInner() + expect(result).toHaveLength(2) // Should have 2 groups + + // Find the group for category A + let categoryAGroup = result.find( + ([key]) => key[0] === '{"category":"A"}', + ) + expect(categoryAGroup).toBeDefined() + expect(categoryAGroup[0][1].total).toBe(30) // Sum of 10 + 20 + + // Step 2: Remove ALL records from category A + input.sendData( + new MultiSet([ + [{ category: 'A', amount: 10 }, -1], + [{ category: 'A', amount: 20 }, -1], + ]), + ) + graph.run() + + // Verify group A is completely removed + result = latestMessage.getInner() + const expectedRemovalResult = [ + [ + [ + '{"category":"A"}', + { + category: 'A', + total: 30, + }, + ], + -1, // Group should be removed + ], + ] + 
expect(result).toEqual(expectedRemovalResult) + + // Step 3: Re-add records to category A with different values + input.sendData( + new MultiSet([ + [{ category: 'A', amount: 50 }, 1], + [{ category: 'A', amount: 25 }, 1], + ]), + ) + graph.run() + + // Verify group A is recreated with correct new aggregate values + result = latestMessage.getInner() + const expectedReAdditionResult = [ + [ + [ + '{"category":"A"}', + { + category: 'A', + total: 75, // 50 + 25 (new values, not the old 30) + }, + ], + 1, // New group should be added + ], + ] + expect(result).toEqual(expectedReAdditionResult) + + // Step 4: Verify no lingering effects by adding more data + input.sendData( + new MultiSet([ + [{ category: 'A', amount: 15 }, 1], + ]), + ) + graph.run() + + // Verify aggregate is updated correctly from the new baseline + result = latestMessage.getInner() + const expectedUpdateResult = [ + [ + [ + '{"category":"A"}', + { + category: 'A', + total: 75, // Previous total + }, + ], + -1, // Remove old state + ], + [ + [ + '{"category":"A"}', + { + category: 'A', + total: 90, // 75 + 15 + }, + ], + 1, // Add new state + ], + ] + expect(result).toEqual(expectedUpdateResult) + }) + + test('group removal and re-addition with multiple aggregates', () => { + const graph = new D2() + const input = graph.newInput<{ + category: string + region: string + amount: number + }>() + let latestMessage: any = null + + input.pipe( + groupBy( + (data) => ({ + category: data.category, + region: data.region, + }), + { + total: sum((data) => data.amount), + count: count(), + average: avg((data) => data.amount), + minimum: min((data) => data.amount), + maximum: max((data) => data.amount), + }, + ), + output((message) => { + latestMessage = message + }), + ) + + graph.finalize() + + // Step 1: Initial data + input.sendData( + new MultiSet([ + [{ category: 'A', region: 'East', amount: 10 }, 1], + [{ category: 'A', region: 'East', amount: 20 }, 1], + [{ category: 'A', region: 'East', amount: 30 }, 1], + [{ category: 'B', region: 'West', amount: 100 }, 1], + ]), + ) + graph.run() + + // Verify initial state + expect(latestMessage).not.toBeNull() + let result = latestMessage.getInner() + expect(result).toHaveLength(2) // Should have 2 groups + + // Find the group for category A, region East + let categoryAEastGroup = result.find( + ([key]) => key[0] === '{"category":"A","region":"East"}', + ) + expect(categoryAEastGroup).toBeDefined() + expect(categoryAEastGroup[0][1]).toEqual({ + category: 'A', + region: 'East', + total: 60, // 10 + 20 + 30 + count: 3, + average: 20, // 60 / 3 + minimum: 10, + maximum: 30, + }) + + // Step 2: Remove ALL records from category A, region East + input.sendData( + new MultiSet([ + [{ category: 'A', region: 'East', amount: 10 }, -1], + [{ category: 'A', region: 'East', amount: 20 }, -1], + [{ category: 'A', region: 'East', amount: 30 }, -1], + ]), + ) + graph.run() + + // Verify group is completely removed + result = latestMessage.getInner() + const expectedRemovalResult = [ + [ + [ + '{"category":"A","region":"East"}', + { + category: 'A', + region: 'East', + total: 60, + count: 3, + average: 20, + minimum: 10, + maximum: 30, + }, + ], + -1, // Group should be removed + ], + ] + expect(result).toEqual(expectedRemovalResult) + + // Step 3: Re-add records to category A, region East with completely different values + input.sendData( + new MultiSet([ + [{ category: 'A', region: 'East', amount: 5 }, 1], + [{ category: 'A', region: 'East', amount: 15 }, 1], + [{ category: 'A', region: 'East', amount: 40 
}, 1], + [{ category: 'A', region: 'East', amount: 40 }, 1], // Duplicate to test aggregates properly + ]), + ) + graph.run() + + // Verify group is recreated with correct new aggregate values + result = latestMessage.getInner() + const expectedReAdditionResult = [ + [ + [ + '{"category":"A","region":"East"}', + { + category: 'A', + region: 'East', + total: 100, // 5 + 15 + 40 + 40 (completely new calculation) + count: 4, + average: 25, // 100 / 4 + minimum: 5, // New minimum + maximum: 40, // New maximum + }, + ], + 1, // New group should be added + ], + ] + expect(result).toEqual(expectedReAdditionResult) + + // Step 4: Remove some records and verify aggregates update correctly + input.sendData( + new MultiSet([ + [{ category: 'A', region: 'East', amount: 40 }, -1], // Remove one of the 40s + ]), + ) + graph.run() + + // Verify aggregates are updated correctly from the new baseline + result = latestMessage.getInner() + const expectedPartialRemovalResult = [ + [ + [ + '{"category":"A","region":"East"}', + { + category: 'A', + region: 'East', + total: 100, + count: 4, + average: 25, + minimum: 5, + maximum: 40, + }, + ], + -1, // Remove old state + ], + [ + [ + '{"category":"A","region":"East"}', + { + category: 'A', + region: 'East', + total: 60, // 5 + 15 + 40 (one 40 removed) + count: 3, + average: 20, // 60 / 3 + minimum: 5, // Still 5 + maximum: 40, // Still 40 (one remains) + }, + ], + 1, // Add new state + ], + ] + expect(result).toEqual(expectedPartialRemovalResult) + }) }) }) diff --git a/packages/d2mini/tests/operators/reduce.test.ts b/packages/d2mini/tests/operators/reduce.test.ts index 05ac358..63330e5 100644 --- a/packages/d2mini/tests/operators/reduce.test.ts +++ b/packages/d2mini/tests/operators/reduce.test.ts @@ -85,5 +85,314 @@ describe('Operators', () => { ], ]) }) + + test('multiple incremental updates to same key', () => { + const graph = new D2() + const input = graph.newInput<[string, number]>() + const messages: MultiSet<[string, number]>[] = [] + + input.pipe( + reduce((vals) => { + let sum = 0 + for (const [val, diff] of vals) { + sum += val * diff + } + return [[sum, 1]] + }), + output((message) => { + messages.push(message) + }), + ) + + graph.finalize() + + // First update: a=1, b=2 + input.sendData( + new MultiSet([ + [['a', 1], 1], + [['b', 2], 1], + ]), + ) + graph.run() + + // Second update: add more to a, modify b + input.sendData( + new MultiSet([ + [['a', 3], 1], + [['b', 4], 1], + ]), + ) + graph.run() + + // Third update: remove some from a + input.sendData(new MultiSet([[['a', 1], -1]])) + graph.run() + + const data = messages.map((m) => m.getInner()) + + expect(data).toEqual([ + // First update: a=1, b=2 + [ + [['a', 1], 1], + [['b', 2], 1], + ], + // Second update: old values removed, new values added + [ + [['a', 1], -1], // Remove old sum for a + [['a', 4], 1], // Add new sum for a (1+3) + [['b', 2], -1], // Remove old sum for b + [['b', 6], 1], // Add new sum for b (2+4) + ], + // Third update: remove a=1, so new sum is just 3 + [ + [['a', 4], -1], // Remove old sum for a + [['a', 3], 1], // Add new sum for a (just 3 now) + ], + ]) + }) + + test('updates that cancel out completely', () => { + const graph = new D2() + const input = graph.newInput<[string, number]>() + const messages: MultiSet<[string, number]>[] = [] + + input.pipe( + reduce((vals) => { + let sum = 0 + for (const [val, diff] of vals) { + sum += val * diff + } + return [[sum, 1]] + }), + output((message) => { + messages.push(message) + }), + ) + + graph.finalize() + + // First 
update: add values + input.sendData( + new MultiSet([ + [['a', 5], 1], + [['a', 3], 1], + [['b', 10], 1], + ]), + ) + graph.run() + + // Second update: cancel out all values for 'a' + input.sendData( + new MultiSet([ + [['a', 5], -1], + [['a', 3], -1], + ]), + ) + graph.run() + + const data = messages.map((m) => m.getInner()) + + expect(data).toEqual([ + // First update: a=8, b=10 + [ + [['a', 8], 1], + [['b', 10], 1], + ], + // Second update: remove old sum, add new sum (which is 0) + [ + [['a', 8], -1], // Remove old sum for a + [['a', 0], 1], // Add new sum for a (which is 0) + ], + ]) + }) + + test('mixed positive and negative updates', () => { + const graph = new D2() + const input = graph.newInput<[string, number]>() + const messages: MultiSet<[string, number]>[] = [] + + input.pipe( + reduce((vals) => { + let sum = 0 + for (const [val, diff] of vals) { + sum += val * diff + } + return [[sum, 1]] + }), + output((message) => { + messages.push(message) + }), + ) + + graph.finalize() + + // First update: establish initial state + input.sendData( + new MultiSet([ + [['a', 10], 1], + [['a', 5], 2], + [['b', 20], 1], + ]), + ) + graph.run() + + // Second update: mix of adds and removes + input.sendData( + new MultiSet([ + [['a', 10], -1], // Remove one 10 + [['a', 2], 1], // Add a 2 + [['b', 20], -1], // Remove the 20 + [['b', 15], 1], // Add a 15 + [['c', 100], 1], // Add new key + ]), + ) + graph.run() + + const data = messages.map((m) => m.getInner()) + + expect(data).toEqual([ + // First update: a=20 (10+5+5), b=20 + [ + [['a', 20], 1], + [['b', 20], 1], + ], + // Second update: a=12 (5+5+2), b=15, c=100 + [ + [['a', 20], -1], // Remove old sum for a + [['a', 12], 1], // Add new sum for a + [['b', 20], -1], // Remove old sum for b + [['b', 15], 1], // Add new sum for b + [['c', 100], 1], // Add new key c + ], + ]) + }) + + test('complex aggregation with multiple updates', () => { + const graph = new D2() + const input = graph.newInput<[string, { value: number; count: number }]>() + const messages: MultiSet<[string, { avg: number; total: number }]>[] = [] + + input.pipe( + reduce((vals) => { + let totalSum = 0 + let totalCount = 0 + for (const [val, diff] of vals) { + totalSum += val.value * val.count * diff + totalCount += val.count * diff + } + const avg = totalCount > 0 ? 
totalSum / totalCount : 0 + return [[{ avg, total: totalSum }, 1]] + }), + output((message) => { + messages.push(message) + }), + ) + + graph.finalize() + + // First batch: group 'a' has values + input.sendData( + new MultiSet([ + [['a', { value: 10, count: 2 }], 1], // 2 values of 10 + [['a', { value: 20, count: 1 }], 1], // 1 value of 20 + ]), + ) + graph.run() + + // Second batch: add more to 'a' and start 'b' + input.sendData( + new MultiSet([ + [['a', { value: 30, count: 1 }], 1], // 1 value of 30 + [['b', { value: 50, count: 3 }], 1], // 3 values of 50 + ]), + ) + graph.run() + + // Third batch: remove some from 'a' + input.sendData( + new MultiSet([ + [['a', { value: 10, count: 2 }], -1], // Remove the 2 values of 10 + ]), + ) + graph.run() + + const data = messages.map((m) => m.getInner()) + + expect(data).toEqual([ + // First update: a avg=(10*2+20*1)/(2+1)=40/3≈13.33, total=40 + [[['a', { avg: 40 / 3, total: 40 }], 1]], + // Second update: + // a avg=(10*2+20*1+30*1)/(2+1+1)=70/4=17.5, total=70 + // b avg=50, total=150 + [ + [['a', { avg: 40 / 3, total: 40 }], -1], // Remove old + [['a', { avg: 17.5, total: 70 }], 1], // Add new + [['b', { avg: 50, total: 150 }], 1], // New key + ], + // Third update: a avg=(20*1+30*1)/(1+1)=50/2=25, total=50 + [ + [['a', { avg: 17.5, total: 70 }], -1], // Remove old + [['a', { avg: 25, total: 50 }], 1], // Add new + ], + ]) + }) + + test('updates with zero-multiplicity results', () => { + const graph = new D2() + const input = graph.newInput<[string, number]>() + const messages: MultiSet<[string, number]>[] = [] + + input.pipe( + reduce((vals) => { + let sum = 0 + for (const [val, diff] of vals) { + sum += val * diff + } + // Only return non-zero sums + return sum !== 0 ? [[sum, 1]] : [] + }), + output((message) => { + messages.push(message) + }), + ) + + graph.finalize() + + // First update: establish values + input.sendData( + new MultiSet([ + [['a', 5], 1], + [['a', -3], 1], + [['b', 10], 1], + ]), + ) + graph.run() + + // Second update: make 'a' sum to zero + input.sendData(new MultiSet([[['a', -2], 1]])) + graph.run() + + // Third update: add back to 'a' + input.sendData(new MultiSet([[['a', 7], 1]])) + graph.run() + + const data = messages.map((m) => m.getInner()) + + expect(data).toEqual([ + // First update: a=2, b=10 + [ + [['a', 2], 1], + [['b', 10], 1], + ], + // Second update: a becomes 0 (filtered out), only removal + [ + [['a', 2], -1], // Remove old sum for a + ], + // Third update: a=7 (0+7) + [ + [['a', 7], 1], // Add new sum for a + ], + ]) + }) }) })
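For reviewers, the behavioural core of this patch is the switch from eagerly-consolidated per-key value maps to raw `[value, multiplicity]` lists that are consolidated lazily by `compact()`, with keys whose entries cancel to zero disappearing entirely — which is what lets `groupBy` and `reduce` emit a retraction for a fully removed group. Below is a minimal, self-contained sketch of that deferred-consolidation pattern; `MiniIndex` and the `JSON.stringify` stand-in for the library's `hash()` are illustrative only, not the d2mini implementation.

```ts
// Illustrative model of the reworked Index: values accumulate as raw
// [value, multiplicity] pairs; compact() sums per-value multiplicities for
// the changed keys and drops entries (and whole keys) that net out to zero.
type Entry<V> = [V, number]

class MiniIndex<K, V> {
  #inner = new Map<K, Entry<V>[]>()
  #changedKeys = new Set<K>()

  addValue(key: K, value: Entry<V>): void {
    const values = this.#inner.get(key) ?? []
    values.push(value)
    this.#inner.set(key, values)
    this.#changedKeys.add(key)
  }

  compact(): void {
    for (const key of this.#changedKeys) {
      const sums = new Map<string, Entry<V>>()
      for (const [value, multiplicity] of this.#inner.get(key) ?? []) {
        const h = JSON.stringify(value) // stand-in for the library's hash()
        const prev = sums.get(h)
        sums.set(h, [value, (prev ? prev[1] : 0) + multiplicity])
      }
      const kept = [...sums.values()].filter(([, m]) => m !== 0)
      if (kept.length > 0) this.#inner.set(key, kept)
      else this.#inner.delete(key) // key vanishes, as the groupBy fix expects
    }
    this.#changedKeys.clear()
  }

  get(key: K): Entry<V>[] {
    return this.#inner.get(key) ?? []
  }
}

// Usage: a key whose additions and retractions cancel is gone after compact().
const idx = new MiniIndex<string, number>()
idx.addValue('key1', [10, 1])
idx.addValue('key1', [10, -1])
idx.compact()
console.log(idx.get('key1')) // []
```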