Commit a37172b

Update readme
1 parent 432e3ab commit a37172b

File tree

1 file changed (+169 -28 lines)


README.md

Lines changed: 169 additions & 28 deletions
@@ -9,25 +9,25 @@ Differential dataflow is a powerful data-parallel programming framework that ena
- Core differential dataflow operators (map, filter, join, reduce, etc.)
- Support for iterative computations
- Incremental updates with partially ordered versions
-- Optional SQLite backend for state management and reusability
+- Optional SQLite backend for state management and restartability

## Key Features

- **Incremental Processing**: Efficiently process changes to input data without recomputing everything
- **Rich Operators**: Supports common operations with a pipeline API:
  - `concat()`: Concatenate two streams
-  - `consolidate()`: Consolidates the elements in the stream
+  - `consolidate()`: Consolidates the elements in the stream at each version
  - `count()`: Count elements by key
  - `distinct()`: Remove duplicates
  - `filter()`: Filter elements based on predicates
  - `iterate()`: Perform iterative computations
-  - `join()`: Join two collections
-  - `map()`: Transform elements
+  - `join()`: Join two streams
+  - `map()`: Transform elements in a stream
  - `reduce()`: Aggregate values by key
-  - `output()`: Output the results of the stream
-  - `pipe()`: Build a pipeline of operators
+  - `output()`: Output the messages of the stream
+  - `pipe()`: Build a pipeline of operators, enabling reuse of operator combinations
- **SQLite Integration**: Optional SQLite backend for managing operator state
-- **Type Safety**: Full TypeScript type safety and inference
+- **Type Safety**: Full TypeScript type safety and inference through the pipeline API

## Quick Start

@@ -42,13 +42,10 @@ npm install {TODO}
Here's a simple example that demonstrates the core concepts:

```typescript
-import { D2 } from 'differential-dataflow-ts'
-import { map, filter, debug } from 'differential-dataflow-ts/operators'
-import { MultiSet } from 'differential-dataflow-ts/multiset'
-import { v } from 'differential-dataflow-ts/order'
+import { D2, map, filter, debug, MultiSet, v } from 'd2ts'

// Create a new D2 graph with initial frontier
-const graph = new D2({ initialFrontier: v([0, 0]) })
+const graph = new D2({ initialFrontier: 0 })

// Create an input stream
const input = graph.newInput<number>()
@@ -67,12 +64,12 @@ const output = input.pipe(
graph.finalize()

// Send some data
-input.sendData(v([0, 0]), new MultiSet([
+input.sendData(0, new MultiSet([
  [1, 1],
  [2, 1],
  [3, 1]
]))
-input.sendFrontier(v([0, 1]))
+input.sendFrontier(1)

// Process the data
graph.run()
@@ -82,6 +79,164 @@ graph.run()
// 8 (from 3 + 5)
```
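
Because processing is incremental, the same graph can then be fed further changes instead of being rebuilt. As a rough sketch of how that might look with the API above (illustrative only; a negative multiplicity retracts a previously sent element, and only the affected results are recomputed):

```typescript
// Retract 1 and add 4 at the next version, then advance the frontier
input.sendData(1, new MultiSet([
  [1, -1], // negative multiplicity removes an element
  [4, 1]
]))
input.sendFrontier(2)

// Only the resulting changes flow through the pipeline
graph.run()
```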

### Operators

#### `concat(other: IStreamBuilder<T>)`

Concatenates two input streams

```typescript
const output = input.pipe(
  concat(other)
)
```

#### `consolidate()`

Consolidates the elements in the stream at each version; essentially, it ensures the output stream is at the latest known *complete* version.

```typescript
const output = input.pipe(
  consolidate()
)
```

#### `count()`

Counts the number of elements in the stream by key

```typescript
const output = input.pipe(
  map((data) => [data.somethingToKeyOn, data]),
  count()
)
```

#### `debug(name: string)`

Logs the messages of the stream to the console; the name is used to identify the stream in the logs.

```typescript
const output = input.pipe(
  debug('output')
)
```

#### `distinct()`

Removes duplicate values from the stream

```typescript
const output = input.pipe(
  distinct()
)
```

#### `filter(predicate: (data: T) => boolean)`

Filters the stream based on a predicate

```typescript
const output = input.pipe(
  filter(x => x % 2 === 0)
)
```

#### `iterate(f: (data: T) => T, initial: T)`

Performs an iterative computation on the stream

TODO: Explain and add example

#### `join(other: IStreamBuilder<T>)`

Joins two keyed streams; the output stream contains the matching elements of the two streams combined, keyed by the key of the element from the left stream.

This is an inner join, so only elements with matching keys will be included in the output.

```typescript
const input = graph.newInput<[string, number]>()
const other = graph.newInput<[string, string]>()

const output = input.pipe(
  join(other)
)
```

TODO: Add links to other joins when we have them

#### `map(f: (data: T) => T)`

Transforms the elements of the stream using a function

```typescript
const output = input.pipe(
  map(x => x + 5)
)
```

#### `output(messageHandler: (message: Message<T>) => void)`

Outputs the messages of the stream

TODO: expand on the Message type and how it works

```typescript
const output = input.pipe(
  output((message) => {
    console.log(message)
  })
)
```

#### `pipe(operator: (stream: IStreamBuilder<T>) => IStreamBuilder<T>)`

Pipes the stream through a series of operators

```typescript
const composedPipeline = pipe(
  map(x => x + 5),
  filter(x => x % 2 === 0),
  debug('output')
)

const output = input.pipe(
  composedPipeline
)

// Or as a function

const myPipe = (a: number, b: number) => pipe(
  map(x => x + a),
  filter(x => x % b === 0),
  debug('output')
)

const output = input.pipe(
  myPipe(5, 2)
)
```

#### `reduce(f: (values: [T, multiplicity: number][]) => [R, multiplicity: number][])`

Performs a reduce operation on the stream grouped by key.

The function `f` takes an array of values and their multiplicities and returns an array of results and their multiplicities.

```typescript
// Count the number of elements in the stream by key
const output = input.pipe(
  map((data) => [data.somethingToKeyOn, data]),
  reduce((values) => {
    let count = 0
    for (const [_value, multiplicity] of values) {
      count += multiplicity
    }
    return [[count, 1]]
  })
)
```

### Using SQLite Backend

For persistence and larger datasets, a number of operators are provided that persist to SQLite:
@@ -95,20 +250,6 @@ For persistence and larger datasets, a number of operators are provided that per

Each takes a SQLite database as the final argument.
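
As a rough sketch only (the SQLite-backed `reduce` variant, the `d2ts/sqlite` import path, and the use of better-sqlite3 below are assumptions for illustration, not taken from this README), passing the database as the final argument might look like:

```typescript
import Database from 'better-sqlite3'
import { D2, map } from 'd2ts'
import { reduce } from 'd2ts/sqlite' // hypothetical import path for the SQLite-backed variant

const db = new Database('./state.db') // operator state is persisted here

const graph = new D2({ initialFrontier: 0 })
const input = graph.newInput<{ userId: string, amount: number }>()

const output = input.pipe(
  map((data) => [data.userId, data.amount]),
  // The SQLite database is passed as the final argument, per the note above
  reduce((values) => {
    let total = 0
    for (const [amount, multiplicity] of values) {
      total += amount * multiplicity
    }
    return [[total, 1]]
  }, db)
)
```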

-### Key Concepts
-
-1. **Versions**: Each piece of data has an associated version (timestamp)
-2. **MultiSets**: Collections that track element counts (can be negative for deletions)
-3. **Frontiers**: Track progress of computation through version space
-4. **Incremental Updates**: Only recompute what's necessary when data changes
-
-See the `examples/` directory for more complex scenarios including:
-
-- Joins between datasets
-- Iterative computations
-- Graph processing
-- Real-time updates
-
## Implementation Details

The implementation is based on the one outlined in the [Materialize blog post](https://materialize.com/blog/differential-from-scratch/), with some TypeScript-specific adaptations, along with using a pipeline rather than a builder API pattern.
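
To make that distinction concrete, here is a rough illustration: the pipeline form is the one used throughout this README, while the chained builder form is hypothetical and shown only for contrast.

```typescript
// Pipeline style (what this library uses): operators are standalone
// functions composed with pipe(), so custom combinations stay reusable.
const doubledEvens = input.pipe(
  map(x => x * 2),
  filter(x => x % 4 === 0)
)

// Builder style (NOT this library's API, shown only for contrast):
// every operator would have to exist as a method on the stream object.
// const doubledEvens = input.map(x => x * 2).filter(x => x % 4 === 0)
```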
