Skip to content

Commit d6c4ed9

Browse files
kevin-dpsamwillis
andauthored
Efficient distinct operator (#80)
* Replace distinct operator atop reduce by a dedicated more efficient distinct operator * Add additional unit tests for distinct operator * Fix bug where distinct wasn't properly summing the multiplicites if an element occurs multiple times in the input stream * Extend distinct with optional argument to determine what to deduplicate by. * Formatting * Small change to test * changest --------- Co-authored-by: Sam Willis <[email protected]>
1 parent b64697c commit d6c4ed9

File tree

3 files changed

+165
-31
lines changed

3 files changed

+165
-31
lines changed

.changeset/blue-papayas-try.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@electric-sql/d2mini': patch
3+
---
4+
5+
rebuild the distinct to be more efficient

packages/d2mini/src/operators/distinct.ts

Lines changed: 75 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,98 @@
1-
import { IStreamBuilder, KeyValue } from '../types.js'
2-
import { DifferenceStreamReader, DifferenceStreamWriter } from '../graph.js'
1+
import { IStreamBuilder } from '../types.js'
2+
import {
3+
DifferenceStreamReader,
4+
DifferenceStreamWriter,
5+
UnaryOperator,
6+
} from '../graph.js'
37
import { StreamBuilder } from '../d2.js'
4-
import { ReduceOperator } from './reduce.js'
58
import { hash } from '../utils.js'
9+
import { MultiSet } from '../multiset.js'
10+
11+
type HashedValue = string
12+
type Multiplicity = number
613

714
/**
8-
* Operator that removes duplicates by key (version-free)
15+
* Operator that removes duplicates
916
*/
10-
export class DistinctOperator<K, V> extends ReduceOperator<K, V, V> {
17+
export class DistinctOperator<T> extends UnaryOperator<T> {
18+
#by: (value: T) => any
19+
#values: Map<HashedValue, Multiplicity> // keeps track of the number of times each value has been seen
20+
1121
constructor(
1222
id: number,
13-
inputA: DifferenceStreamReader<[K, V]>,
14-
output: DifferenceStreamWriter<[K, V]>,
23+
input: DifferenceStreamReader<T>,
24+
output: DifferenceStreamWriter<T>,
25+
by: (value: T) => any = (value: T) => value,
1526
) {
16-
const distinctInner = (vals: [V, number][]): [V, number][] => {
17-
const consolidated = new Map<string, number>()
18-
const values = new Map<string, V>()
19-
for (const [val, diff] of vals) {
20-
const key = hash(val)
21-
consolidated.set(key, (consolidated.get(key) || 0) + diff)
22-
values.set(key, val)
27+
super(id, input, output)
28+
this.#by = by
29+
this.#values = new Map()
30+
}
31+
32+
run(): void {
33+
const updatedValues = new Map<HashedValue, [Multiplicity, T]>()
34+
35+
// Compute the new multiplicity for each value
36+
for (const message of this.inputMessages()) {
37+
for (const [value, diff] of message.getInner()) {
38+
const hashedValue = hash(this.#by(value))
39+
40+
const oldMultiplicity =
41+
updatedValues.get(hashedValue)?.[0] ??
42+
this.#values.get(hashedValue) ??
43+
0
44+
const newMultiplicity = oldMultiplicity + diff
45+
46+
updatedValues.set(hashedValue, [newMultiplicity, value])
2347
}
24-
return Array.from(consolidated.entries())
25-
.filter(([_, count]) => count > 0)
26-
.map(([key, _]) => [values.get(key) as V, 1])
2748
}
2849

29-
super(id, inputA, output, distinctInner)
50+
const result: Array<[T, number]> = []
51+
52+
// Check which values became visible or disappeared
53+
for (const [
54+
hashedValue,
55+
[newMultiplicity, value],
56+
] of updatedValues.entries()) {
57+
const oldMultiplicity = this.#values.get(hashedValue) ?? 0
58+
59+
if (newMultiplicity === 0) {
60+
this.#values.delete(hashedValue)
61+
} else {
62+
this.#values.set(hashedValue, newMultiplicity)
63+
}
64+
65+
if (oldMultiplicity <= 0 && newMultiplicity > 0) {
66+
// The value wasn't present in the stream
67+
// but with this change it is now present in the stream
68+
result.push([value, 1])
69+
} else if (oldMultiplicity > 0 && newMultiplicity <= 0) {
70+
// The value was present in the stream
71+
// but with this change it is no longer present in the stream
72+
result.push([value, -1])
73+
}
74+
}
75+
76+
if (result.length > 0) {
77+
this.output.sendData(new MultiSet(result))
78+
}
3079
}
3180
}
3281

3382
/**
34-
* Removes duplicates by key (version-free)
83+
* Removes duplicate values
3584
*/
36-
export function distinct<
37-
K extends T extends KeyValue<infer K, infer _V> ? K : never,
38-
V extends T extends KeyValue<K, infer V> ? V : never,
39-
T,
40-
>() {
41-
return (stream: IStreamBuilder<T>): IStreamBuilder<KeyValue<K, V>> => {
42-
const output = new StreamBuilder<KeyValue<K, V>>(
85+
export function distinct<T>(by: (value: T) => any = (value: T) => value) {
86+
return (stream: IStreamBuilder<T>): IStreamBuilder<T> => {
87+
const output = new StreamBuilder<T>(
4388
stream.graph,
44-
new DifferenceStreamWriter<KeyValue<K, V>>(),
89+
new DifferenceStreamWriter<T>(),
4590
)
46-
const operator = new DistinctOperator<K, V>(
91+
const operator = new DistinctOperator<T>(
4792
stream.graph.getNextOperatorId(),
48-
stream.connectReader() as DifferenceStreamReader<KeyValue<K, V>>,
93+
stream.connectReader() as DifferenceStreamReader<T>,
4994
output.writer,
95+
by,
5096
)
5197
stream.graph.addOperator(operator)
5298
stream.graph.addStream(output.connectReader())

packages/d2mini/tests/operators/distinct.test.ts

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { distinct } from '../../src/operators/distinct.js'
55
import { output } from '../../src/operators/output.js'
66

77
describe('Operators', () => {
8-
describe('Distinct operation', () => {
8+
describe('Efficient distinct operation', () => {
99
testDistinct()
1010
})
1111
})
@@ -45,6 +45,47 @@ function testDistinct() {
4545
])
4646
})
4747

48+
test('distinct by certain property', () => {
49+
const graph = new D2()
50+
const input = graph.newInput<[number, { name: string; country: string }]>()
51+
const messages: MultiSet<[number, { name: string; country: string }]>[] = []
52+
53+
input.pipe(
54+
distinct(([_, value]) => value.country),
55+
output((message) => {
56+
messages.push(message)
57+
}),
58+
)
59+
60+
graph.finalize()
61+
62+
input.sendData(
63+
new MultiSet([
64+
[[1, { name: 'Valter', country: 'Portugal' }], 1],
65+
[[2, { name: 'Sam', country: 'UK' }], 1],
66+
[[2, { name: 'Kevin', country: 'Belgium' }], 1],
67+
[[3, { name: 'Garry', country: 'UK' }], 1],
68+
[[4, { name: 'Kyle', country: 'USA' }], 1],
69+
]),
70+
)
71+
72+
graph.run()
73+
74+
const data = messages.map((m) => m.getInner())[0]
75+
const countries = data
76+
.map(([[_, value], multiplicity]) => [value.country, multiplicity])
77+
.sort()
78+
79+
expect(countries).toEqual(
80+
[
81+
['Belgium', 1],
82+
['Portugal', 1],
83+
['UK', 1],
84+
['USA', 1],
85+
].sort(),
86+
)
87+
})
88+
4889
test('distinct with updates', () => {
4990
const graph = new D2()
5091
const input = graph.newInput<[number, string]>()
@@ -63,18 +104,23 @@ function testDistinct() {
63104
new MultiSet([
64105
[[1, 'a'], 1],
65106
[[1, 'b'], 1],
107+
[[1, 'a'], 1],
66108
]),
67109
)
68110
graph.run()
69111

70112
input.sendData(
71113
new MultiSet([
72114
[[1, 'b'], -1],
73-
[[1, 'c'], 1],
115+
[[1, 'c'], 2],
116+
[[1, 'a'], -1],
74117
]),
75118
)
76119
graph.run()
77120

121+
input.sendData(new MultiSet([[[1, 'c'], -2]]))
122+
graph.run()
123+
78124
const data = messages.map((m) => m.getInner())
79125

80126
expect(data).toEqual([
@@ -86,6 +132,7 @@ function testDistinct() {
86132
[[1, 'b'], -1],
87133
[[1, 'c'], 1],
88134
],
135+
[[[1, 'c'], -1]],
89136
])
90137
})
91138

@@ -122,4 +169,40 @@ function testDistinct() {
122169
],
123170
])
124171
})
172+
173+
test('distinct with multiple batches of same key that cancel out', () => {
174+
const graph = new D2()
175+
const input = graph.newInput<[string, number]>()
176+
const messages: MultiSet<[string, number]>[] = []
177+
178+
input.pipe(
179+
distinct(),
180+
output((message) => {
181+
messages.push(message)
182+
}),
183+
)
184+
185+
graph.finalize()
186+
187+
input.sendData(
188+
new MultiSet([
189+
[['key1', 1], 2],
190+
[['key1', 2], 2],
191+
[['key1', 2], 1],
192+
[['key2', 1], 1],
193+
[['key1', 2], -3], // cancels out the previous addition of [['key2', 2], 3]
194+
[['key2', 1], 1],
195+
]),
196+
)
197+
graph.run()
198+
199+
const data = messages.map((m) => m.getInner())
200+
201+
expect(data).toEqual([
202+
[
203+
[['key1', 1], 1],
204+
[['key2', 1], 1],
205+
],
206+
])
207+
})
125208
}

0 commit comments

Comments
 (0)