Skip to content

Conversation

@samwillis
Copy link
Contributor

D2Mini is a minimal implementation of the D2TS dataflow graph library but simplified and without the complexities of multi-dimensional versioning.

The API is almost identical to D2TS, but without the need to specify a version when sending data, or to send a frontier to mark the end of a version.

Basic Usage

Here's a simple example that demonstrates the core concepts:

import { D2, map, filter, debug, MultiSet, v } from '@electric-sql/d2ts'

// Create a new D2 graph
const graph = new D2()

// Create an input stream
// We can specify the type of the input stream, here we are using number.
const input = graph.newInput<number>()

// Build a simple pipeline that:
// 1. Takes numbers as input
// 2. Adds 5 to each number
// 3. Filters to keep only even numbers
// Pipelines can have multiple inputs and outputs.
const output = input.pipe(
  map((x) => x + 5),
  filter((x) => x % 2 === 0),
  debug('output'),
)

// Finalize the pipeline, after this point we can no longer add operators or
// inputs
graph.finalize()

// Send some data
// Data is sent as a MultiSet, which is a map of values to their multiplicity
// Here we are sending 3 numbers (1-3), each with a multiplicity of 1
// The key thing to understand is that the MultiSet represents a *change* to
// the data, not the data itself. "Inserts" and "Deletes" are represented as
// an element with a multiplicity of 1 or -1 respectively.
input.sendData(
  new MultiSet([
    [1, 1],
    [2, 1],
    [3, 1],
  ]),
)

// Process the data
graph.run()

// Output will show:
// 6 (from 1 + 5)
// 8 (from 3 + 5)

Copy link
Contributor

@KyleAMathews KyleAMathews left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:shipit:

The mini as far as concepts is clear — I'm curious how much code this shaves off?

@samwillis
Copy link
Contributor Author

@KyleAMathews

samwillis@Sams-MacBook-Air d2ts % diff -r packages/d2ts/src/ packages/d2mini/src | diffstat | tail -n 1
 29 files changed, 343 insertions(+), 799 deletions(-)
samwillis@Sams-MacBook-Air d2ts % diff -r packages/d2ts/tests/ packages/d2mini/tests | diffstat | tail -n 1 
 41 files changed, 488 insertions(+), 758 deletions(-)

-456 lines from the code and -270 from the tests

The largest changes are in the index and the join and reduce operators.

@kevin-dp
Copy link
Contributor

I think we still want to explicitly tag updates with versions and send frontiers indicating stable versions. Versions are needed for operators to coordinate data that they receive from multiple inputs. For example, say that we are joining two tables. And we perform a transaction on the database that touches both tables. The transaction may execute several inserts/updates/deletes so the transaction results in multiple changes that will be streamed. We will want to send all those changes through the D2 graph tagged with the version that corresponds to the transaction in the DB (i.e. the LSN). Thus we need to version for 2 reasons: 1) such that we can batch updates in a given version, and 2) as a consistency mechanism to avoid combining data from different transactions.

More concretely, for join, we want the join to only output data when a version is stable. Without versions and frontiers, join can't know to which version/transaction changes belong and it may break transactional guarantees. Say that we executed a transaction in the DB which results in several changes. Then join would output data for each change but we only want it to output the joined result after all changes of the transaction were processed. Otherwise users may observe weird intermediate states.

@samwillis samwillis merged commit 080c0f4 into main Jun 17, 2025
1 check passed
@samwillis samwillis deleted the samwillis/d2mini branch June 17, 2025 09:14
cursor bot pushed a commit to samwillis/d2ts that referenced this pull request Jul 13, 2025
* clone d2ts to d2mini with a few omissions

* Stateless operators done

* version-index done

* join and consolidate done

* reduce

* count and distinct

* topK

* graph test

* orderBy tests

* filterBy

* groupby

* rename test files

* rename some stuff

* remove itterate from multiset

* use hash

* format

* tidy

* fix tests

* update package.json

* changeset

* improvments to index

* refactor index

* hash return string - forward compat with 128bit hash

* remove unused code
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants