Skip to content

Commit 47e33ab

Browse files
authored
Revert to old index implementation (#76)
* Revert back to map implementation of index * Modify index tests to no longer assert uncompacted form * Remove obsolete comments * changeset * Refactor reduce * Update changeset
1 parent 8718ae5 commit 47e33ab

File tree

7 files changed

+182
-141
lines changed

7 files changed

+182
-141
lines changed

.changeset/evil-pianos-hear.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@electric-sql/d2mini': patch
3+
---
4+
5+
Modify index implementation to keep a map of consolidated values and their multiplicities. This improves efficiency to get a value's multiplicity since it's already precomputed. Also modify reduce operator to emit a single diff instead of 2 diffs (1 that is -oldMultiplicity and 1 that is +newMultiplicity).

packages/d2mini/src/indexes.ts

Lines changed: 48 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,32 @@ import { DefaultMap, hash } from './utils.js'
77
* exploit the key-value structure of the data to run efficiently.
88
*/
99
export class Index<K, V> {
10-
#inner: DefaultMap<K, [V, number][]>
11-
#changedKeys: Set<K>
10+
#inner: DefaultMap<K, DefaultMap<string, [V, number]>>
1211

1312
constructor() {
14-
this.#inner = new DefaultMap<K, [V, number][]>(() => [])
15-
this.#changedKeys = new Set<K>()
13+
this.#inner = new DefaultMap<K, DefaultMap<string, [V, number]>>(
14+
() =>
15+
new DefaultMap<string, [V, number]>(() => [undefined as any as V, 0]),
16+
)
17+
// #inner is as map of:
18+
// {
19+
// [key]: {
20+
// [hash(value)]: [value, multiplicity]
21+
// }
22+
// }
1623
}
1724

1825
toString(indent = false): string {
1926
return `Index(${JSON.stringify(
20-
[...this.#inner],
27+
[...this.#inner].map(([k, valueMap]) => [k, [...valueMap]]),
2128
undefined,
2229
indent ? ' ' : undefined,
2330
)})`
2431
}
2532

2633
get(key: K): [V, number][] {
27-
return this.#inner.get(key)
34+
const valueMap = this.#inner.get(key)
35+
return [...valueMap.values()]
2836
}
2937

3038
entries() {
@@ -36,78 +44,44 @@ export class Index<K, V> {
3644
}
3745

3846
has(key: K): boolean {
39-
return this.#inner.has(key) && this.#inner.get(key).length > 0
47+
return this.#inner.has(key)
4048
}
4149

4250
get size(): number {
43-
let count = 0
44-
for (const [, values] of this.#inner.entries()) {
45-
if (values.length > 0) {
46-
count++
47-
}
48-
}
49-
return count
51+
return this.#inner.size
5052
}
5153

5254
addValue(key: K, value: [V, number]): void {
53-
const values = this.#inner.get(key)
54-
values.push(value)
55-
this.#changedKeys.add(key)
56-
}
57-
58-
append(other: Index<K, V>): void {
59-
for (const [key, otherValues] of other.entries()) {
60-
const thisValues = this.#inner.get(key)
61-
for (const value of otherValues) {
62-
thisValues.push(value)
63-
}
64-
this.#changedKeys.add(key)
65-
}
66-
}
67-
68-
compact(keys: K[] = []): void {
69-
// If no keys specified, use the changed keys
70-
const keysToProcess = keys.length === 0 ? [...this.#changedKeys] : keys
71-
72-
for (const key of keysToProcess) {
73-
if (!this.#inner.has(key)) continue
74-
75-
const values = this.#inner.get(key)
76-
const consolidated = this.consolidateValues(values)
77-
78-
// Remove the key entirely and re-add only if there are non-zero values
79-
this.#inner.delete(key)
80-
if (consolidated.length > 0) {
81-
this.#inner.get(key).push(...consolidated)
82-
}
83-
}
84-
85-
// Clear the changed keys after compaction
86-
if (keys.length === 0) {
87-
this.#changedKeys.clear()
88-
} else {
89-
// Only remove the keys that were explicitly compacted
90-
for (const key of keys) {
91-
this.#changedKeys.delete(key)
55+
const [val, multiplicity] = value
56+
const valueMap = this.#inner.get(key)
57+
const valueHash = hash(val)
58+
const [, existingMultiplicity] = valueMap.get(valueHash)
59+
const newMultiplicity = existingMultiplicity + multiplicity
60+
if (multiplicity !== 0) {
61+
if (newMultiplicity === 0) {
62+
valueMap.delete(valueHash)
63+
} else {
64+
valueMap.set(valueHash, [val, newMultiplicity])
9265
}
9366
}
9467
}
9568

96-
private consolidateValues(values: [V, number][]): [V, number][] {
97-
const consolidated = new Map<string, { value: V; multiplicity: number }>()
98-
99-
for (const [value, multiplicity] of values) {
100-
const valueHash = hash(value)
101-
if (consolidated.has(valueHash)) {
102-
consolidated.get(valueHash)!.multiplicity += multiplicity
103-
} else {
104-
consolidated.set(valueHash, { value, multiplicity })
69+
append(other: Index<K, V>): void {
70+
for (const [key, otherValueMap] of other.entries()) {
71+
const thisValueMap = this.#inner.get(key)
72+
for (const [
73+
valueHash,
74+
[value, multiplicity],
75+
] of otherValueMap.entries()) {
76+
const [, existingMultiplicity] = thisValueMap.get(valueHash)
77+
const newMultiplicity = existingMultiplicity + multiplicity
78+
if (newMultiplicity === 0) {
79+
thisValueMap.delete(valueHash)
80+
} else {
81+
thisValueMap.set(valueHash, [value, newMultiplicity])
82+
}
10583
}
10684
}
107-
108-
return [...consolidated.values()]
109-
.filter(({ multiplicity }) => multiplicity !== 0)
110-
.map(({ value, multiplicity }) => [value, multiplicity])
11185
}
11286

11387
join<V2>(other: Index<K, V2>): MultiSet<[K, [V, V2]]> {
@@ -116,23 +90,23 @@ export class Index<K, V> {
11690
// We want to iterate over the smaller of the two indexes to reduce the
11791
// number of operations we need to do.
11892
if (this.size <= other.size) {
119-
for (const [key, values1] of this.entries()) {
93+
for (const [key, valueMap] of this.entries()) {
12094
if (!other.has(key)) continue
121-
const values2 = other.get(key)
122-
for (const [val1, mul1] of values1) {
123-
for (const [val2, mul2] of values2) {
95+
const otherValues = other.get(key)
96+
for (const [val1, mul1] of valueMap.values()) {
97+
for (const [val2, mul2] of otherValues) {
12498
if (mul1 !== 0 && mul2 !== 0) {
12599
result.push([[key, [val1, val2]], mul1 * mul2])
126100
}
127101
}
128102
}
129103
}
130104
} else {
131-
for (const [key, values2] of other.entries()) {
105+
for (const [key, otherValueMap] of other.entries()) {
132106
if (!this.has(key)) continue
133-
const values1 = this.get(key)
134-
for (const [val2, mul2] of values2) {
135-
for (const [val1, mul1] of values1) {
107+
const values = this.get(key)
108+
for (const [val2, mul2] of otherValueMap.values()) {
109+
for (const [val1, mul1] of values) {
136110
if (mul1 !== 0 && mul2 !== 0) {
137111
result.push([[key, [val1, val2]], mul1 * mul2])
138112
}

packages/d2mini/src/operators/join.ts

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,6 @@ export class JoinOperator<K, V1, V2> extends BinaryOperator<
7777

7878
// Append deltaB to indexB
7979
this.#indexB.append(deltaB)
80-
81-
// Compact both indexes to consolidate values and remove zero-multiplicity entries
82-
// Only compact changed keys for efficiency
83-
deltaA.compact()
84-
deltaB.compact()
85-
this.#indexA.compact()
86-
this.#indexB.compact()
8780
}
8881
}
8982

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import { IStreamBuilder, KeyValue } from '../types.js'
2+
import {
3+
DifferenceStreamReader,
4+
DifferenceStreamWriter,
5+
UnaryOperator,
6+
} from '../graph.js'
7+
import { StreamBuilder } from '../d2.js'
8+
import { MultiSet } from '../multiset.js'
9+
import { Index } from '../indexes.js'
10+
import { hash } from '../utils.js'
11+
12+
/**
13+
* Base operator for reduction operations (version-free)
14+
*/
15+
export class ReduceOperator<K, V1, V2> extends UnaryOperator<[K, V1], [K, V2]> {
16+
#index = new Index<K, V1>()
17+
#indexOut = new Index<K, V2>()
18+
#f: (values: [V1, number][]) => [V2, number][]
19+
20+
constructor(
21+
id: number,
22+
inputA: DifferenceStreamReader<[K, V1]>,
23+
output: DifferenceStreamWriter<[K, V2]>,
24+
f: (values: [V1, number][]) => [V2, number][],
25+
) {
26+
super(id, inputA, output)
27+
this.#f = f
28+
}
29+
30+
run(): void {
31+
const keysTodo = new Set<K>()
32+
33+
// Collect all input messages and update the index
34+
for (const message of this.inputMessages()) {
35+
for (const [item, multiplicity] of message.getInner()) {
36+
const [key, value] = item
37+
this.#index.addValue(key, [value, multiplicity])
38+
keysTodo.add(key)
39+
}
40+
}
41+
42+
// For each key, compute the reduction and delta
43+
const result: [[K, V2], number][] = []
44+
for (const key of keysTodo) {
45+
const curr = this.#index.get(key)
46+
const currOut = this.#indexOut.get(key)
47+
const out = this.#f(curr)
48+
49+
// Calculate delta between current and previous output
50+
const delta = new Map<string, number>()
51+
const values = new Map<string, V2>()
52+
for (const [value, multiplicity] of out) {
53+
const valueKey = hash(value)
54+
values.set(valueKey, value)
55+
delta.set(valueKey, (delta.get(valueKey) || 0) + multiplicity)
56+
}
57+
for (const [value, multiplicity] of currOut) {
58+
const valueKey = hash(value)
59+
values.set(valueKey, value)
60+
delta.set(valueKey, (delta.get(valueKey) || 0) - multiplicity)
61+
}
62+
63+
// Add non-zero deltas to result
64+
for (const [valueKey, multiplicity] of delta) {
65+
const value = values.get(valueKey)!
66+
if (multiplicity !== 0) {
67+
result.push([[key, value], multiplicity])
68+
this.#indexOut.addValue(key, [value, multiplicity])
69+
}
70+
}
71+
}
72+
73+
if (result.length > 0) {
74+
this.output.sendData(new MultiSet(result))
75+
}
76+
}
77+
}
78+
79+
/**
80+
* Reduces the elements in the stream by key (version-free)
81+
*/
82+
export function reduce<
83+
K extends T extends KeyValue<infer K, infer _V> ? K : never,
84+
V1 extends T extends KeyValue<K, infer V> ? V : never,
85+
R,
86+
T,
87+
>(f: (values: [V1, number][]) => [R, number][]) {
88+
return (stream: IStreamBuilder<T>): IStreamBuilder<KeyValue<K, R>> => {
89+
const output = new StreamBuilder<KeyValue<K, R>>(
90+
stream.graph,
91+
new DifferenceStreamWriter<KeyValue<K, R>>(),
92+
)
93+
const operator = new ReduceOperator<K, V1, R>(
94+
stream.graph.getNextOperatorId(),
95+
stream.connectReader() as DifferenceStreamReader<KeyValue<K, V1>>,
96+
output.writer,
97+
f,
98+
)
99+
stream.graph.addOperator(operator)
100+
stream.graph.addStream(output.connectReader())
101+
return output
102+
}
103+
}

packages/d2mini/src/operators/reduce.ts

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,37 +75,50 @@ export class ReduceOperator<K, V1, V2> extends UnaryOperator<[K, V1], [K, V2]> {
7575
}
7676
}
7777

78-
// First, emit removals for old values that are no longer present or have changed
78+
const commonKeys = new Set<string>()
79+
80+
// First, emit removals for old values that are no longer present
7981
for (const [valueKey, { value, multiplicity }] of oldOutputMap) {
8082
const newEntry = newOutputMap.get(valueKey)
81-
if (!newEntry || newEntry.multiplicity !== multiplicity) {
83+
if (!newEntry) {
8284
// Remove the old value entirely
8385
result.push([[key, value], -multiplicity])
8486
this.#indexOut.addValue(key, [value, -multiplicity])
87+
} else {
88+
commonKeys.add(valueKey)
8589
}
8690
}
8791

88-
// Then, emit additions for new values that are not present in old or have changed
92+
// Then, emit additions for new values that are not present in old
8993
for (const [valueKey, { value, multiplicity }] of newOutputMap) {
9094
const oldEntry = oldOutputMap.get(valueKey)
91-
if (!oldEntry || oldEntry.multiplicity !== multiplicity) {
95+
if (!oldEntry) {
9296
// Add the new value only if it has non-zero multiplicity
9397
if (multiplicity !== 0) {
9498
result.push([[key, value], multiplicity])
9599
this.#indexOut.addValue(key, [value, multiplicity])
96100
}
101+
} else {
102+
commonKeys.add(valueKey)
103+
}
104+
}
105+
106+
// Then, emit multiplicity changes for values that were present and are still present
107+
for (const valueKey of commonKeys) {
108+
const newEntry = newOutputMap.get(valueKey)
109+
const oldEntry = oldOutputMap.get(valueKey)
110+
const delta = newEntry!.multiplicity - oldEntry!.multiplicity
111+
// Only emit actual changes, i.e. non-zero deltas
112+
if (delta !== 0) {
113+
result.push([[key, newEntry!.value], delta])
114+
this.#indexOut.addValue(key, [newEntry!.value, delta])
97115
}
98116
}
99117
}
100118

101119
if (result.length > 0) {
102120
this.output.sendData(new MultiSet(result))
103121
}
104-
105-
// Compact both indexes to consolidate values and remove zero-multiplicity entries
106-
// Only compact changed keys for efficiency
107-
this.#index.compact()
108-
this.#indexOut.compact()
109122
}
110123
}
111124

packages/d2mini/src/operators/topKWithFractionalIndex.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -293,11 +293,6 @@ export class TopKWithFractionalIndexOperator<K, V1> extends UnaryOperator<
293293
if (result.length > 0) {
294294
this.output.sendData(new MultiSet(result))
295295
}
296-
297-
// Compact both indexes to consolidate values and remove zero-multiplicity entries
298-
// Only compact changed keys for efficiency
299-
this.#index.compact()
300-
this.#indexOut.compact()
301296
}
302297
}
303298

0 commit comments

Comments
 (0)