Skip to content

Commit 080c0f4

Browse files
authored
d2mini - d2ts without the multi-dimensional versioning (#65)
* clone d2ts to d2mini with a few omissions * Stateless operators done * version-index done * join and consolidate done * reduce * count and distinct * topK * graph test * orderBy tests * filterBy * groupby * rename test files * rename some stuff * remove itterate from multiset * use hash * format * tidy * fix tests * update package.json * changeset * improvments to index * refactor index * hash return string - forward compat with 128bit hash * remove unused code
1 parent 56c1211 commit 080c0f4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+10202
-0
lines changed

.changeset/silly-ads-scream.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@electric-sql/d2mini': patch
3+
---
4+
5+
First release of D2mini - a minimal implementation of the D2TS Differential Dataflow library but simplified and without the complexities of multi-dimensional versioning.

packages/d2mini/README.md

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# D2Mini is a minimal IVM implementation based on Differential Dataflow
2+
3+
D2Mini is a minimal implementation of the D2TS dataflow graph library but simplified and without the complexities of multi-dimensional versioning.
4+
5+
The API is almost identical to D2TS, but without the need to specify a version when sending data, or to send a frontier to mark the end of a version.
6+
7+
### Basic Usage
8+
9+
Here's a simple example that demonstrates the core concepts:
10+
11+
```typescript
12+
import { D2, map, filter, debug, MultiSet, v } from '@electric-sql/d2ts'
13+
14+
// Create a new D2 graph
15+
const graph = new D2()
16+
17+
// Create an input stream
18+
// We can specify the type of the input stream, here we are using number.
19+
const input = graph.newInput<number>()
20+
21+
// Build a simple pipeline that:
22+
// 1. Takes numbers as input
23+
// 2. Adds 5 to each number
24+
// 3. Filters to keep only even numbers
25+
// Pipelines can have multiple inputs and outputs.
26+
const output = input.pipe(
27+
map((x) => x + 5),
28+
filter((x) => x % 2 === 0),
29+
debug('output'),
30+
)
31+
32+
// Finalize the pipeline, after this point we can no longer add operators or
33+
// inputs
34+
graph.finalize()
35+
36+
// Send some data
37+
// Data is sent as a MultiSet, which is a map of values to their multiplicity
38+
// Here we are sending 3 numbers (1-3), each with a multiplicity of 1
39+
// The key thing to understand is that the MultiSet represents a *change* to
40+
// the data, not the data itself. "Inserts" and "Deletes" are represented as
41+
// an element with a multiplicity of 1 or -1 respectively.
42+
input.sendData(
43+
new MultiSet([
44+
[1, 1],
45+
[2, 1],
46+
[3, 1],
47+
]),
48+
)
49+
50+
// Process the data
51+
graph.run()
52+
53+
// Output will show:
54+
// 6 (from 1 + 5)
55+
// 8 (from 3 + 5)
56+
```

packages/d2mini/eslint.config.mjs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import baseConfig from '../../eslint.base.mjs';
2+
3+
export default [
4+
...baseConfig,
5+
{
6+
files: ['**/*.ts'],
7+
ignores: ['**/dist/**', '**/node_modules/**'],
8+
rules: {
9+
// Package-specific rules can go here
10+
},
11+
},
12+
{
13+
files: ['**/tests/**/*.ts'],
14+
rules: {
15+
'@typescript-eslint/no-unused-vars': 'off',
16+
},
17+
},
18+
];

packages/d2mini/package.json

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
{
2+
"name": "@electric-sql/d2mini",
3+
"version": "0.1.0",
4+
"license": "Apache-2.0",
5+
"description": "D2Mini is a minimal implementation of Differential Dataflow for performing in-memory incremental view maintenance.",
6+
"author": "Electric DB Limited",
7+
"homepage": "https://github.com/electric-sql/d2ts",
8+
"repository": {
9+
"type": "git",
10+
"url": "https://github.com/electric-sql/d2ts.git",
11+
"directory": "packages/d2mini"
12+
},
13+
"keywords": [
14+
"differential dataflow",
15+
"differential",
16+
"dataflow",
17+
"stream processing",
18+
"sync"
19+
],
20+
"type": "module",
21+
"main": "dist/index.js",
22+
"types": "dist/index.d.ts",
23+
"exports": {
24+
".": {
25+
"types": "./dist/index.d.ts",
26+
"default": "./dist/index.js"
27+
}
28+
},
29+
"scripts": {
30+
"build": "tsc",
31+
"test": "vitest run",
32+
"lint": "eslint './src/**/*.ts' './tests/**/*.ts' --config eslint.config.mjs",
33+
"lint:fix": "eslint src/**/*.ts tests/**/*.ts --fix",
34+
"typecheck": "tsc --noEmit",
35+
"format": "prettier --write \"src/**/*.{ts,tsx,js,jsx,json}\" \"tests/**/*.{ts,tsx,js,jsx,json}\"",
36+
"format:check": "prettier --check \"src/**/*.{ts,tsx,js,jsx,json}\" \"tests/**/*.{ts,tsx,js,jsx,json}\""
37+
},
38+
"devDependencies": {
39+
"@types/murmurhash-js": "^1.0.6",
40+
"@types/node": "^22.15.3",
41+
"@typescript-eslint/eslint-plugin": "^8.31.1",
42+
"@typescript-eslint/parser": "^8.31.1",
43+
"eslint": "^9.25.1",
44+
"eslint-config-prettier": "^10.1.2",
45+
"eslint-plugin-prettier": "^5.2.6",
46+
"prettier": "^3.5.3",
47+
"tsx": "^4.19.4",
48+
"typescript": "^5.8.3",
49+
"vitest": "^3.1.2"
50+
},
51+
"dependencies": {
52+
"fractional-indexing": "^3.2.0",
53+
"murmurhash-js": "^1.0.0"
54+
}
55+
}

packages/d2mini/src/d2.ts

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import {
2+
BinaryOperator,
3+
DifferenceStreamWriter,
4+
UnaryOperator,
5+
} from './graph.js'
6+
import { DifferenceStreamReader } from './graph.js'
7+
import { MultiSetArray, MultiSet } from './multiset.js'
8+
import { PipedOperator, IStreamBuilder, ID2 } from './types.js'
9+
10+
export class D2 implements ID2 {
11+
#streams: DifferenceStreamReader<any>[] = []
12+
#operators: (UnaryOperator<any> | BinaryOperator<any>)[] = []
13+
#nextOperatorId = 0
14+
#finalized = false
15+
16+
constructor() {}
17+
18+
#checkNotFinalized(): void {
19+
if (this.#finalized) {
20+
throw new Error('Graph already finalized')
21+
}
22+
}
23+
24+
getNextOperatorId(): number {
25+
this.#checkNotFinalized()
26+
return this.#nextOperatorId++
27+
}
28+
29+
newInput<T>(): RootStreamBuilder<T> {
30+
this.#checkNotFinalized()
31+
const writer = new DifferenceStreamWriter<T>()
32+
// Use the root stream builder that exposes the sendData and sendFrontier methods
33+
const streamBuilder = new RootStreamBuilder<T>(this, writer)
34+
this.#streams.push(streamBuilder.connectReader())
35+
return streamBuilder
36+
}
37+
38+
addOperator(operator: UnaryOperator<any> | BinaryOperator<any>): void {
39+
this.#checkNotFinalized()
40+
this.#operators.push(operator)
41+
}
42+
43+
addStream(stream: DifferenceStreamReader<any>): void {
44+
this.#checkNotFinalized()
45+
this.#streams.push(stream)
46+
}
47+
48+
finalize() {
49+
this.#checkNotFinalized()
50+
this.#finalized = true
51+
}
52+
53+
step(): void {
54+
if (!this.#finalized) {
55+
throw new Error('Graph not finalized')
56+
}
57+
for (const op of this.#operators) {
58+
op.run()
59+
}
60+
}
61+
62+
pendingWork(): boolean {
63+
return this.#operators.some((op) => op.hasPendingWork())
64+
}
65+
66+
run(): void {
67+
while (this.pendingWork()) {
68+
this.step()
69+
}
70+
}
71+
}
72+
73+
export class StreamBuilder<T> implements IStreamBuilder<T> {
74+
#graph: ID2
75+
#writer: DifferenceStreamWriter<T>
76+
77+
constructor(graph: ID2, writer: DifferenceStreamWriter<T>) {
78+
this.#graph = graph
79+
this.#writer = writer
80+
}
81+
82+
connectReader(): DifferenceStreamReader<T> {
83+
return this.#writer.newReader()
84+
}
85+
86+
get writer(): DifferenceStreamWriter<T> {
87+
return this.#writer
88+
}
89+
90+
get graph(): ID2 {
91+
return this.#graph
92+
}
93+
94+
// Don't judge, this is the only way to type this function.
95+
// rxjs has very similar code to type its pipe function
96+
// https://github.com/ReactiveX/rxjs/blob/master/packages/rxjs/src/internal/util/pipe.ts
97+
// We go to 20 operators deep, because surly that's enough for anyone...
98+
// A user can always split the pipe into multiple pipes to get around this.
99+
pipe<O>(o1: PipedOperator<T, O>): IStreamBuilder<O>
100+
// prettier-ignore
101+
pipe<T2, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, O>): IStreamBuilder<O>
102+
// prettier-ignore
103+
pipe<T2, T3, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, O>): IStreamBuilder<O>
104+
// prettier-ignore
105+
pipe<T2, T3, T4, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, O>): IStreamBuilder<O>
106+
// prettier-ignore
107+
pipe<T2, T3, T4, T5, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, O>): IStreamBuilder<O>
108+
// prettier-ignore
109+
pipe<T2, T3, T4, T5, T6, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, O>): IStreamBuilder<O>
110+
// prettier-ignore
111+
pipe<T2, T3, T4, T5, T6, T7, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, O>): IStreamBuilder<O>
112+
// prettier-ignore
113+
pipe<T2, T3, T4, T5, T6, T7, T8, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, O>): IStreamBuilder<O>
114+
// prettier-ignore
115+
pipe<T2, T3, T4, T5, T6, T7, T8, T9, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, O>): IStreamBuilder<O>
116+
// prettier-ignore
117+
pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, O>): IStreamBuilder<O>
118+
// prettier-ignore
119+
pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, O>): IStreamBuilder<O>
120+
// prettier-ignore
121+
pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, O>): IStreamBuilder<O>
122+
// prettier-ignore
123+
pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, O>): IStreamBuilder<O>
124+
// prettier-ignore
125+
pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, O>): IStreamBuilder<O>
126+
// prettier-ignore
127+
pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, O>): IStreamBuilder<O>
128+
// prettier-ignore
129+
pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, T16>, o16: PipedOperator<T16, O>): IStreamBuilder<O>
130+
// prettier-ignore
131+
pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, T16>, o16: PipedOperator<T16, T17>, o17: PipedOperator<T17, O>): IStreamBuilder<O>
132+
// prettier-ignore
133+
pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, T16>, o16: PipedOperator<T16, T17>, o17: PipedOperator<T17, T18>, o18: PipedOperator<T18, O>): IStreamBuilder<O>
134+
// prettier-ignore
135+
pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, T16>, o16: PipedOperator<T16, T17>, o17: PipedOperator<T17, T18>, o18: PipedOperator<T18, T19>, o19: PipedOperator<T19, O>): IStreamBuilder<O>
136+
// prettier-ignore
137+
pipe<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, O>(o1: PipedOperator<T, T2>, o2: PipedOperator<T2, T3>, o3: PipedOperator<T3, T4>, o4: PipedOperator<T4, T5>, o5: PipedOperator<T5, T6>, o6: PipedOperator<T6, T7>, o7: PipedOperator<T7, T8>, o8: PipedOperator<T8, T9>, o9: PipedOperator<T9, T10>, o10: PipedOperator<T10, T11>, o11: PipedOperator<T11, T12>, o12: PipedOperator<T12, T13>, o13: PipedOperator<T13, T14>, o14: PipedOperator<T14, T15>, o15: PipedOperator<T15, T16>, o16: PipedOperator<T16, T17>, o17: PipedOperator<T17, T18>, o18: PipedOperator<T18, T19>, o19: PipedOperator<T19, T20>, o20: PipedOperator<T20, O>): IStreamBuilder<O>
138+
139+
pipe(...operators: PipedOperator<any, any>[]): IStreamBuilder<any> {
140+
return operators.reduce((stream, operator) => {
141+
return operator(stream)
142+
}, this as IStreamBuilder<any>)
143+
}
144+
}
145+
146+
export class RootStreamBuilder<T> extends StreamBuilder<T> {
147+
sendData(collection: MultiSet<T> | MultiSetArray<T>): void {
148+
this.writer.sendData(collection)
149+
}
150+
}

0 commit comments

Comments
 (0)