Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/lucky-toys-fail.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/d2mini': patch
---

fix a bug where `reduce` would not emit a message for deleted keys
5 changes: 5 additions & 0 deletions .changeset/odd-garlics-chew.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/d2mini': patch
---

fix a bug where `groupBy` would not remove a group if its key was completely removed from the stream
128 changes: 72 additions & 56 deletions packages/d2mini/src/indexes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,24 @@ import { DefaultMap, hash } from './utils.js'
* exploit the key-value structure of the data to run efficiently.
*/
export class Index<K, V> {
#inner: DefaultMap<K, DefaultMap<string, [V, number]>>
#inner: DefaultMap<K, [V, number][]>
#changedKeys: Set<K>

constructor() {
this.#inner = new DefaultMap<K, DefaultMap<string, [V, number]>>(
() =>
new DefaultMap<string, [V, number]>(() => [undefined as any as V, 0]),
)
// #inner is as map of:
// {
// [key]: {
// [hash(value)]: [value, multiplicity]
// }
// }
this.#inner = new DefaultMap<K, [V, number][]>(() => [])
this.#changedKeys = new Set<K>()
}

toString(indent = false): string {
return `Index(${JSON.stringify(
[...this.#inner].map(([k, valueMap]) => [k, [...valueMap]]),
[...this.#inner],
undefined,
indent ? ' ' : undefined,
)})`
}

get(key: K): [V, number][] {
const valueMap = this.#inner.get(key)
return [...valueMap.values()]
return this.#inner.get(key)
}

