Skip to content

Commit fb90328

Browse files
authored
fix: index compaction for reduce (#69)
* fix: index compaction for reduce * changeset
1 parent a6b55e8 commit fb90328

File tree

10 files changed

+556
-100
lines changed

10 files changed

+556
-100
lines changed

.changeset/lucky-toys-fail.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@electric-sql/d2mini': patch
3+
---
4+
5+
fix a bug where `reduce` would not emit a message for deleted keys

packages/d2mini/src/indexes.ts

Lines changed: 72 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -7,32 +7,24 @@ import { DefaultMap, hash } from './utils.js'
77
* exploit the key-value structure of the data to run efficiently.
88
*/
99
export class Index<K, V> {
10-
#inner: DefaultMap<K, DefaultMap<string, [V, number]>>
10+
#inner: DefaultMap<K, [V, number][]>
11+
#changedKeys: Set<K>
1112

1213
constructor() {
13-
this.#inner = new DefaultMap<K, DefaultMap<string, [V, number]>>(
14-
() =>
15-
new DefaultMap<string, [V, number]>(() => [undefined as any as V, 0]),
16-
)
17-
// #inner is as map of:
18-
// {
19-
// [key]: {
20-
// [hash(value)]: [value, multiplicity]
21-
// }
22-
// }
14+
this.#inner = new DefaultMap<K, [V, number][]>(() => [])
15+
this.#changedKeys = new Set<K>()
2316
}
2417

2518
toString(indent = false): string {
2619
return `Index(${JSON.stringify(
27-
[...this.#inner].map(([k, valueMap]) => [k, [...valueMap]]),
20+
[...this.#inner],
2821
undefined,
2922
indent ? ' ' : undefined,
3023
)})`
3124
}
3225

3326
get(key: K): [V, number][] {
34-
const valueMap = this.#inner.get(key)
35-
return [...valueMap.values()]
27+
return this.#inner.get(key)
3628
}
3729

3830
entries() {
@@ -44,54 +36,78 @@ export class Index<K, V> {
4436
}
4537

4638
has(key: K): boolean {
47-
return this.#inner.has(key)
39+
return this.#inner.has(key) && this.#inner.get(key).length > 0
4840
}
4941

5042
get size(): number {
51-
return this.#inner.size
43+
let count = 0
44+
for (const [, values] of this.#inner.entries()) {
45+
if (values.length > 0) {
46+
count++
47+
}
48+
}
49+
return count
5250
}
5351

5452
addValue(key: K, value: [V, number]): void {
55-
const [val, multiplicity] = value
56-
const valueMap = this.#inner.get(key)
57-
const valueHash = hash(val)
58-
const [, existingMultiplicity] = valueMap.get(valueHash)
59-
if (existingMultiplicity !== 0) {
60-
const newMultiplicity = existingMultiplicity + multiplicity
61-
if (newMultiplicity === 0) {
62-
valueMap.delete(valueHash)
63-
} else {
64-
valueMap.set(valueHash, [val, newMultiplicity])
53+
const values = this.#inner.get(key)
54+
values.push(value)
55+
this.#changedKeys.add(key)
56+
}
57+
58+
append(other: Index<K, V>): void {
59+
for (const [key, otherValues] of other.entries()) {
60+
const thisValues = this.#inner.get(key)
61+
for (const value of otherValues) {
62+
thisValues.push(value)
63+
}
64+
this.#changedKeys.add(key)
65+
}
66+
}
67+
68+
compact(keys: K[] = []): void {
69+
// If no keys specified, use the changed keys
70+
const keysToProcess = keys.length === 0 ? [...this.#changedKeys] : keys
71+
72+
for (const key of keysToProcess) {
73+
if (!this.#inner.has(key)) continue
74+
75+
const values = this.#inner.get(key)
76+
const consolidated = this.consolidateValues(values)
77+
78+
// Remove the key entirely and re-add only if there are non-zero values
79+
this.#inner.delete(key)
80+
if (consolidated.length > 0) {
81+
this.#inner.get(key).push(...consolidated)
6582
}
83+
}
84+
85+
// Clear the changed keys after compaction
86+
if (keys.length === 0) {
87+
this.#changedKeys.clear()
6688
} else {
67-
if (multiplicity !== 0) {
68-
valueMap.set(valueHash, [val, multiplicity])
89+
// Only remove the keys that were explicitly compacted
90+
for (const key of keys) {
91+
this.#changedKeys.delete(key)
6992
}
7093
}
7194
}
7295

73-
append(other: Index<K, V>): void {
74-
for (const [key, otherValueMap] of other.entries()) {
75-
const thisValueMap = this.#inner.get(key)
76-
for (const [
77-
valueHash,
78-
[value, multiplicity],
79-
] of otherValueMap.entries()) {
80-
const [, existingMultiplicity] = thisValueMap.get(valueHash)
81-
if (existingMultiplicity !== 0) {
82-
const newMultiplicity = existingMultiplicity + multiplicity
83-
if (newMultiplicity === 0) {
84-
thisValueMap.delete(valueHash)
85-
} else {
86-
thisValueMap.set(valueHash, [value, newMultiplicity])
87-
}
88-
} else {
89-
if (multiplicity !== 0) {
90-
thisValueMap.set(valueHash, [value, multiplicity])
91-
}
92-
}
96+
private consolidateValues(values: [V, number][]): [V, number][] {
97+
const consolidated = new Map<string, { value: V; multiplicity: number }>()
98+
99+
for (const [value, multiplicity] of values) {
100+
const valueHash = hash(value)
101+
if (consolidated.has(valueHash)) {
102+
consolidated.get(valueHash)!.multiplicity += multiplicity
103+
} else {
104+
consolidated.set(valueHash, { value, multiplicity })
93105
}
94106
}
107+
108+
return [...consolidated.values()]
109+
.filter(({ multiplicity }) => multiplicity !== 0)
110+
.map(({ value, multiplicity }) => [value, multiplicity])
95111
}
96112

97113
join<V2>(other: Index<K, V2>): MultiSet<[K, [V, V2]]> {
@@ -100,23 +116,23 @@ export class Index<K, V> {
100116
// We want to iterate over the smaller of the two indexes to reduce the
101117
// number of operations we need to do.
102118
if (this.size <= other.size) {
103-
for (const [key, valueMap] of this.entries()) {
119+
for (const [key, values1] of this.entries()) {
104120
if (!other.has(key)) continue
105-
const otherValues = other.get(key)
106-
for (const [val1, mul1] of valueMap.values()) {
107-
for (const [val2, mul2] of otherValues) {
121+
const values2 = other.get(key)
122+
for (const [val1, mul1] of values1) {
123+
for (const [val2, mul2] of values2) {
108124
if (mul1 !== 0 && mul2 !== 0) {
109125
result.push([[key, [val1, val2]], mul1 * mul2])
110126
}
111127
}
112128
}
113129
}
114130
} else {
115-
for (const [key, otherValueMap] of other.entries()) {
131+
for (const [key, values2] of other.entries()) {
116132
if (!this.has(key)) continue
117-
const values = this.get(key)
118-
for (const [val2, mul2] of otherValueMap.values()) {
119-
for (const [val1, mul1] of values) {
133+
const values1 = this.get(key)
134+
for (const [val2, mul2] of values2) {
135+
for (const [val1, mul1] of values1) {
120136
if (mul1 !== 0 && mul2 !== 0) {
121137
result.push([[key, [val1, val2]], mul1 * mul2])
122138
}

packages/d2mini/src/operators/join.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,13 @@ export class JoinOperator<K, V1, V2> extends BinaryOperator<
7777

7878
// Append deltaB to indexB
7979
this.#indexB.append(deltaB)
80+
81+
// Compact the deltas and both stored indexes to consolidate values and remove zero-multiplicity entries
82+
// Only compact changed keys for efficiency
83+
deltaA.compact()
84+
deltaB.compact()
85+
this.#indexA.compact()
86+
this.#indexB.compact()
8087
}
8188
}
8289

packages/d2mini/src/operators/reduce.ts

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import { hash } from '../utils.js'
1515
export class ReduceOperator<K, V1, V2> extends UnaryOperator<[K, V1], [K, V2]> {
1616
#index = new Index<K, V1>()
1717
#indexOut = new Index<K, V2>()
18-
#keysTodo = new Set<K>()
1918
#f: (values: [V1, number][]) => [V2, number][]
2019

2120
constructor(
@@ -30,49 +29,83 @@ export class ReduceOperator<K, V1, V2> extends UnaryOperator<[K, V1], [K, V2]> {
3029

3130
run(): void {
3231
// Collect all input messages and update the index
32+
const keysTodo = new Set<K>()
3333
for (const message of this.inputMessages()) {
3434
for (const [item, multiplicity] of message.getInner()) {
3535
const [key, value] = item
3636
this.#index.addValue(key, [value, multiplicity])
37-
this.#keysTodo.add(key)
37+
keysTodo.add(key)
3838
}
3939
}
4040

4141
// For each key, compute the reduction and delta
4242
const result: [[K, V2], number][] = []
43-
for (const key of this.#keysTodo) {
43+
for (const key of keysTodo) {
4444
const curr = this.#index.get(key)
4545
const currOut = this.#indexOut.get(key)
4646
const out = this.#f(curr)
4747

48-
// Calculate delta between current and previous output
49-
const delta = new Map<string, number>()
50-
const values = new Map<string, V2>()
48+
// Create maps for current and previous outputs
49+
const newOutputMap = new Map<
50+
string,
51+
{ value: V2; multiplicity: number }
52+
>()
53+
const oldOutputMap = new Map<
54+
string,
55+
{ value: V2; multiplicity: number }
56+
>()
57+
58+
// Process new output
5159
for (const [value, multiplicity] of out) {
5260
const valueKey = hash(value)
53-
values.set(valueKey, value)
54-
delta.set(valueKey, (delta.get(valueKey) || 0) + multiplicity)
61+
if (newOutputMap.has(valueKey)) {
62+
newOutputMap.get(valueKey)!.multiplicity += multiplicity
63+
} else {
64+
newOutputMap.set(valueKey, { value, multiplicity })
65+
}
5566
}
67+
68+
// Process previous output
5669
for (const [value, multiplicity] of currOut) {
5770
const valueKey = hash(value)
58-
values.set(valueKey, value)
59-
delta.set(valueKey, (delta.get(valueKey) || 0) - multiplicity)
71+
if (oldOutputMap.has(valueKey)) {
72+
oldOutputMap.get(valueKey)!.multiplicity += multiplicity
73+
} else {
74+
oldOutputMap.set(valueKey, { value, multiplicity })
75+
}
6076
}
6177

62-
// Add non-zero deltas to result
63-
for (const [valueKey, multiplicity] of delta) {
64-
const value = values.get(valueKey)!
65-
if (multiplicity !== 0) {
66-
result.push([[key, value], multiplicity])
67-
this.#indexOut.addValue(key, [value, multiplicity])
78+
// First, emit removals for old values that are no longer present or have changed
79+
for (const [valueKey, { value, multiplicity }] of oldOutputMap) {
80+
const newEntry = newOutputMap.get(valueKey)
81+
if (!newEntry || newEntry.multiplicity !== multiplicity) {
82+
// Remove the old value entirely
83+
result.push([[key, value], -multiplicity])
84+
this.#indexOut.addValue(key, [value, -multiplicity])
85+
}
86+
}
87+
88+
// Then, emit additions for new values that are not present in old or have changed
89+
for (const [valueKey, { value, multiplicity }] of newOutputMap) {
90+
const oldEntry = oldOutputMap.get(valueKey)
91+
if (!oldEntry || oldEntry.multiplicity !== multiplicity) {
92+
// Add the new value only if it has non-zero multiplicity
93+
if (multiplicity !== 0) {
94+
result.push([[key, value], multiplicity])
95+
this.#indexOut.addValue(key, [value, multiplicity])
96+
}
6897
}
6998
}
7099
}
71100

72101
if (result.length > 0) {
73102
this.output.sendData(new MultiSet(result))
74103
}
75-
this.#keysTodo.clear()
104+
105+
// Compact both indexes to consolidate values and remove zero-multiplicity entries
106+
// Only compact changed keys for efficiency
107+
this.#index.compact()
108+
this.#indexOut.compact()
76109
}
77110
}
78111

packages/d2mini/src/operators/topKWithFractionalIndex.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,11 @@ export class TopKWithFractionalIndexOperator<K, V1> extends UnaryOperator<
293293
if (result.length > 0) {
294294
this.output.sendData(new MultiSet(result))
295295
}
296+
297+
// Compact both indexes to consolidate values and remove zero-multiplicity entries
298+
// Only compact changed keys for efficiency
299+
this.#index.compact()
300+
this.#indexOut.compact()
296301
}
297302
}
298303

0 commit comments

Comments
 (0)