223 changes: 183 additions & 40 deletions README.md
@@ -1,30 +1,70 @@
# D2TS - Differential Dataflow in TypeScript
<p align="center">
<a href="https://electric-sql.com" target="_blank">
<picture>
<source media="(prefers-color-scheme: dark)"
srcset="https://raw.githubusercontent.com/electric-sql/meta/main/identity/ElectricSQL-logo-next.svg"
/>
<source media="(prefers-color-scheme: light)"
srcset="https://raw.githubusercontent.com/electric-sql/meta/main/identity/ElectricSQL-logo-black.svg"
/>
<img alt="ElectricSQL logo"
src="https://raw.githubusercontent.com/electric-sql/meta/main/identity/ElectricSQL-logo-black.svg"
/>
</picture>
</a>
</p>

<p align="center">
<a href="https://github.com/electric-sql/d2ts/actions"><img src="https://github.com/electric-sql/d2ts/actions/workflows/ci.yml/badge.svg"></a>
<a href="https://github.com/electric-sql/d2ts/blob/main/LICENSE"><img src="https://img.shields.io/badge/license-Apache_2.0-green" alt="License - Apache 2.0"></a>
<a href="https://github.com/electric-sql/d2ts/
ext/milestones"><img src="https://img.shields.io/badge/status-alpha-orange" alt="Status - Alpha"></a>
<a href="https://discord.electric-sql.com"><img src="https://img.shields.io/discord/933657521581858818?color=5969EA&label=discord" alt="Chat - Discord"></a>
<a href="https://x.com/ElectricSQL" target="_blank"><img src="https://img.shields.io/twitter/follow/ElectricSQL.svg?style=social&label=Follow @ElectricSQL"></a>
</p>

# D2TS - Differential Dataflow in TypeScript <!-- omit in toc -->

D2TS is a TypeScript implementation of [differential dataflow](https://github.com/MaterializeInc/differential) - a powerful data-parallel programming framework that enables incremental computations over changing input data.

You can use D2TS to build data pipelines that can be executed incrementally, meaning you can process data as it comes in, and only recompute the parts that have changed. This could be as simple as remapping data, or as complex as performing a full join combining two datasources where one is a computed aggregate.

D2TS can be used in conjunction with [ElectricSQL](https://electric-sql.com) to build data pipelines on top os "Shape Streams" that can be executed incrementally.
D2TS can be used in conjunction with [ElectricSQL](https://electric-sql.com) to build data pipelines on top of [ShapeStreams](https://electric-sql.com/docs/api/clients/typescript#shapestream) that can be executed incrementally.

A D2TS pipe is also fully type safe, inferring the types at each step of the pipeline, and supports auto-complete in your IDE.
A D2TS pipeline is also fully type safe, inferring the types at each step of the pipeline, and supports auto-complete in your IDE.

## Table of Contents

- [Key Features](#key-features)
- [Quick Start](#quick-start)
- [Examples](#examples)
- [API](#api)
- [D2 Graph Construction](#d2-graph-construction)
- [Input Streams](#input-streams)
- [Versions and Frontiers](#versions-and-frontiers)
- [MultiSet as a changeset](#multiset-as-a-changeset)
- [Operators](#operators)
- [Using SQLite Backend](#using-sqlite-backend)
- [Implementation Details](#implementation-details)
- [References](#references)

## Key Features

- **Incremental Processing**: Efficiently process changes to input data without recomputing everything
- **Rich Operators**: Supports common operations with a pipeline API:
- `buffer()`: Buffer and emit versions when they are complete
- `concat()`: Concatenate two streams
- `consolidate()`: Consolidates the elements in the stream at each version
- `count()`: Count elements by key
- `distinct()`: Remove duplicates
- `filter()`: Filter elements based on predicates
- `iterate()`: Perform iterative computations
- `join()`: Join two streams
- `map()`: Transform elements in a stream
- `reduce()`: Aggregate values by key
- `output()`: Output the messages of the stream
- `pipe()`: Build a pipeline of operators enabling reuse of combinations of operators
- **SQLite Integration**: Optional SQLite backend for managing operator state
- [`buffer`](#buffer): Buffer and emit versions when they are complete
- [`concat`](#concat): Concatenate two streams
- [`consolidate`](#consolidate): Consolidates the elements in the stream at each version
- [`count`](#count): Count elements by key
- [`distinct`](#distinct): Remove duplicates
- [`filter`](#filter): Filter elements based on predicates
- [`iterate`](#iterate): Perform iterative computations
- [`join`](#join): Join two streams
- [`map`](#map): Transform elements in a stream
- [`reduce`](#reduce): Aggregate values by key
- [`output`](#output): Output the messages of the stream
- [`pipe`](#pipe): Build a pipeline of operators enabling reuse of combinations of operators
- **SQLite Integration**: Optional SQLite backend for persisting operator state allowing for larger datasets and resumable pipelines
- **Type Safety**: Full TypeScript type safety and inference through the pipeline API

## Quick Start
@@ -95,14 +135,88 @@ graph.run()
// 8 (from 3 + 5)
```

### MultiSet as a Change to a Collection
## Examples

There are a number of examples in the [./examples](./examples) directory, covering:

- [Basic usage](./examples/basic.ts) (map and filter)
- ["Fruit processed"](./examples/fruit-processed.ts) (reduce and consolidate)
- [Joins between two streams](./examples/join.ts)
- [Iterative computations](./examples/iterate.ts)
- [Modeling "includes" using joins](./examples/includes.ts)

## API

### D2 graph construction

```typescript
const graph = new D2({ initialFrontier: 0 })
```

The `D2` constructor takes an optional `options` object with the following properties:

- `initialFrontier`: The initial frontier of the graph, defaults to `0`

An instance of a D2 graph is used to build a dataflow graph, and has the following main methods:

- `newInput<T>(): IStreamBuilder<T>`: Create a new input stream
- `finalize(): void`: Finalize the graph, after this point no more operators or inputs can be added
- `run(): void`: Process all pending versions of the dataflow graph

### Input Streams

Input streams are created using the `newInput<T>()` method, and have the following methods:

- `sendData(version: Version | number | number[], data: MultiSet<T>): void`: Send data to the input stream
- `sendFrontier(version: Antichain | Version | number | number[]): void`: Send a frontier to the input stream

### Versions and Frontiers

Versions are used to represent the version of the data, and are a lattice of integers. For most use cases you will only need to provide a single integer version, and all APIs that take a version will work with a single integer. More advanced use cases may require the use of the lattice to track multidimensional versions.

Frontiers are used to represent the lower bound of the version of the data that may come in future, and are an antichain of versions. Again, in most cases you can just use a single integer version to represent the frontier.
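
To make the lattice idea concrete, here is a minimal sketch of how multidimensional versions can be compared. It assumes the usual product partial order from differential dataflow (a version is less than or equal to another only if every component is); `VersionTuple`, `lessEqual` and `comparable` are hypothetical names for illustration and are not part of the D2TS API.

```typescript
// Hypothetical model: a version is a fixed-length tuple of integers.
// This is NOT the D2TS Version class, only an illustration of the ordering.
type VersionTuple = number[]

// Product partial order (assumed): a <= b only if every component of a
// is <= the matching component of b.
function lessEqual(a: VersionTuple, b: VersionTuple): boolean {
  if (a.length !== b.length) {
    throw new Error('versions must have the same number of dimensions')
  }
  return a.every((component, i) => component <= b[i])
}

// Two versions are comparable when one is <= the other.
function comparable(a: VersionTuple, b: VersionTuple): boolean {
  return lessEqual(a, b) || lessEqual(b, a)
}

const ordered = lessEqual([1, 2], [2, 2]) // true: 1 <= 2 and 2 <= 2
const incomparable = !comparable([1, 2], [2, 1]) // true: neither is <= the other
```

With a single integer version the order is total, which is why most pipelines never need to think about incomparable versions.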

#### Version

There is a `Version` class that represents a version. The preferred way to create a version is with the `v` helper function, as this ensures that you reuse the same object for the same version, making equality checks and comparisons more efficient:

```typescript
const version = v(1)
```

Multidimensional versions are also supported, and are created using the `v` helper function:

```typescript
const version = v([1, 2])
```

In most cases a single integer is enough to represent the version, and it can be passed directly to the `sendData` and `sendFrontier` methods:

```typescript
input.sendData(1, new MultiSet([[1, 1]]))
```

#### Antichain (frontier)

An `Antichain` is a set of versions that are mutually incomparable (no version in the set is less than or equal to another). It is used to represent the frontier of the data. An antichain can be created using the `Antichain` constructor:

```typescript
const frontier = new Antichain([v(1), v([2])])
```
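
As a rough illustration of what "antichain" means, the sketch below reduces a list of versions to its minimal elements, which is the set a frontier would describe. It assumes the product partial order on version tuples; `VersionTuple`, `lessEqual` and `minimalElements` are hypothetical helpers, not the library's `Antichain` implementation.

```typescript
// Hypothetical model of versions as integer tuples (not the D2TS classes).
type VersionTuple = number[]

// Assumed product partial order: every component of a is <= that of b.
function lessEqual(a: VersionTuple, b: VersionTuple): boolean {
  return a.length === b.length && a.every((component, i) => component <= b[i])
}

// Keep only versions that no other version in the list is <= to.
function minimalElements(versions: VersionTuple[]): VersionTuple[] {
  const result: VersionTuple[] = []
  for (const candidate of versions) {
    // Skip the candidate if a kept version is already <= it.
    if (result.some((kept) => lessEqual(kept, candidate))) continue
    // Drop kept versions that the candidate is <= to, then keep the candidate.
    const survivors = result.filter((kept) => !lessEqual(candidate, kept))
    result.length = 0
    result.push(...survivors, candidate)
  }
  return result
}

// [2, 2] is dropped because [1, 2] <= [2, 2]; the other two are incomparable.
const frontier = minimalElements([[1, 2], [2, 1], [2, 2]])
```

Under this ordering, one-dimensional versions are totally ordered, so a one-dimensional antichain always reduces to a single version.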

In most cases you will only need a single integer version to represent the frontier, which can be passed directly to the `sendFrontier` method:

```typescript
input.sendFrontier(1)
```

### MultiSet as a changeset

A `MultiSet` is a map of values to their multiplicity. It is used to represent the changes to a collection.
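
The following sketch shows one way to think about a changeset: each `[value, multiplicity]` pair adds to or subtracts from the count of that value in a collection. `Change` and `applyChanges` are hypothetical names used only for illustration; they model the idea and are not how D2TS stores or applies a `MultiSet` internally.

```typescript
// Hypothetical stand-in for the entries of a MultiSet: [value, multiplicity].
type Change<T> = [T, number]

// Apply a batch of changes to a collection held as value -> count.
// Keys are compared by identity, so this sketch suits primitive values only.
function applyChanges<T>(
  collection: Map<T, number>,
  changes: Change<T>[]
): Map<T, number> {
  const next = new Map(collection)
  for (const [value, multiplicity] of changes) {
    const count = (next.get(value) ?? 0) + multiplicity
    // A count of zero means the value is no longer in the collection.
    if (count === 0) next.delete(value)
    else next.set(value, count)
  }
  return next
}

let fruit = new Map<string, number>()
// Positive multiplicities insert records.
fruit = applyChanges(fruit, [['apple', 1], ['banana', 2]])
// Negative multiplicities remove records.
fruit = applyChanges(fruit, [['banana', -1], ['apple', -1]])
// fruit now holds a single 'banana'
```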

A MultiSet is created by passing an array of `[value, multiplicity]` pairs. Here we are creating a MultiSet with the values 1, 2, and 3, each with a multiplicity of 1:

```typescript
// A MultiSet is created by passing an array of [value, multiplicity] pairs
// Here we are creating a MultiSet with the values 1, 2, and 3, each with a
// multiplicity of 1
const multiSet = new MultiSet([
[1, 1],
[2, 1],
@@ -143,31 +257,39 @@ const multiSet = new MultiSet<[string, Comment]>([

### Operators

#### `buffer()`
#### buffer

`buffer()`

Buffers the elements of the stream, emitting a version when the buffer is complete.

```typescript
const output = input.pipe(buffer())
```

#### `concat(other: IStreamBuilder<T>)`
#### concat

`concat(other: IStreamBuilder<T>)`

Concatenates two input streams - the output stream will contain the elements of both streams.

```typescript
const output = input.pipe(concat(other))
```

#### `consolidate()`
#### consolidate

`consolidate()`

Consolidates the elements in the stream at each version. Essentially, it ensures the output stream is at the latest known _complete_ version.

```typescript
const output = input.pipe(consolidate())
```

#### `count()`
#### count

`count()`

Counts the number of elements in the stream by key

@@ -178,37 +300,47 @@ const output = input.pipe(
)
```

#### `debug(name: string)`
#### debug

`debug(name: string)`

Logs the messages of the stream to the console. The name is used to identify the stream in the logs.

```typescript
const output = input.pipe(debug('output'))
```

#### `distinct()`
#### distinct

`distinct()`

Removes duplicate values from the stream by key

```typescript
const output = input.pipe(distinct())
```

#### `filter(predicate: (data: T) => boolean)`
#### filter

`filter(predicate: (data: T) => boolean)`

Filters the stream based on a predicate

```typescript
const output = input.pipe(filter((x) => x % 2 === 0))
```

#### `iterate(f: (data: T) => T, initial: T)`
#### iterate

`iterate(f: (data: T) => T, initial: T)`

Performs an iterative computation on the stream

TODO: Explain and add example

#### `join(other: IStreamBuilder<T>)`
#### join

`join(other: IStreamBuilder<T>)`

Joins two keyed streams. The output stream will contain the elements of the two streams combined, with the key of the element from the left stream.
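
To illustrate the semantics (not the incremental implementation), here is a naive sketch of a keyed join over two changesets. It assumes the standard differential dataflow rule that the multiplicity of a joined pair is the product of the input multiplicities; `Keyed` and `joinChanges` are hypothetical names and are not part of the D2TS API.

```typescript
// Hypothetical shape of a keyed change: [[key, value], multiplicity].
type Keyed<K, V> = [[K, V], number]

// Naive nested-loop join: pair up values that share a key and multiply
// their multiplicities (assumed rule, as in differential dataflow).
function joinChanges<K, A, B>(
  left: Keyed<K, A>[],
  right: Keyed<K, B>[]
): Keyed<K, [A, B]>[] {
  const out: Keyed<K, [A, B]>[] = []
  for (const [[leftKey, a], leftMult] of left) {
    for (const [[rightKey, b], rightMult] of right) {
      if (leftKey === rightKey) {
        out.push([[leftKey, [a, b]], leftMult * rightMult])
      }
    }
  }
  return out
}

const users: Keyed<number, string>[] = [[[1, 'alice'], 1]]
const comments: Keyed<number, string>[] = [
  [[1, 'hello'], 1],
  [[1, 'goodbye'], -1], // a retraction of an earlier comment
  [[2, 'orphan'], 1], // no matching user, so it produces no output
]

const joined = joinChanges(users, comments)
```

A retraction on either side therefore shows up as a retraction of the joined pair, which is what lets downstream operators stay consistent.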

@@ -250,15 +382,19 @@ const output = commentsByUser.pipe(
)
```

#### `map(f: (data: T) => T)`
#### map

`map<U>(f: (data: T) => U)`

Transforms the elements of the stream using a function

```typescript
const output = input.pipe(map((x) => x + 5))
```

#### `output(messageHandler: (message: Message<T>) => void)`
#### output

`output(messageHandler: (message: Message<T>) => void)`

Outputs the messages of the stream

@@ -303,7 +439,9 @@ A frontier message represents a new frontier, and has the following data payload
type FrontierMessage = Version | Antichain
```

#### `pipe(operator: (stream: IStreamBuilder<T>) => IStreamBuilder<T>)`
#### pipe

`pipe(operator: (stream: IStreamBuilder<T>) => IStreamBuilder<T>)`

Pipes the stream through a series of operators

@@ -335,7 +473,9 @@ const myPipe = (a: number, b: number) =>
const output = input.pipe(myPipe(5, 2))
```

#### `reduce(f: (values: [T, multiplicity: number][]) => [R, multiplicity: number][])`
#### reduce

`reduce(f: (values: [T, multiplicity: number][]) => [R, multiplicity: number][])`

Performs a reduce operation on the stream grouped by key.
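
As a small sketch of a reducer with the documented shape, the function below sums the values for one key, weighting each value by its multiplicity, and emits a single result with multiplicity 1. `sumValues` is a hypothetical name, and the choice to weight by multiplicity is an assumption about what a "sum" should mean for a multiset.

```typescript
// A reducer receives the [value, multiplicity] pairs for a single key and
// returns the [result, multiplicity] pairs to emit for that key.
const sumValues = (values: [number, number][]): [number, number][] => {
  let total = 0
  for (const [value, multiplicity] of values) {
    // A value present twice (multiplicity 2) counts twice towards the sum.
    total += value * multiplicity
  }
  return [[total, 1]]
}

// 3 appears once and 5 appears twice: 3 * 1 + 5 * 2 = 13
const summed = sumValues([[3, 1], [5, 2]])
```

Given the signature above, a function of this shape would be passed to the operator as `reduce(sumValues)` on a keyed stream.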

@@ -376,16 +516,19 @@ For persistence and larger datasets, a number of operators are provided that per
- `map()`: Transforms elements
- `reduce()`: Aggregates values by key

Each take a SQLite database as the final argument.
Each takes a SQLite database as the final argument, for example:

## Examples
```typescript
// Using better-sqlite3
const sqlite = new Database('./my_database.db')
const db = new BetterSQLite3Wrapper(sqlite)

const output = input.pipe(consolidate(db))
```

There are a number of examples in the [packages/d2ts/examples](./packages/d2ts/examples) directory, covering:
The operators will automatically create the necessary tables and indexes to store the state of the operators. It is advised to use the same database for all operators to ensure that the state is stored in a single location.

- Basic usage (map and filter)
- Joins between two streams
- Iterative computations
- Modeling "includes" using joins
The `BetterSQLite3Wrapper` is a wrapper around the `better-sqlite3` library that provides a unified interface for the operators. Other SQLite database drivers can be supported by implementing the `SQLiteDb` interface.

## Implementation Details

11 changes: 5 additions & 6 deletions packages/d2ts/examples/basic.ts → examples/basic.ts
@@ -1,5 +1,4 @@
import { D2 } from '../src/index.js'
import { map, filter, debug } from '../src/operators/index.js'
import { D2, map, filter, debug } from '@electric-sql/d2ts'

// Create a new D2 graph with an initial frontier of 0
// This is the lower bound of the version space
@@ -27,15 +26,15 @@ graph.finalize()
// The graph will process the data and frontier updates in a single step
for (let i = 1; i <= 10; i++) {
// Sending a frontier is informing the graph what the new lower bound of the version
// space is *on that input*. Each input essentially can have its own lower bound.
// space is *on that input*. Each input essentially can have its own lower bound.
// These are then passed through all the operators
input.sendFrontier(i)

// Sending data to the graph
// The first param is the version of the data
// The second param is a MultiSetArray of *changes in the collection*, where the first
// element is the record and the second is the multiplicity. A positive multiplicity
// means that the record is added to the collection at that version. A negative
// The second param is a MultiSetArray of *changes in the collection*, where the first
// element is the record and the second is the multiplicity. A positive multiplicity
// means that the record is added to the collection at that version. A negative
// multiplicity means that the record is removed from the collection at that version.
input.sendData(i, [
[i + 1, 1],
@@ -1,5 +1,5 @@
import { map, reduce, consolidate } from '../src/operators/index.js'
import { Store } from '../src/store.js'
import { map, reduce, consolidate } from '@electric-sql/d2ts'
import { Store } from '@electric-sql/d2ts/store'

type FruitOrder = {
name: string