entries() {
Expand All @@ -44,54 +36,78 @@ export class Index<K, V> {
}

has(key: K): boolean {
return this.#inner.has(key)
return this.#inner.has(key) && this.#inner.get(key).length > 0
}

get size(): number {
return this.#inner.size
let count = 0
for (const [, values] of this.#inner.entries()) {
if (values.length > 0) {
count++
}
}
return count
}

addValue(key: K, value: [V, number]): void {
const [val, multiplicity] = value
const valueMap = this.#inner.get(key)
const valueHash = hash(val)
const [, existingMultiplicity] = valueMap.get(valueHash)
if (existingMultiplicity !== 0) {
const newMultiplicity = existingMultiplicity + multiplicity
if (newMultiplicity === 0) {
valueMap.delete(valueHash)
} else {
valueMap.set(valueHash, [val, newMultiplicity])
const values = this.#inner.get(key)
values.push(value)
this.#changedKeys.add(key)
}

append(other: Index<K, V>): void {
for (const [key, otherValues] of other.entries()) {
const thisValues = this.#inner.get(key)
for (const value of otherValues) {
thisValues.push(value)
}
this.#changedKeys.add(key)
}
}

compact(keys: K[] = []): void {
// If no keys specified, use the changed keys
const keysToProcess = keys.length === 0 ? [...this.#changedKeys] : keys

for (const key of keysToProcess) {
if (!this.#inner.has(key)) continue

const values = this.#inner.get(key)
const consolidated = this.consolidateValues(values)

// Remove the key entirely and re-add only if there are non-zero values
this.#inner.delete(key)
if (consolidated.length > 0) {
this.#inner.get(key).push(...consolidated)
}
}

// Clear the changed keys after compaction
if (keys.length === 0) {
this.#changedKeys.clear()
} else {
if (multiplicity !== 0) {
valueMap.set(valueHash, [val, multiplicity])
// Only remove the keys that were explicitly compacted
for (const key of keys) {
this.#changedKeys.delete(key)
}
}
}

append(other: Index<K, V>): void {
for (const [key, otherValueMap] of other.entries()) {
const thisValueMap = this.#inner.get(key)
for (const [
valueHash,
[value, multiplicity],
] of otherValueMap.entries()) {
const [, existingMultiplicity] = thisValueMap.get(valueHash)
if (existingMultiplicity !== 0) {
const newMultiplicity = existingMultiplicity + multiplicity
if (newMultiplicity === 0) {
thisValueMap.delete(valueHash)
} else {
thisValueMap.set(valueHash, [value, newMultiplicity])
}
} else {
if (multiplicity !== 0) {
thisValueMap.set(valueHash, [value, multiplicity])
}
}
/**
 * Merges entries whose values hash to the same string into one entry per
 * distinct value, summing their multiplicities.
 *
 * The first value seen for a given hash is the one that is kept. Entries
 * whose summed multiplicity is exactly zero are dropped. The result lists
 * the surviving entries in order of first occurrence in `values`, as fresh
 * tuples (the input tuples are not mutated).
 *
 * @param values - `[value, multiplicity]` pairs to consolidate.
 * @returns The consolidated `[value, multiplicity]` pairs.
 */
private consolidateValues(values: [V, number][]): [V, number][] {
  // Keyed by hash(value); a Map iterates in insertion order.
  const totalsByHash = new Map<string, [V, number]>()

  for (const [candidate, count] of values) {
    const key = hash(candidate)
    const existing = totalsByHash.get(key)
    if (existing === undefined) {
      totalsByHash.set(key, [candidate, count])
    } else {
      existing[1] += count
    }
  }

  const survivors: [V, number][] = []
  for (const entry of totalsByHash.values()) {
    // Multiplicities that cancel out mean the value is no longer present.
    if (entry[1] !== 0) {
      survivors.push(entry)
    }
  }
  return survivors
}

join<V2>(other: Index<K, V2>): MultiSet<[K, [V, V2]]> {
Expand All @@ -100,23 +116,23 @@ export class Index<K, V> {
// We want to iterate over the smaller of the two indexes to reduce the
// number of operations we need to do.
if (this.size <= other.size) {
for (const [key, valueMap] of this.entries()) {
for (const [key, values1] of this.entries()) {
if (!other.has(key)) continue
const otherValues = other.get(key)
for (const [val1, mul1] of valueMap.values()) {
for (const [val2, mul2] of otherValues) {
const values2 = other.get(key)
for (const [val1, mul1] of values1) {
for (const [val2, mul2] of values2) {
if (mul1 !== 0 && mul2 !== 0) {
result.push([[key, [val1, val2]], mul1 * mul2])
}
}
}
}
} else {
for (const [key, otherValueMap] of other.entries()) {
for (const [key, values2] of other.entries()) {
if (!this.has(key)) continue
const values = this.get(key)
for (const [val2, mul2] of otherValueMap.values()) {
for (const [val1, mul1] of values) {
const values1 = this.get(key)
for (const [val2, mul2] of values2) {
for (const [val1, mul1] of values1) {
if (mul1 !== 0 && mul2 !== 0) {
result.push([[key, [val1, val2]], mul1 * mul2])
}
Expand Down
11 changes: 11 additions & 0 deletions packages/d2mini/src/operators/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ export function groupBy<
// Then reduce to compute aggregates
const reduced = withKeysAndValues.pipe(
reduce((values) => {
// Calculate total multiplicity to check if the group should exist
let totalMultiplicity = 0
for (const [_, multiplicity] of values) {
totalMultiplicity += multiplicity
}

// If total multiplicity is 0 or negative, the group should be removed completely
if (totalMultiplicity <= 0) {
return []
}

const result: Record<string, unknown> = {}

// Get the original key from first value in group
Expand Down
7 changes: 7 additions & 0 deletions packages/d2mini/src/operators/join.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ export class JoinOperator<K, V1, V2> extends BinaryOperator<

// Append deltaB to indexB
this.#indexB.append(deltaB)

// Compact the deltas and both stored indexes to consolidate values and remove zero-multiplicity entries
// Only compact changed keys for efficiency
deltaA.compact()
deltaB.compact()
this.#indexA.compact()
this.#indexB.compact()
}
}

Expand Down
67 changes: 50 additions & 17 deletions packages/d2mini/src/operators/reduce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import { hash } from '../utils.js'
export class ReduceOperator<K, V1, V2> extends UnaryOperator<[K, V1], [K, V2]> {
#index = new Index<K, V1>()
#indexOut = new Index<K, V2>()
#keysTodo = new Set<K>()
#f: (values: [V1, number][]) => [V2, number][]

constructor(
Expand All @@ -30,49 +29,83 @@ export class ReduceOperator<K, V1, V2> extends UnaryOperator<[K, V1], [K, V2]> {

run(): void {
// Collect all input messages and update the index
const keysTodo = new Set<K>()
for (const message of this.inputMessages()) {
for (const [item, multiplicity] of message.getInner()) {
const [key, value] = item
this.#index.addValue(key, [value, multiplicity])
this.#keysTodo.add(key)
keysTodo.add(key)
}
}

// For each key, compute the reduction and delta
const result: [[K, V2], number][] = []
for (const key of this.#keysTodo) {
for (const key of keysTodo) {
const curr = this.#index.get(key)
const currOut = this.#indexOut.get(key)
const out = this.#f(curr)

// Calculate delta between current and previous output
const delta = new Map<string, number>()
const values = new Map<string, V2>()
// Create maps for current and previous outputs
const newOutputMap = new Map<
string,
{ value: V2; multiplicity: number }
>()
const oldOutputMap = new Map<
string,
{ value: V2; multiplicity: number }
>()

// Process new output
for (const [value, multiplicity] of out) {
const valueKey = hash(value)
values.set(valueKey, value)
delta.set(valueKey, (delta.get(valueKey) || 0) + multiplicity)
if (newOutputMap.has(valueKey)) {
newOutputMap.get(valueKey)!.multiplicity += multiplicity
} else {
newOutputMap.set(valueKey, { value, multiplicity })
}
}

// Process previous output
for (const [value, multiplicity] of currOut) {
const valueKey = hash(value)
values.set(valueKey, value)
delta.set(valueKey, (delta.get(valueKey) || 0) - multiplicity)
if (oldOutputMap.has(valueKey)) {
oldOutputMap.get(valueKey)!.multiplicity += multiplicity
} else {
oldOutputMap.set(valueKey, { value, multiplicity })
}
}

// Add non-zero deltas to result
for (const [valueKey, multiplicity] of delta) {
const value = values.get(valueKey)!
if (multiplicity !== 0) {
result.push([[key, value], multiplicity])
this.#indexOut.addValue(key, [value, multiplicity])
// First, emit removals for old values that are no longer present or have changed
for (const [valueKey, { value, multiplicity }] of oldOutputMap) {
const newEntry = newOutputMap.get(valueKey)
if (!newEntry || newEntry.multiplicity !== multiplicity) {
// Remove the old value entirely
result.push([[key, value], -multiplicity])
this.#indexOut.addValue(key, [value, -multiplicity])
}
}

// Then, emit additions for new values that are not present in old or have changed
for (const [valueKey, { value, multiplicity }] of newOutputMap) {
const oldEntry = oldOutputMap.get(valueKey)
if (!oldEntry || oldEntry.multiplicity !== multiplicity) {
// Add the new value only if it has non-zero multiplicity
if (multiplicity !== 0) {
result.push([[key, value], multiplicity])
this.#indexOut.addValue(key, [value, multiplicity])
}
}
}
}

if (result.length > 0) {
this.output.sendData(new MultiSet(result))
}
this.#keysTodo.clear()

// Compact both indexes to consolidate values and remove zero-multiplicity entries
// Only compact changed keys for efficiency
this.#index.compact()
this.#indexOut.compact()
}
}

Expand Down
5 changes: 5 additions & 0 deletions packages/d2mini/src/operators/topKWithFractionalIndex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ export class TopKWithFractionalIndexOperator<K, V1> extends UnaryOperator<
if (result.length > 0) {
this.output.sendData(new MultiSet(result))
}

// Compact both indexes to consolidate values and remove zero-multiplicity entries
// Only compact changed keys for efficiency
this.#index.compact()
this.#indexOut.compact()
}
}

Expand Down
Loading