diff --git a/README.md b/README.md index a934696..336a9d9 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ A D2TS pipe is also fully type safe, inferring the types at each step of the pip - **Incremental Processing**: Efficiently process changes to input data without recomputing everything - **Rich Operators**: Supports common operations with a pipeline API: + - `buffer()`: Buffer and emit versions when they are complete - `concat()`: Concatenate two streams - `consolidate()`: Consolidates the elements in the stream at each version - `count()`: Count elements by key @@ -142,6 +143,14 @@ const multiSet = new MultiSet<[string, Comment]>([ ### Operators +#### `buffer()` + +Buffers the elements of the stream, emitting a version when the buffer is complete. + +```typescript +const output = input.pipe(buffer()) +``` + #### `concat(other: IStreamBuilder)` Concatenates two input streams - the output stream will contain the elements of both streams. diff --git a/packages/d2ts/src/operators/buffer.ts b/packages/d2ts/src/operators/buffer.ts new file mode 100644 index 0000000..d9680e3 --- /dev/null +++ b/packages/d2ts/src/operators/buffer.ts @@ -0,0 +1,78 @@ +import { + IStreamBuilder, + PipedOperator, + DataMessage, + MessageType, +} from '../types.js' +import { DifferenceStreamWriter, UnaryOperator } from '../graph.js' +import { StreamBuilder } from '../d2.js' +import { MultiSet } from '../multiset.js' +import { Antichain, Version } from '../order.js' +import { DefaultMap } from '../utils.js' + +/** + * Operator that buffers collections at each version + * Ensures that completed versions are sent to the output as a whole, and in order + */ +export class BufferOperator extends UnaryOperator { + #collections = new DefaultMap>(() => new MultiSet()) + + run(): void { + for (const message of this.inputMessages()) { + if (message.type === MessageType.DATA) { + const { version, collection } = message.data as DataMessage + this.#collections.update(version, (existing) => { 
existing.extend(collection) + return existing + }) + } else if (message.type === MessageType.FRONTIER) { + const frontier = message.data as Antichain + if (!this.inputFrontier().lessEqual(frontier)) { + throw new Error('Invalid frontier update') + } + this.setInputFrontier(frontier) + } + } + + // Find versions that are complete (not covered by input frontier) + const finishedVersions = Array.from(this.#collections.entries()).filter( + ([version]) => !this.inputFrontier().lessEqualVersion(version), + ) + + // Process and remove finished versions + for (const [version, collection] of finishedVersions) { + this.#collections.delete(version) + this.output.sendData(version, collection) + } + + if (!this.outputFrontier.lessEqual(this.inputFrontier())) { + throw new Error('Invalid frontier state') + } + if (this.outputFrontier.lessThan(this.inputFrontier())) { + this.outputFrontier = this.inputFrontier() + this.output.sendFrontier(this.outputFrontier) + } + } +} + +/** + * Buffers the elements in the stream + * Ensures that completed versions are sent to the output as a whole, and in order + */ +export function buffer(): PipedOperator { + return (stream: IStreamBuilder): IStreamBuilder => { + const output = new StreamBuilder( + stream.graph, + new DifferenceStreamWriter(), + ) + const operator = new BufferOperator( + stream.graph.getNextOperatorId(), + stream.connectReader(), + output.writer, + stream.graph.frontier(), + ) + stream.graph.addOperator(operator) + stream.graph.addStream(output.connectReader()) + return output + } +} diff --git a/packages/d2ts/src/operators/index.ts b/packages/d2ts/src/operators/index.ts index 61b16da..fa728f2 100644 --- a/packages/d2ts/src/operators/index.ts +++ b/packages/d2ts/src/operators/index.ts @@ -6,6 +6,7 @@ export * from './concat.js' export * from './debug.js' export * from './output.js' export * from './consolidate.js' +export * from './buffer.js' export * from './join.js' export * from './reduce.js' export * from './count.js' 
diff --git a/packages/d2ts/src/sqlite/operators/buffer.ts b/packages/d2ts/src/sqlite/operators/buffer.ts new file mode 100644 index 0000000..4029d83 --- /dev/null +++ b/packages/d2ts/src/sqlite/operators/buffer.ts @@ -0,0 +1,164 @@ +import { StreamBuilder } from '../../d2.js' +import { + DataMessage, + MessageType, + IStreamBuilder, + PipedOperator, +} from '../../types.js' +import { MultiSet } from '../../multiset.js' +import { + DifferenceStreamReader, + DifferenceStreamWriter, + UnaryOperator, +} from '../../graph.js' +import { Version, Antichain } from '../../order.js' +import { SQLiteDb, SQLiteStatement } from '../database.js' + +interface CollectionRow { + version: string + collection: string +} + +interface CollectionParams { + version: string + collection: string +} + +/** + * Operator that buffers collections at each version, persisting state to SQLite + * Ensures that completed versions are sent to the output as a whole, and in order + */ +export class BufferOperatorSQLite extends UnaryOperator { + #preparedStatements: { + insert: SQLiteStatement + update: SQLiteStatement + get: SQLiteStatement<[string], CollectionRow> + delete: SQLiteStatement<[string]> + getAllVersions: SQLiteStatement<[], CollectionRow> + } + + constructor( + id: number, + inputA: DifferenceStreamReader, + output: DifferenceStreamWriter, + initialFrontier: Antichain, + db: SQLiteDb, + ) { + super(id, inputA, output, initialFrontier) + + // Initialize database + db.exec(` + CREATE TABLE IF NOT EXISTS buffer_collections_${this.id} ( + version TEXT PRIMARY KEY, + collection TEXT NOT NULL + ) + `) + db.exec(` + CREATE INDEX IF NOT EXISTS buffer_collections_${this.id}_version + ON buffer_collections_${this.id}(version); + `) + + // Prepare statements + this.#preparedStatements = { + insert: db.prepare( + `INSERT INTO buffer_collections_${this.id} (version, collection) VALUES (@version, @collection)`, + ), + update: db.prepare( + `UPDATE buffer_collections_${this.id} SET collection = 
@collection WHERE version = @version`, + ), + get: db.prepare( + `SELECT collection FROM buffer_collections_${this.id} WHERE version = ?`, + ), + delete: db.prepare( + `DELETE FROM buffer_collections_${this.id} WHERE version = ?`, + ), + getAllVersions: db.prepare( + `SELECT version, collection FROM buffer_collections_${this.id}`, + ), + } + } + + run(): void { + for (const message of this.inputMessages()) { + if (message.type === MessageType.DATA) { + const { version, collection } = message.data as DataMessage + + // Get existing collection or create new one + const existingData = this.#preparedStatements.get.get(version.toJSON()) + const existingCollection = existingData + ? MultiSet.fromJSON(existingData.collection) + : new MultiSet() + + // Merge collections + existingCollection.extend(collection) + + // Store updated collection + if (existingData) { + this.#preparedStatements.update.run({ + version: version.toJSON(), + collection: existingCollection.toJSON(), + }) + } else { + this.#preparedStatements.insert.run({ + version: version.toJSON(), + collection: existingCollection.toJSON(), + }) + } + } else if (message.type === MessageType.FRONTIER) { + const frontier = message.data as Antichain + if (!this.inputFrontier().lessEqual(frontier)) { + throw new Error('Invalid frontier update') + } + this.setInputFrontier(frontier) + } + } + + // Find versions that are complete (not covered by input frontier) + const allVersions = this.#preparedStatements.getAllVersions.all() + const finishedVersions = allVersions + .map((row) => ({ + version: Version.fromJSON(row.version), + collection: MultiSet.fromJSON(row.collection), + })) + .filter(({ version }) => !this.inputFrontier().lessEqualVersion(version)) + + // Process and remove finished versions + for (const { version, collection } of finishedVersions) { + this.#preparedStatements.delete.run(version.toJSON()) + this.output.sendData(version, collection) + } + + if (!this.outputFrontier.lessEqual(this.inputFrontier())) { 
+ throw new Error('Invalid frontier state') + } + if (this.outputFrontier.lessThan(this.inputFrontier())) { + this.outputFrontier = this.inputFrontier() + this.output.sendFrontier(this.outputFrontier) + } + } +} + +/** + * Buffers the elements in the stream + * Ensures that completed versions are sent to the output as a whole, and in order + * Persists state to SQLite + * @param db - The SQLite database + */ +export function buffer(db: SQLiteDb): PipedOperator { + return (stream: IStreamBuilder): IStreamBuilder => { + const output = new StreamBuilder( + stream.graph, + new DifferenceStreamWriter(), + ) + const operator = new BufferOperatorSQLite( + stream.graph.getNextOperatorId(), + stream.connectReader(), + output.writer, + stream.graph.frontier(), + db, + ) + stream.graph.addOperator(operator) + stream.graph.addStream(output.connectReader()) + return output + } +} diff --git a/packages/d2ts/tests/operators-sqlite/buffer.test.ts b/packages/d2ts/tests/operators-sqlite/buffer.test.ts new file mode 100644 index 0000000..2804c7d --- /dev/null +++ b/packages/d2ts/tests/operators-sqlite/buffer.test.ts @@ -0,0 +1,191 @@ +import { describe, test, expect, beforeEach, afterEach } from 'vitest' +import { D2 } from '../../src/d2.js' +import { MultiSet } from '../../src/multiset.js' +import { Antichain, v } from '../../src/order.js' +import { DataMessage, MessageType } from '../../src/types.js' +import { buffer } from '../../src/sqlite/operators/buffer.js' +import { output } from '../../src/operators/index.js' +import Database from 'better-sqlite3' +import { BetterSQLite3Wrapper } from '../../src/sqlite/database.js' +import fs from 'fs' +import path from 'path' + +const DB_FILENAME = 'test-buffer.db' + +describe('SQLite Operators', () => { + describe('Buffer operation', () => { + let db: BetterSQLite3Wrapper + + beforeEach(() => { + const sqlite = new Database(':memory:') + db = new BetterSQLite3Wrapper(sqlite) + }) + + afterEach(() => { + db.close() + }) + + test('basic 
buffer operation', () => { + const graph = new D2({ initialFrontier: v([0]) }) + const input = graph.newInput() + const messages: DataMessage[] = [] + + input.pipe( + buffer(db), + output((message) => { + if (message.type === MessageType.DATA) { + messages.push(message.data) + } + }), + ) + + graph.finalize() + + input.sendData( + v([1]), + new MultiSet([ + [1, 1], + [2, 1], + ]), + ) + input.sendData( + v([2]), + new MultiSet([ + [3, 1], + [4, 1], + ]), + ) + + input.sendFrontier(new Antichain([v([2])])) + graph.run() + + const data = messages.map((m) => ({ + version: m.version, + collection: m.collection.getInner(), + })) + + // Should output complete buffered collection for version [1] + expect(data).toEqual([ + { + version: v([2]), + collection: [ + [1, 1], + [2, 1], + ], + }, + ]) + + messages.length = 0 + + input.sendFrontier(new Antichain([v([3])])) + graph.run() + + const data2 = messages.map((m) => ({ + version: m.version, + collection: m.collection.getInner(), + })) + + // Should output complete buffered collection for version [2] + expect(data2).toEqual([ + { + version: v([3]), + collection: [ + [3, 1], + [4, 1], + ], + }, + ]) + }) + }) + + describe('Buffer operation with persistence', () => { + const dbPath = path.join(import.meta.dirname, DB_FILENAME) + let db: BetterSQLite3Wrapper + + beforeEach(() => { + if (fs.existsSync(dbPath)) { + fs.unlinkSync(dbPath) + } + const sqlite = new Database(dbPath) + db = new BetterSQLite3Wrapper(sqlite) + }) + + afterEach(() => { + db.close() + if (fs.existsSync(dbPath)) { + fs.unlinkSync(dbPath) + } + }) + + test('persists and recovers state', () => { + // First graph instance - initial processing + let messages: DataMessage[] = [] + let graph = new D2({ initialFrontier: v([0]) }) + const input = graph.newInput() + + input.pipe( + buffer(db), + output((message) => { + if (message.type === MessageType.DATA) { + messages.push(message.data) + } + }), + ) + + graph.finalize() + + // Send initial data + input.sendData( 
+ v([1]), + new MultiSet([ + [1, 1], + [2, 1], + ]), + ) + + graph.run() + + // Close first graph instance and database + db.close() + + // Create new graph instance with same database + messages = [] + db = new BetterSQLite3Wrapper(new Database(dbPath)) + graph = new D2({ initialFrontier: v([1]) }) + const newInput = graph.newInput() + + newInput.pipe( + buffer(db), + output((message) => { + if (message.type === MessageType.DATA) { + messages.push(message.data) + } + }), + ) + + graph.finalize() + + // Send new data + newInput.sendData( + v([1]), + new MultiSet([ + [3, 1], + [4, 1], + ]), + ) + newInput.sendFrontier(new Antichain([v([2])])) + + graph.run() + + // Verify that new results work with persisted state + expect(messages.map((m) => m.collection.getInner())).toEqual([ + [ + [1, 1], + [2, 1], + [3, 1], + [4, 1], + ], + ]) + }) + }) +}) diff --git a/packages/d2ts/tests/operators/buffer.test.ts b/packages/d2ts/tests/operators/buffer.test.ts new file mode 100644 index 0000000..60d61f0 --- /dev/null +++ b/packages/d2ts/tests/operators/buffer.test.ts @@ -0,0 +1,82 @@ +import { describe, test, expect } from 'vitest' +import { D2 } from '../../src/d2.js' +import { MultiSet } from '../../src/multiset.js' +import { Antichain, v } from '../../src/order.js' +import { DataMessage, MessageType } from '../../src/types.js' +import { buffer, output } from '../../src/operators/index.js' + +describe('Operators', () => { + describe('Buffer operation', () => { + test('basic buffer operation', () => { + const graph = new D2({ initialFrontier: v([0]) }) + const input = graph.newInput() + const messages: DataMessage[] = [] + + input.pipe( + buffer(), + output((message) => { + if (message.type === MessageType.DATA) { + messages.push(message.data) + } + }), + ) + + graph.finalize() + + input.sendData( + v([1]), + new MultiSet([ + [1, 1], + [2, 1], + ]), + ) + input.sendData( + v([2]), + new MultiSet([ + [3, 1], + [4, 1], + ]), + ) + + input.sendFrontier(new Antichain([v([2])])) + 
graph.run() + + const data = messages.map((m) => ({ + version: m.version, + collection: m.collection.getInner(), + })) + + // Should output complete buffered collection for version [1] + expect(data).toEqual([ + { + version: v([2]), + collection: [ + [1, 1], + [2, 1], + ], + }, + ]) + + messages.length = 0 + + input.sendFrontier(new Antichain([v([3])])) + graph.run() + + const data2 = messages.map((m) => ({ + version: m.version, + collection: m.collection.getInner(), + })) + + // Should output complete buffered collection for version [2] + expect(data2).toEqual([ + { + version: v([3]), + collection: [ + [3, 1], + [4, 1], + ], + }, + ]) + }) + }) +})