diff --git a/README.md b/README.md index 336a9d9..409eb70 100644 --- a/README.md +++ b/README.md @@ -1,30 +1,70 @@ -# D2TS - Differential Dataflow in TypeScript +

+ + + + + ElectricSQL logo + + +

+ +

+ + License - Apache 2.0 + Status - Alpha + Chat - Discord + +

+ +# D2TS - Differential Dataflow in TypeScript D2TS is a TypeScript implementation of [differential dataflow](https://github.com/MaterializeInc/differential) - a powerful data-parallel programming framework that enables incremental computations over changing input data. You can use D2TS to build data pipelines that can be executed incrementally, meaning you can process data as it comes in, and only recompute the parts that have changed. This could be as simple as remapping data, or as complex as performing a full join combining two datasources where one is a computed aggregate. -D2TS can be used in conjunction with [ElectricSQL](https://electric-sql.com) to build data pipelines on top os "Shape Streams" that can be executed incrementally. +D2TS can be used in conjunction with [ElectricSQL](https://electric-sql.com) to build data pipelines on top of [ShapeStreams](https://electric-sql.com/docs/api/clients/typescript#shapestream) that can be executed incrementally. -A D2TS pipe is also fully type safe, inferring the types at each step of the pipeline, and supports auto-complete in your IDE. +A D2TS pipeline is also fully type safe, inferring the types at each step of the pipeline, and supports auto-complete in your IDE. 
+ +## Table of Contents + +- [Key Features](#key-features) +- [Quick Start](#quick-start) +- [Examples](#examples) +- [API](#api) + - [D2 Graph Construction](#d2-graph-construction) + - [Input Streams](#input-streams) + - [Versions and Frontiers](#versions-and-frontiers) + - [MultiSet as a changeset](#multiset-as-a-changeset) + - [Operators](#operators) + - [Using SQLite Backend](#using-sqlite-backend) +- [Implementation Details](#implementation-details) +- [References](#references) ## Key Features - **Incremental Processing**: Efficiently process changes to input data without recomputing everything - **Rich Operators**: Supports common operations with a pipeline API: - - `buffer()`: Buffer and emit versions when they are complete - - `concat()`: Concatenate two streams - - `consolidate()`: Consolidates the elements in the stream at each version - - `count()`: Count elements by key - - `distinct()`: Remove duplicates - - `filter()`: Filter elements based on predicates - - `iterate()`: Perform iterative computations - - `join()`: Join two streams - - `map()`: Transform elements in a stream - - `reduce()`: Aggregate values by key - - `output()`: Output the messages of the stream - - `pipe()`: Build a pipeline of operators enabling reuse of combinations of operators -- **SQLite Integration**: Optional SQLite backend for managing operator state + - [`buffer`](#buffer): Buffer and emit versions when they are complete + - [`concat`](#concat): Concatenate two streams + - [`consolidate`](#consolidate): Consolidates the elements in the stream at each version + - [`count`](#count): Count elements by key + - [`distinct`](#distinct): Remove duplicates + - [`filter`](#filter): Filter elements based on predicates + - [`iterate`](#iterate): Perform iterative computations + - [`join`](#join): Join two streams + - [`map`](#map): Transform elements in a stream + - [`reduce`](#reduce): Aggregate values by key + - [`output`](#output): Output the messages of the stream + - 
[`pipe`](#pipe): Build a pipeline of operators enabling reuse of combinations of operators +- **SQLite Integration**: Optional SQLite backend for persisting operator state, allowing for larger datasets and resumable pipelines +- **Type Safety**: Full TypeScript type safety and inference through the pipeline API ## Quick Start @@ -95,14 +135,88 @@ graph.run() // 8 (from 3 + 5) ``` -### MultiSet as a Change to a Collection +## Examples + +There are a number of examples in the [./examples](./examples) directory, covering: + +- [Basic usage](./examples/basic.ts) (map and filter) +- ["Fruit processed"](./examples/fruit-processed.ts) (reduce and consolidate) +- [Joins between two streams](./examples/join.ts) +- [Iterative computations](./examples/iterate.ts) +- [Modeling "includes" using joins](./examples/includes.ts) + +## API + +### D2 Graph Construction + +```typescript +const graph = new D2({ initialFrontier: 0 }) +``` + +The `D2` constructor takes an optional `options` object with the following properties: + +- `initialFrontier`: The initial frontier of the graph, defaults to `0` + +An instance of a D2 graph is used to build a dataflow graph, and has the following main methods: + +- `newInput(): IStreamBuilder`: Create a new input stream +- `finalize(): void`: Finalize the graph; after this point no more operators or inputs can be added +- `run(): void`: Process all pending versions of the dataflow graph + +### Input Streams + +Input streams are created using the `newInput()` method, and have the following methods: + +- `sendData(version: Version | number | number[], data: MultiSet): void`: Send data to the input stream + +- `sendFrontier(version: Antichain | Version | number | number[]): void`: Send a frontier to the input stream + +### Versions and Frontiers + +Versions are used to represent the version of the data, and are a lattice of integers. 
For most use cases you will only need to provide a single integer version, and all APIs that take a version will work with a single integer. More advanced use cases may require the use of the lattice to track multidimensional versions. + +Frontiers are used to represent the lower bound of the versions of data that may arrive in the future, and are an antichain of versions. Again, in most cases you can just use a single integer version to represent the frontier. + +#### Version + +There is a `Version` class that represents a version. The preferred way to create a version is using the `v` helper function, as this ensures that you reuse the same object for the same version, making equality checks and comparisons more efficient: + +```typescript +const version = v(1) +``` + +Multidimensional versions are also supported, and are created using the `v` helper function: + +```typescript +const version = v([1, 2]) +``` + +In most cases you will only need to use a single integer to represent the version, which can be passed directly to the `sendData` and `sendFrontier` methods: + +```typescript +input.sendData(1, new MultiSet([[1, 1]])) +``` + +#### Antichain (frontier) + +An `Antichain` is a set of mutually incomparable versions; it is used to represent the frontier of the data. An antichain can be created using the `Antichain` constructor: + +```typescript +const frontier = new Antichain([v(1), v([2])]) +``` + +In most cases you will only need to use a single integer version to represent the frontier, which can be passed directly to the `sendFrontier` method: + +```typescript +input.sendFrontier(1) +``` + +### MultiSet as a changeset A `MultiSet` is a map of values to their multiplicity. It is used to represent the changes to a collection. +A MultiSet is created by passing an array of `[value, multiplicity]` pairs. 
Here we are creating a MultiSet with the values 1, 2, and 3, each with a multiplicity of 1: + ```typescript -// A MultiSet is created by passing an array of [value, multiplicity] pairs -// Here we are creating a MultiSet with the values 1, 2, and 3, each with a -// multiplicity of 1 const multiSet = new MultiSet([ [1, 1], [2, 1], @@ -143,7 +257,9 @@ const multiSet = new MultiSet<[string, Comment]>([ ### Operators -#### `buffer()` +#### buffer + +`buffer()` Buffers the elements of the stream, emitting a version when the buffer is complete. @@ -151,7 +267,9 @@ Buffers the elements of the stream, emitting a version when the buffer is comple const output = input.pipe(buffer()) ``` -#### `concat(other: IStreamBuilder)` +#### concat + +`concat(other: IStreamBuilder)` Concatenates two input streams - the output stream will contain the elements of both streams. @@ -159,7 +277,9 @@ Concatenates two input streams - the output stream will contain the elements of const output = input.pipe(concat(other)) ``` -#### `consolidate()` +#### consolidate + +`consolidate()` Consolidates the elements in the stream at each version, essentially it ensures the output stream is at the latest known _complete_ version. @@ -167,7 +287,9 @@ Consolidates the elements in the stream at each version, essentially it ensures const output = input.pipe(consolidate()) ``` -#### `count()` +#### count + +`count()` Counts the number of elements in the stream by key @@ -178,7 +300,9 @@ const output = input.pipe( ) ``` -#### `debug(name: string)` +#### debug + +`debug(name: string)` Logs the messages of the stream to the console, the name is used to identify the stream in the logs. 
@@ -186,7 +310,9 @@ Logs the messages of the stream to the console, the name is used to identify the const output = input.pipe(debug('output')) ``` -#### `distinct()` +#### distinct + +`distinct()` Removes duplicate values from the stream by key @@ -194,7 +320,9 @@ Removes duplicate values from the stream by key const output = input.pipe(distinct()) ``` -#### `filter(predicate: (data: T) => boolean)` +#### filter + +`filter(predicate: (data: T) => boolean)` Filters the stream based on a predicate @@ -202,13 +330,17 @@ Filters the stream based on a predicate const output = input.pipe(filter((x) => x % 2 === 0)) ``` -#### `iterate(f: (data: T) => T, initial: T)` +#### iterate + +`iterate(f: (data: T) => T, initial: T)` Performs an iterative computation on the stream TODO: Explain and add example -#### `join(other: IStreamBuilder)` +#### join + +`join(other: IStreamBuilder)` Joins two keyed streams, the output stream will contain the elements of the two streams combined, with the key of the element from the left stream. 
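The `iterate` entry above is still marked TODO; as a rough guide, what an iterative computation converges to can be sketched without the library: a step function is re-applied to the collection until the collection stops changing (a fixpoint). The step used below (double each value, keep the originals, drop anything over 50) is purely hypothetical, in the spirit of the classic differential dataflow example; this is not the D2TS API.

```typescript
// Conceptual sketch only -- plain TypeScript, not the D2TS `iterate` operator.
// An iterative computation re-applies a step until a fixpoint is reached.

// Hypothetical step: double each value, keep the originals, drop values > 50.
const step = (values: Set<number>): Set<number> => {
  const next = new Set(values)
  for (const x of values) {
    if (x * 2 <= 50) next.add(x * 2)
  }
  return next
}

// Apply `step` until the collection stops growing.
const fixpoint = (initial: Set<number>): Set<number> => {
  let current = initial
  for (;;) {
    const next = step(current)
    if (next.size === current.size) return current
    current = next
  }
}

console.log([...fixpoint(new Set([1]))].sort((a, b) => a - b))
// [ 1, 2, 4, 8, 16, 32 ]
```

In the real operator the step is expressed as a sub-pipeline of D2TS operators (see [./examples/iterate.ts](./examples/iterate.ts)), and differential dataflow only reprocesses the delta on each round rather than the whole collection.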
@@ -250,7 +382,9 @@ const output = commentsByUser.pipe( ) ``` -#### `map(f: (data: T) => T)` +#### map + +`map(f: (data: T) => U)` Transforms the elements of the stream using a function @@ -258,7 +392,9 @@ Transforms the elements of the stream using a function const output = input.pipe(map((x) => x + 5)) ``` -#### `output(messageHandler: (message: Message) => void)` +#### output + +`output(messageHandler: (message: Message) => void)` Outputs the messages of the stream @@ -303,7 +439,9 @@ A frontier message represents a new frontier, and has the following data payload type FrontierMessage = Version | Antichain ``` -#### `pipe(operator: (stream: IStreamBuilder) => IStreamBuilder)` +#### pipe + +`pipe(operator: (stream: IStreamBuilder) => IStreamBuilder)` Pipes the stream through a series of operators @@ -335,7 +473,9 @@ const myPipe = (a: number, b: number) => const output = input.pipe(myPipe(5, 2)) ``` -#### `reduce(f: (values: [T, multiplicity: number][]) => [R, multiplicity: number][])` +#### reduce + +`reduce(f: (values: [T, multiplicity: number][]) => [R, multiplicity: number][])` Performs a reduce operation on the stream grouped by key. @@ -376,16 +516,19 @@ For persistence and larger datasets, a number of operators are provided that per - `map()`: Transforms elements - `reduce()`: Aggregates values by key -Each take a SQLite database as the final argument. +Each takes a SQLite database as the final argument, for example: -## Examples +```typescript +// Using better-sqlite3 +const sqlite = new Database('./my_database.db') +const db = new BetterSQLite3Wrapper(sqlite) + +const output = input.pipe(consolidate(db)) +``` -There are a number of examples in the [packages/d2ts/examples](./packages/d2ts/examples) directory, covering: +The operators will automatically create the necessary tables and indexes to store their state. It is advised to use the same database for all operators so that all state is stored in a single location. 
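To illustrate why persisted operator state makes pipelines resumable, here is a library-free sketch in plain TypeScript. A JSON string stands in for the SQLite table, and the `applyChanges` helper is hypothetical, not part of D2TS: a keyed count keeps running totals, and because the totals are stored durably, a restarted process can continue from where the previous run stopped and apply only the new changes.

```typescript
// Conceptual sketch only -- not the library's actual storage format.
// A keyed count's state is its running total per key.
type CountState = Record<string, number>

// Apply a batch of [key, multiplicity] changes to the state.
const applyChanges = (
  state: CountState,
  changes: [key: string, multiplicity: number][],
): CountState => {
  const next = { ...state }
  for (const [key, multiplicity] of changes) {
    next[key] = (next[key] ?? 0) + multiplicity
  }
  return next
}

// First run of the pipeline.
let state = applyChanges({}, [['a', 1], ['a', 1], ['b', 1]])
const persisted = JSON.stringify(state) // stands in for the SQLite table

// Process restarts: reload the state and apply only the *new* changes.
state = applyChanges(JSON.parse(persisted), [['a', -1], ['c', 1]])
console.log(state) // { a: 1, b: 1, c: 1 }
```

Without the persisted state, the restarted process would have to replay every change from the beginning to rebuild the totals, which is exactly what the SQLite backend avoids.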
-- Basic usage (map and filter) -- Joins between two streams -- Iterative computations -- Modeling "includes" using joins +The `BetterSQLite3Wrapper` is a wrapper around the `better-sqlite3` library that provides a unified interface for the operators. Other SQLite database drivers can be supported by implementing the `SQLiteDb` interface. ## Implementation Details diff --git a/packages/d2ts/examples/basic.ts b/examples/basic.ts similarity index 89% rename from packages/d2ts/examples/basic.ts rename to examples/basic.ts index 8cd474f..0769606 100644 --- a/packages/d2ts/examples/basic.ts +++ b/examples/basic.ts @@ -1,5 +1,4 @@ -import { D2 } from '../src/index.js' -import { map, filter, debug } from '../src/operators/index.js' +import { D2, map, filter, debug } from '@electric-sql/d2ts' // Create a new D2 graph with an initial frontier of 0 // This is the lower bound of the version space @@ -27,15 +26,15 @@ graph.finalize() // The graph will process the data and frontier updates in a single step for (let i = 1; i <= 10; i++) { // Sending a frontier is informing the graph what the new lower bound of the version - // space is *on that input*. Each input essentially can have its own lower bound. + // space is *on that input*. Each input essentially can have its own lower bound. // These are then passed through all the operators input.sendFrontier(i) // Sending data to the graph // The first param is the version of the data - // The second param is a MultiSetArray of *changes in the collection*, where the first - // element is the record and the second is the multiplicity. A positive multiplicity - // means that the record is added to the collection at that version. A negative + // The second param is a MultiSetArray of *changes in the collection*, where the first + // element is the record and the second is the multiplicity. A positive multiplicity + // means that the record is added to the collection at that version. 
A negative // multiplicity means that the record is removed from the collection at that version. input.sendData(i, [ [i + 1, 1], diff --git a/packages/d2ts/examples/fruit-processed.ts b/examples/fruit-processed.ts similarity index 97% rename from packages/d2ts/examples/fruit-processed.ts rename to examples/fruit-processed.ts index d6bb8aa..54c20aa 100644 --- a/packages/d2ts/examples/fruit-processed.ts +++ b/examples/fruit-processed.ts @@ -1,5 +1,5 @@ -import { map, reduce, consolidate } from '../src/operators/index.js' -import { Store } from '../src/store.js' +import { map, reduce, consolidate } from '@electric-sql/d2ts' +import { Store } from '@electric-sql/d2ts/store' type FruitOrder = { name: string diff --git a/packages/d2ts/examples/includes.ts b/examples/includes.ts similarity index 77% rename from packages/d2ts/examples/includes.ts rename to examples/includes.ts index e252244..63d888c 100644 --- a/packages/d2ts/examples/includes.ts +++ b/examples/includes.ts @@ -1,6 +1,13 @@ -import { D2 } from '../src/index.js' -import { map, filter, join, concat, distinct, debug } from '../src/operators/index.js' -import { v } from '../src/order.js' +import { + D2, + map, + filter, + join, + concat, + distinct, + debug, + v, +} from '@electric-sql/d2ts' type Issue = { type: 'issue' @@ -31,46 +38,44 @@ const inputComments = graph.newInput<[number, Comment]>() // Transform comments into [issue_id, comment] pairs for joining const commentsByIssue = inputComments.pipe( - map(([id, comment]) => [comment.issue_id, comment] as [number, Comment]) + map(([id, comment]) => [comment.issue_id, comment] as [number, Comment]), ) // Issues for our project const issuesForProject = inputIssues.pipe( - filter(([id, issue]) => issue.project_id === 1) + filter(([id, issue]) => issue.project_id === 1), ) // Issues ids for joining with comments const issueIds = issuesForProject.pipe( - map(([id, issue]) => [issue.id, undefined] as [number, undefined]) + map(([id, issue]) => [issue.id, 
undefined] as [number, undefined]), ) // Join comments and map back to just the comment const commentsForProject = commentsByIssue.pipe( join(issueIds), - map(([id, [comment, _]]) => [comment.id, comment] as [number, Comment]) + map(([id, [comment, _]]) => [comment.id, comment] as [number, Comment]), ) // Users const usersIdsForIssues = issuesForProject.pipe( - map(([id, issue]) => [issue.owner_id, undefined] as [number, undefined]) + map(([id, issue]) => [issue.owner_id, undefined] as [number, undefined]), ) const usersIdsForComments = commentsForProject.pipe( - map(([id, comment]) => [comment.owner_id, undefined] as [number, undefined]) -) -const usersIds = usersIdsForIssues.pipe( - concat(usersIdsForComments) + map(([id, comment]) => [comment.owner_id, undefined] as [number, undefined]), ) +const usersIds = usersIdsForIssues.pipe(concat(usersIdsForComments)) const users = usersIds.pipe( join(inputUsers), map(([id, [_, user]]) => [id, user] as [number, User]), - distinct() + distinct(), ) // Concat comments and issues and output the result const output = commentsForProject.pipe( concat(issuesForProject), concat(users), - debug('output', true) + debug('output', true), ) graph.finalize() @@ -84,19 +89,64 @@ inputUsers.sendData(v([1, 0]), [ // Add some issues inputIssues.sendData(v([1, 0]), [ - [[1, { type: 'issue', id: 1, project_id: 1, title: 'Issue 1', owner_id: 1 }], 1], - [[2, { type: 'issue', id: 2, project_id: 2, title: 'Issue 2', owner_id: 2 }], 1], - [[3, { type: 'issue', id: 3, project_id: 1, title: 'Issue 3', owner_id: 3 }], 1], + [ + [1, { type: 'issue', id: 1, project_id: 1, title: 'Issue 1', owner_id: 1 }], + 1, + ], + [ + [2, { type: 'issue', id: 2, project_id: 2, title: 'Issue 2', owner_id: 2 }], + 1, + ], + [ + [3, { type: 'issue', id: 3, project_id: 1, title: 'Issue 3', owner_id: 3 }], + 1, + ], ]) // Add some comments inputComments.sendData(v([1, 0]), [ - [[1, { type: 'comment', id: 1, issue_id: 1, text: 'Comment 1', owner_id: 1 }], 1], - [[2, { 
type: 'comment', id: 2, issue_id: 1, text: 'Comment 2', owner_id: 3 }], 1], - [[3, { type: 'comment', id: 3, issue_id: 2, text: 'Comment 3', owner_id: 1 }], 1], - [[4, { type: 'comment', id: 4, issue_id: 2, text: 'Comment 4', owner_id: 3 }], 1], - [[5, { type: 'comment', id: 5, issue_id: 3, text: 'Comment 5', owner_id: 1 }], 1], - [[6, { type: 'comment', id: 6, issue_id: 3, text: 'Comment 6', owner_id: 3 }], 1], + [ + [ + 1, + { type: 'comment', id: 1, issue_id: 1, text: 'Comment 1', owner_id: 1 }, + ], + 1, + ], + [ + [ + 2, + { type: 'comment', id: 2, issue_id: 1, text: 'Comment 2', owner_id: 3 }, + ], + 1, + ], + [ + [ + 3, + { type: 'comment', id: 3, issue_id: 2, text: 'Comment 3', owner_id: 1 }, + ], + 1, + ], + [ + [ + 4, + { type: 'comment', id: 4, issue_id: 2, text: 'Comment 4', owner_id: 3 }, + ], + 1, + ], + [ + [ + 5, + { type: 'comment', id: 5, issue_id: 3, text: 'Comment 5', owner_id: 1 }, + ], + 1, + ], + [ + [ + 6, + { type: 'comment', id: 6, issue_id: 3, text: 'Comment 6', owner_id: 3 }, + ], + 1, + ], ]) // Send frontiers @@ -107,7 +157,13 @@ graph.run() // Add a new Comment to an issue in project 1 inputComments.sendData(v([2, 0]), [ - [[7, { type: 'comment', id: 7, issue_id: 1, text: 'Comment 7', owner_id: 1 }], 1], + [ + [ + 7, + { type: 'comment', id: 7, issue_id: 1, text: 'Comment 7', owner_id: 1 }, + ], + 1, + ], ]) inputUsers.sendFrontier(v([3, 0])) inputIssues.sendFrontier(v([3, 0])) @@ -116,7 +172,13 @@ graph.run() // Add a new Comment to an issue in project 2 inputComments.sendData(v([3, 0]), [ - [[8, { type: 'comment', id: 8, issue_id: 2, text: 'Comment 8', owner_id: 1 }], 1], + [ + [ + 8, + { type: 'comment', id: 8, issue_id: 2, text: 'Comment 8', owner_id: 1 }, + ], + 1, + ], ]) inputUsers.sendFrontier(v([4, 0])) inputIssues.sendFrontier(v([4, 0])) @@ -126,8 +188,14 @@ console.log('> Comment 8 should not be included in the output above') // Move issue 2 to project 1 inputIssues.sendData(v([4, 0]), [ - [[2, { type: 'issue', id: 2, 
project_id: 2, title: 'Issue 2', owner_id: 2 }], -1], - [[2, { type: 'issue', id: 2, project_id: 1, title: 'Issue 2', owner_id: 2 }], 1], + [ + [2, { type: 'issue', id: 2, project_id: 2, title: 'Issue 2', owner_id: 2 }], + -1, + ], + [ + [2, { type: 'issue', id: 2, project_id: 1, title: 'Issue 2', owner_id: 2 }], + 1, + ], ]) inputUsers.sendFrontier(v([5, 0])) inputIssues.sendFrontier(v([5, 0])) @@ -137,14 +205,22 @@ console.log('> Issue 2 and its comments should be included in the output above') // Move issue 2 back to project 2 inputIssues.sendData(v([5, 0]), [ - [[2, { type: 'issue', id: 2, project_id: 1, title: 'Issue 2', owner_id: 2 }], -1], - [[2, { type: 'issue', id: 2, project_id: 2, title: 'Issue 2', owner_id: 2 }], 1], + [ + [2, { type: 'issue', id: 2, project_id: 1, title: 'Issue 2', owner_id: 2 }], + -1, + ], + [ + [2, { type: 'issue', id: 2, project_id: 2, title: 'Issue 2', owner_id: 2 }], + 1, + ], ]) inputUsers.sendFrontier(v([6, 0])) inputIssues.sendFrontier(v([6, 0])) inputComments.sendFrontier(v([6, 0])) graph.run() -console.log('> Issue 2 and its comments should have a multiplicity of -1 in the output above') +console.log( + '> Issue 2 and its comments should have a multiplicity of -1 in the output above', +) /* Output looks like this: @@ -413,4 +489,4 @@ debug output data: version: Version([5,0]) collection: MultiSet([ debug output notification: frontier Antichain([[6,0]]) > Issue 2 and its comments should have a multiplicity of -1 in the output above -*/ \ No newline at end of file +*/ diff --git a/packages/d2ts/examples/iterate.ts b/examples/iterate.ts similarity index 83% rename from packages/d2ts/examples/iterate.ts rename to examples/iterate.ts index c7d7138..2c3a253 100644 --- a/packages/d2ts/examples/iterate.ts +++ b/examples/iterate.ts @@ -1,15 +1,16 @@ -import { D2 } from '../src/index.js' -import { MultiSet } from '../src/multiset.js' -import { Antichain, v } from '../src/order.js' import { - iterate, - debug, - map, + D2, + Antichain, 
+ concat, consolidate, + debug, distinct, filter, - concat, -} from '../src/operators/index.js' + iterate, + map, + MultiSet, + v, +} from '@electric-sql/d2ts' const graph = new D2({ initialFrontier: new Antichain([v(0)]) }) diff --git a/packages/d2ts/examples/join.ts b/examples/join.ts similarity index 61% rename from packages/d2ts/examples/join.ts rename to examples/join.ts index 740cf87..50370a5 100644 --- a/packages/d2ts/examples/join.ts +++ b/examples/join.ts @@ -1,7 +1,4 @@ -import { D2 } from '../src/index.js' -import { map, join, distinct, debug } from '../src/operators/index.js' -import { v } from '../src/order.js' -import { parseArgs } from 'node:util' +import { D2, map, join, distinct, debug } from '@electric-sql/d2ts' type Issue = { id: number @@ -18,13 +15,13 @@ type User = { const issues: Issue[] = [ { id: 1, - title: "Fix login bug", - userId: 1 + title: 'Fix login bug', + userId: 1, }, { - id: 2, - title: "Add dark mode", - userId: 1 + id: 2, + title: 'Add dark mode', + userId: 1, }, { id: 3, @@ -71,29 +68,19 @@ const issues: Issue[] = [ const users: User[] = [ { id: 1, - name: "Alice Johnson" + name: 'Alice Johnson', }, { id: 2, - name: "Bob Smith" + name: 'Bob Smith', }, { id: 3, - name: "Carol Williams" - } + name: 'Carol Williams', + }, ] -const args = parseArgs({ - options: { - sqlite: { - type: 'boolean', - short: 's', - default: false - } - } -}) - -const graph = new D2({ +const graph = new D2({ initialFrontier: 0, }) @@ -103,14 +90,14 @@ const inputUsers = graph.newInput<[number, User]>() // Transform issues into [key, value] pairs for joining const issuesStream = inputIssues.pipe( // debug('issues_stream'), - map(([issueId, issue]) => [issue.userId, issue] as [number, Issue]) + map(([issueId, issue]) => [issue.userId, issue] as [number, Issue]), // debug('issues_stream_map') ) // Transform users into [key, value] pairs for joining const usersStream = inputUsers.pipe( // debug('users_stream'), - map(([userId, user]) => [userId, user] as 
[number, User]) + map(([userId, user]) => [userId, user] as [number, User]), // debug('users_stream_map') ) @@ -118,14 +105,17 @@ const usersStream = inputUsers.pipe( const joinedStream = issuesStream.pipe( join(usersStream), // debug('join'), - map(([_key, [issue, user]]) => ([issue.id, { - id: issue.id, - title: issue.title, - userName: user.name - }])), + map(([_key, [issue, user]]) => [ + issue.id, + { + id: issue.id, + title: issue.title, + userName: user.name, + }, + ]), debug('map', true), distinct(), - debug('distinct', true) + debug('distinct', true), ) graph.finalize() @@ -144,11 +134,19 @@ inputUsers.sendFrontier(2) graph.run() // Add a new issue -inputIssues.sendData(2, [[[11, { - id: 11, - title: 'New issue', - userId: 1, -}], 1]]) +inputIssues.sendData(2, [ + [ + [ + 11, + { + id: 11, + title: 'New issue', + userId: 1, + }, + ], + 1, + ], +]) inputIssues.sendFrontier(3) inputUsers.sendFrontier(3) @@ -156,11 +154,19 @@ inputUsers.sendFrontier(3) graph.run() // Delete an issue -inputIssues.sendData(3, [[[1, { - id: 1, - title: 'Fix login bug', - userId: 1, -}], -1]]) +inputIssues.sendData(3, [ + [ + [ + 1, + { + id: 1, + title: 'Fix login bug', + userId: 1, + }, + ], + -1, + ], +]) inputIssues.sendFrontier(4) inputUsers.sendFrontier(4) @@ -168,16 +174,32 @@ inputUsers.sendFrontier(4) graph.run() // Insert a new user and issue by the same user -inputUsers.sendData(4, [[[4, { - id: 4, - name: 'Dave Brown', -}], 1]]) - -inputIssues.sendData(4, [[[12, { - id: 12, - title: 'New issue 2', - userId: 4, -}], 1]]) +inputUsers.sendData(4, [ + [ + [ + 4, + { + id: 4, + name: 'Dave Brown', + }, + ], + 1, + ], +]) + +inputIssues.sendData(4, [ + [ + [ + 12, + { + id: 12, + title: 'New issue 2', + userId: 4, + }, + ], + 1, + ], +]) inputIssues.sendFrontier(5) inputUsers.sendFrontier(5) @@ -185,17 +207,33 @@ inputUsers.sendFrontier(5) graph.run() // Delete a user and their issues -inputUsers.sendData(5, [[[4, { - id: 4, - name: 'Dave Brown', -}], -1]]) 
-inputIssues.sendData(5, [[[12, { - id: 12, - title: 'New issue 2', - userId: 4, -}], -1]]) +inputUsers.sendData(5, [ + [ + [ + 4, + { + id: 4, + name: 'Dave Brown', + }, + ], + -1, + ], +]) +inputIssues.sendData(5, [ + [ + [ + 12, + { + id: 12, + title: 'New issue 2', + userId: 4, + }, + ], + -1, + ], +]) inputIssues.sendFrontier(6) inputUsers.sendFrontier(6) -graph.run() \ No newline at end of file +graph.run() diff --git a/package.json b/package.json index 440398c..a11327d 100644 --- a/package.json +++ b/package.json @@ -10,12 +10,15 @@ "bench": "pnpm --filter d2ts-benchmark bench" }, "devDependencies": { - "typescript": "^5.0.0", - "vitest": "^1.0.0", + "@electric-sql/d2ts": "workspace:*", "@eslint/js": "latest", + "@types/node": "^22.10.2", "@typescript-eslint/eslint-plugin": "^7.0.0", "@typescript-eslint/parser": "^7.0.0", "eslint": "^9.0.0", - "eslint-config-prettier": "^9.0.0" + "eslint-config-prettier": "^9.0.0", + "tsx": "^4.7.0", + "typescript": "^5.0.0", + "vitest": "^1.0.0" } } diff --git a/packages/d2ts-benchmark/package.json b/packages/d2ts-benchmark/package.json index 23a2653..bfcc2e2 100644 --- a/packages/d2ts-benchmark/package.json +++ b/packages/d2ts-benchmark/package.json @@ -1,5 +1,5 @@ { - "name": "d2ts-benchmark", + "name": "@electric-sql/d2ts-benchmark", "private": true, "version": "0.0.1", "license": "Apache-2.0", @@ -15,6 +15,6 @@ "typescript": "^5.0.0" }, "dependencies": { - "d2ts": "workspace:*" + "@electric-sql/d2ts": "workspace:*" } } \ No newline at end of file diff --git a/packages/d2ts-benchmark/src/index.ts b/packages/d2ts-benchmark/src/index.ts index e9f63eb..847f89f 100644 --- a/packages/d2ts-benchmark/src/index.ts +++ b/packages/d2ts-benchmark/src/index.ts @@ -1,7 +1,7 @@ import { Suite } from './base' -import { D2, MultiSet, map, join, filter, v, output } from 'd2ts' -import { join as joinSql } from 'd2ts/sqlite' -import { BetterSQLite3Wrapper } from 'd2ts/sqlite' +import { D2, MultiSet, map, join, filter, v, output } from 
'@electric-sql/d2ts' +import { join as joinSql } from '@electric-sql/d2ts/sqlite' +import { BetterSQLite3Wrapper } from '@electric-sql/d2ts/sqlite' import Database from 'better-sqlite3' // Sample data generation diff --git a/packages/d2ts/examples/basic-timing.ts b/packages/d2ts/examples/basic-timing.ts deleted file mode 100644 index 65c291d..0000000 --- a/packages/d2ts/examples/basic-timing.ts +++ /dev/null @@ -1,97 +0,0 @@ -import { D2 } from '../src/index.js' -import { map, filter, negate, concat, join, count } from '../src/operators/index.js' - -function one() { - console.log('=== one: map/filter/negate/concat ===') - - const graph = new D2({ initialFrontier: 0 }) - const input_a = graph.newInput() - - input_a - .pipe( - map((x) => x + 5), - filter((x) => x % 2 === 0), - concat(input_a.pipe(negate())) - // debug('output') // uncomment to debug - ) - - graph.finalize() - - for (let i = 0; i < 10; i++) { - input_a.sendFrontier(i) - input_a.sendData(i, [[i, 1]]) - graph.run() - } -} - -function two() { - console.log('=== two: join/count ===') - - const graph = new D2({ initialFrontier: 0 }) - const input_a = graph.newInput<[number, number]>() - const input_b = graph.newInput<[number, number]>() - - input_a - .pipe( - join(input_b), - count() - // debug('count') // uncomment to debug - ) - - graph.finalize() - - for (let i = 0; i < 2; i++) { - input_a.sendData(i + 1, [ - [[1, i], 2], - [[2, i], 2] - ]) - input_a.sendFrontier(i + 2) - - input_b.sendData(i, [ - [[1, i + 2], 2], - [[2, i + 3], 2] - ]) - input_b.sendFrontier(i) - graph.run() - } - - input_a.sendFrontier(11) - input_b.sendFrontier(11) - graph.run() -} - -const run = async () => { - console.time('one') - one() - console.timeEnd('one') - - console.time('two') - two() - console.timeEnd('two') -} - -/** - * After JS is JIT compiled, the first run is slower. 
- * use --multi-run to run multiple times (blocked on hitting enter) - */ -const main = async () => { - const multiRun = process.argv.includes('--multi-run') - - if (multiRun) { - while (true) { - run() - // Wait for enter key press before continuing - console.log('===') - console.log('Press enter to run again') - await new Promise((resolve) => { - process.stdin.once('data', () => { - resolve() - }) - }) - } - } else { - run() - } -} - -main() diff --git a/packages/d2ts/package.json b/packages/d2ts/package.json index f7e692e..4cbbe39 100644 --- a/packages/d2ts/package.json +++ b/packages/d2ts/package.json @@ -1,5 +1,5 @@ { - "name": "d2ts", + "name": "@electric-sql/d2ts", "version": "0.1.0", "type": "module", "main": "dist/index.js", @@ -12,6 +12,10 @@ "./sqlite": { "types": "./dist/sqlite/index.d.ts", "default": "./dist/sqlite/index.js" + }, + "./store": { + "types": "./dist/store.d.ts", + "default": "./dist/store.js" } }, "scripts": { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a4e0bb0..ee2f53b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -8,9 +8,15 @@ importers: .: devDependencies: + '@electric-sql/d2ts': + specifier: workspace:* + version: link:packages/d2ts '@eslint/js': specifier: latest - version: 9.17.0 + version: 9.21.0 + '@types/node': + specifier: ^22.10.2 + version: 22.10.2 '@typescript-eslint/eslint-plugin': specifier: ^7.0.0 version: 7.18.0(@typescript-eslint/parser@7.18.0(eslint@9.16.0)(typescript@5.6.3))(eslint@9.16.0)(typescript@5.6.3) @@ -23,6 +29,9 @@ importers: eslint-config-prettier: specifier: ^9.0.0 version: 9.1.0(eslint@9.16.0) + tsx: + specifier: ^4.7.0 + version: 4.19.2 typescript: specifier: ^5.0.0 version: 5.6.3 @@ -71,7 +80,7 @@ importers: packages/d2ts-benchmark: dependencies: - d2ts: + '@electric-sql/d2ts': specifier: workspace:* version: link:../d2ts devDependencies: @@ -401,8 +410,8 @@ packages: resolution: {integrity: sha512-tw2HxzQkrbeuvyj1tG2Yqq+0H9wGoI2IMk4EOsQeX+vmd75FtJAzf+gTA69WF+baUKRYQ3x2kbLE08js5OsTVg==} 
engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} - '@eslint/js@9.17.0': - resolution: {integrity: sha512-Sxc4hqcs1kTu0iID3kcZDW3JHq2a77HO9P8CP6YEA/FpH3Ll8UXE2r/86Rz9YJLKme39S9vU5OWNjC6Xl0Cr3w==} + '@eslint/js@9.21.0': + resolution: {integrity: sha512-BqStZ3HX8Yz6LvsF5ByXYrtigrV5AXADWLAGc7PH/1SxOb7/FIYYMszZZWiUou/GB9P2lXWk2SV4d+Z8h0nknw==} engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} '@eslint/object-schema@2.1.5': @@ -558,9 +567,6 @@ packages: '@types/json-schema@7.0.15': resolution: {integrity: sha512-5+fP8P8MFNC+AyZCDxrB2pkZFPGzqQWUzpSeuuVLvm8VMcorNYavBqoFcxK8bQz4Qsbn4oUEEem4wDLfcysGHA==} - '@types/node@20.17.6': - resolution: {integrity: sha512-VEI7OdvK2wP7XHnsuXbAJnEpEkF6NjSN45QJlL4VGqZSXsnicpesdTWsg9RISeSdYd3yeRj/y3k5KGjUXYnFwQ==} - '@types/node@22.10.2': resolution: {integrity: sha512-Xxr6BBRCAOQixvonOye19wnzyDiUtTeqldOOmj3CkeblonbccA12PFwlufvRdrpjXxqnmUaeiU5EOA+7s5diUQ==} @@ -1513,9 +1519,6 @@ packages: ufo@1.5.4: resolution: {integrity: sha512-UsUk3byDzKd04EyoZ7U4DOlxQaD14JUKQl6/P7wiX4FNvUfm3XL246n9W5AmqwW5RSFJ27NAuM0iLscAOYUiGQ==} - undici-types@6.19.8: - resolution: {integrity: sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==} - undici-types@6.20.0: resolution: {integrity: sha512-Ny6QZ2Nju20vw1SRHe3d9jVu6gJ+4e3+MMpqu7pqE5HT6WsTSlce++GQmK5UXS8mzV8DSYHrQH+Xrf2jVcuKNg==} @@ -1819,7 +1822,7 @@ snapshots: '@eslint/js@9.16.0': {} - '@eslint/js@9.17.0': {} + '@eslint/js@9.21.0': {} '@eslint/object-schema@2.1.5': {} @@ -1918,16 +1921,12 @@ snapshots: '@types/better-sqlite3@7.6.12': dependencies: - '@types/node': 20.17.6 + '@types/node': 22.10.2 '@types/estree@1.0.6': {} '@types/json-schema@7.0.15': {} - '@types/node@20.17.6': - dependencies: - undici-types: 6.19.8 - '@types/node@22.10.2': dependencies: undici-types: 6.20.0 @@ -2979,8 +2978,6 @@ snapshots: ufo@1.5.4: {} - undici-types@6.19.8: {} - undici-types@6.20.0: {} uri-js@4.4.1: