Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4981ba6
Draft of an electricStreamToD2Input method
samwillis Dec 27, 2024
c43bd1f
Add tests for the electric adapter
samwillis Dec 30, 2024
0002e2c
WIP Example with Electric
samwillis Dec 31, 2024
629877f
Fix tests
samwillis Dec 31, 2024
5c95b52
Merge branch 'main' into samwillis/electric
samwillis Feb 25, 2025
accae85
Working electric adapter with MultiShapeStream
samwillis Feb 25, 2025
32eba44
rename checkForUpdatesAfter to checkForUpdatesAfterMs to match new Mu…
samwillis Feb 26, 2025
3998357
SQLite version of the buffer operator
samwillis Feb 26, 2025
bb3384e
Fix electric tests
samwillis Feb 26, 2025
8284c4b
fully working example
samwillis Feb 26, 2025
1728b3b
Merge branch 'main' into samwillis/electric
samwillis Feb 26, 2025
4dac4f1
Update electric example to latest draft electric
samwillis Feb 27, 2025
7b2b2cd
Move the electric example to the examples dir
samwillis Feb 27, 2025
787fb1b
readme for the example
samwillis Feb 27, 2025
3bc838c
add a outputElectricMessages operator that converts messeges in the D…
samwillis Feb 27, 2025
cc4b0c5
Remove import
samwillis Feb 27, 2025
b11f5a1
Merge branch 'main' into samwillis/electric
samwillis Feb 27, 2025
4ae3b7b
Merge branch 'main' into samwillis/electric
samwillis Mar 2, 2025
9fd3141
Merge branch 'main' into samwillis/electric
samwillis Mar 3, 2025
f2a5f2e
Update to use published client packages
samwillis Mar 3, 2025
834558a
additional tests for electric
samwillis Mar 3, 2025
affc945
changeset
samwillis Mar 3, 2025
2915b8e
Fix linting errors
samwillis Mar 3, 2025
1fc066c
use correct docker for electric
samwillis Mar 3, 2025
40701fb
Remove empty test script from example
samwillis Mar 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion packages/d2ts/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
"./sqlite": {
"types": "./dist/sqlite/index.d.ts",
"default": "./dist/sqlite/index.js"
},
"./electric": {
"types": "./dist/electric/index.d.ts",
"default": "./dist/electric/index.js"
}
},
"scripts": {
Expand All @@ -24,23 +28,28 @@
"format:check": "prettier --check \"src/**/*.{ts,tsx,js,jsx,json}\" \"tests/**/*.{ts,tsx,js,jsx,json}\""
},
"devDependencies": {
"@electric-sql/client": "file:/Users/samwillis/Code/electric/packages/typescript-client/electric-sql-client-1.0.0-beta.3.tgz",
"@types/better-sqlite3": "^7.6.12",
"@types/node": "^22.10.2",
"@typescript-eslint/eslint-plugin": "^8.18.0",
"@typescript-eslint/parser": "^8.18.0",
"better-sqlite3": "^11.7.0",
"eslint": "^9.16.0",
"eslint-config-prettier": "^9.1.0",
"eslint-plugin-prettier": "^5.1.3",
"eslint": "^9.16.0",
"prettier": "^3.2.5",
"tsx": "^4.7.0",
"typescript": "^5.7.2",
"vitest": "^2.1.8"
},
"peerDependencies": {
"@electric-sql/client": "file:/Users/samwillis/Code/electric/packages/typescript-client/electric-sql-client-1.0.0-beta.3.tgz",
"better-sqlite3": "^11.7.0"
},
"peerDependenciesMeta": {
"@electric-sql/client": {
"optional": true
},
"better-sqlite3": {
"optional": true
}
Expand Down
190 changes: 190 additions & 0 deletions packages/d2ts/src/electric/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import { D2, RootStreamBuilder } from '../d2.js'
import { MultiSetArray } from '../multiset.js'
import { type Version, type Antichain } from '../order.js'
import {
type Row,
type ShapeStreamInterface,
isChangeMessage,
isControlMessage,
} from '@electric-sql/client'

/*
Electric uses Postgres LSNs to track progress, each message is annotated with an LSN.
Currently the LSN is a string in the format "LSN_sequence", we need to extract the
number from this to use as the version for each message. In the future we intend to add
the LSN as a header to each message, so we can remove this logic.
We need to keep track of these and use them to send as the version for each message to
the D2 input stream.
D2 also requires a a frontier message for be sent, this is the lower bound for all
future messages.
To do this we:
- Keep track of the last LSN we've seen
- Send the LSN as the version for each message
- When we receive an `up-to-date` message, we send the last LSN+1 as the frontier. The
addition of 1 is to account for the fact that the last LSN is the version of the last
message, and we need to send the version of the next message as the frontier.
*/

export interface ElectricStreamToD2InputOptions<T extends Row<unknown> = Row> {
/** D2 Graph to send messages to */
graph: D2
/** The Electric ShapeStream to consume */
stream: ShapeStreamInterface<T>
/** The D2 input stream to send messages to */
input: RootStreamBuilder<[key: string, T]>
/** Optional function to convert LSN to version number/Version */
lsnToVersion?: (lsn: number) => number | Version
/** Optional function to convert LSN to frontier number/Antichain */
lsnToFrontier?: (lsn: number) => number | Antichain
/** Initial LSN */
initialLsn?: number
/** When to run the graph */
runOn?: 'up-to-date' | 'lsn-advance' | false
/** Whether to log debug messages */
debug?: boolean | typeof console.log
}

/**
* Connects an Electric ShapeStream to a D2 input stream
* IMPORTANT: Requires the ShapeStream to be configured with `replica: 'full'`
* @param options Configuration options
* @param options.stream The Electric ShapeStream to consume
* @param options.input The D2 input stream to send messages to
* @param options.lsnToVersion Optional function to convert LSN to version number/Version
* @param options.lsnToFrontier Optional function to convert LSN to frontier number/Antichain
* @returns The input stream for chaining
*/
export function electricStreamToD2Input<T extends Row<unknown> = Row>({
graph,
stream,
input,
lsnToVersion = (lsn: number) => lsn,
lsnToFrontier = (lsn: number) => lsn,
initialLsn = 0,
runOn = 'up-to-date',
debug = false,
}: ElectricStreamToD2InputOptions<T>): RootStreamBuilder<[key: string, T]> {
let lastLsn: number = initialLsn
let changes: MultiSetArray<[key: string, T]> = []

const sendChanges = (lsn: number) => {
const version = lsnToVersion(lsn)
log?.(`sending ${changes.length} changes at version ${version}`)
if (changes.length > 0) {
input.sendData(version, [...changes])
}
changes = []
}

const sendFrontier = (lsn: number) => {
const frontier = lsnToFrontier(lsn + 1) // +1 to account for the fact that the last LSN is the version of the last message
log?.(`sending frontier ${frontier}`)
input.sendFrontier(frontier)
}

const log =
typeof debug === 'function'
? debug
: debug === true
? console.log
: undefined

log?.('subscribing to stream')
stream.subscribe((messages) => {
log?.(`received ${messages.length} messages`)
for (const message of messages) {
if (isControlMessage(message)) {
log?.(`- control message: ${message.headers.control}`)
// Handle control message
if (message.headers.control === 'up-to-date') {
log?.(`up-to-date ${JSON.stringify(message, null, 2)}`)
if (changes.length > 0) {
sendChanges(lastLsn)
}
if (typeof message.headers.global_last_seen_lsn !== `number`) {
throw new Error(`global_last_seen_lsn is not a number`)
}
const lsn = message.headers.global_last_seen_lsn
sendFrontier(lsn)
if (runOn === 'up-to-date' || runOn === 'lsn-advance') {
log?.('Running graph on up-to-date')
graph.run()
}
}
} else if (isChangeMessage(message)) {
log?.(`- change message: ${message.headers.operation}`)
// Handle change message
if (
message.headers.lsn !== undefined &&
typeof message.headers.lsn !== `number`
) {
throw new Error(`lsn is not a number`)
}
const lsn = message.headers.lsn ?? initialLsn // The LSN is not present on the initial snapshot
const last: boolean =
(message.headers.last as boolean | undefined) ?? false
switch (message.headers.operation) {
case 'insert':
changes.push([[message.key, message.value], 1])
break
case 'update':
// An update is a delete followed by an insert
changes.push([[message.key, message.value], -1]) // We don't have the old value, TODO: check if this causes issues
changes.push([[message.key, message.value], 1])
break
case 'delete':
changes.push([[message.key, message.value], -1])
break
}
if (last) {
sendChanges(lsn)
sendFrontier(lsn)
if (runOn === 'lsn-advance') {
log?.('Running graph on lsn-advance')
graph.run()
}
}
if (lsn > lastLsn) {
lastLsn = lsn
}
}
}
})

return input
}

/*
// Used something like this:

// Create D2 graph
const graph = new D2({ initialFrontier: 0 })

// Create D2 input
const input = graph.newInput<any>()

// Configure the pipeline
input
.pipe(
map(([key, data]) => data.value),
filter(value => value > 10)
)

// Finalize graph
graph.finalize()

// Create Electric stream (example)
const electricStream = new ShapeStream({
url: 'http://localhost:3000/v1/shape',
params: {
table: 'items',
replica: 'full',
}
})

// Connect Electric stream to D2 input
electricStreamToD2Input({
stream: electricStream,
input,
})
*/
78 changes: 78 additions & 0 deletions packages/d2ts/src/operators/buffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import {
IStreamBuilder,
PipedOperator,
DataMessage,
MessageType,
} from '../types.js'
import { DifferenceStreamWriter, UnaryOperator } from '../graph.js'
import { StreamBuilder } from '../d2.js'
import { MultiSet } from '../multiset.js'
import { Antichain, Version } from '../order.js'
import { DefaultMap } from '../utils.js'

/**
* Operator that buffers collections at each version
* Ensured that completed versions are sent to the output as a whole, and in order
*/
export class BufferOperator<T> extends UnaryOperator<T> {
#collections = new DefaultMap<Version, MultiSet<T>>(() => new MultiSet<T>())

run(): void {
for (const message of this.inputMessages()) {
if (message.type === MessageType.DATA) {
const { version, collection } = message.data as DataMessage<T>
this.#collections.update(version, (existing) => {
existing.extend(collection)
return existing
})
} else if (message.type === MessageType.FRONTIER) {
const frontier = message.data as Antichain
if (!this.inputFrontier().lessEqual(frontier)) {
throw new Error('Invalid frontier update')
}
this.setInputFrontier(frontier)
}
}

// Find versions that are complete (not covered by input frontier)
const finishedVersions = Array.from(this.#collections.entries()).filter(
([version]) => !this.inputFrontier().lessEqualVersion(version),
)

// Process and remove finished versions
for (const [version, collection] of finishedVersions) {
this.#collections.delete(version)
this.output.sendData(version, collection)
}

if (!this.outputFrontier.lessEqual(this.inputFrontier())) {
throw new Error('Invalid frontier state')
}
if (this.outputFrontier.lessThan(this.inputFrontier())) {
this.outputFrontier = this.inputFrontier()
this.output.sendFrontier(this.outputFrontier)
}
}
}

/**
* Buffers the elements in the stream
* Ensured that completed versions are sent to the output as a whole, and in order
*/
export function buffer<T>(): PipedOperator<T, T> {
return (stream: IStreamBuilder<T>): IStreamBuilder<T> => {
const output = new StreamBuilder<T>(
stream.graph,
new DifferenceStreamWriter<T>(),
)
const operator = new BufferOperator<T>(
stream.graph.getNextOperatorId(),
stream.connectReader(),
output.writer,
stream.graph.frontier(),
)
stream.graph.addOperator(operator)
stream.graph.addStream(output.connectReader())
return output
}
}
1 change: 1 addition & 0 deletions packages/d2ts/src/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export * from './concat.js'
export * from './debug.js'
export * from './output.js'
export * from './consolidate.js'
export * from './buffer.js'
export * from './join.js'
export * from './reduce.js'
export * from './count.js'
Expand Down
Loading
Loading