9 changes: 9 additions & 0 deletions README.md
@@ -12,6 +12,7 @@ A D2TS pipe is also fully type safe, inferring the types at each step of the pipe

- **Incremental Processing**: Efficiently process changes to input data without recomputing everything
- **Rich Operators**: Supports common operations with a pipeline API:
- `buffer()`: Buffer elements and emit each version as a whole once it is complete
- `concat()`: Concatenate two streams
- `consolidate()`: Consolidates the elements in the stream at each version
- `count()`: Count elements by key
@@ -142,6 +143,14 @@ const multiSet = new MultiSet<[string, Comment]>([

### Operators

#### `buffer()`

Buffers the elements of the stream, emitting each version as a whole once it is complete (i.e. no longer covered by the input frontier).

```typescript
const output = input.pipe(buffer())
```
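
For example, data sent at a version is held back and emitted as a single collection only once the input frontier has passed that version. The sketch below is illustrative: it assumes the `D2` graph construction API (`new D2`, `newInput`, `v`, `finalize`, `run`) used elsewhere in this README, and the import path may differ in your setup.

```typescript
import { D2, MultiSet, Antichain, v, buffer, debug } from 'd2ts'

const graph = new D2({ initialFrontier: v([0]) })
const input = graph.newInput<number>()

// debug() logs each message it sees; with buffer() in front of it,
// nothing is logged for a version until that version is complete.
input.pipe(buffer(), debug('buffered'))
graph.finalize()

input.sendData(v([1]), new MultiSet([[1, 1], [2, 1]]))
input.sendData(v([1]), new MultiSet([[3, 1]]))
graph.run() // version [1] is still open - no data emitted yet

input.sendFrontier(new Antichain([v([2])]))
graph.run() // frontier has passed [1] - all three rows arrive as one collection
```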

#### `concat(other: IStreamBuilder<T>)`

Concatenates two input streams - the output stream will contain the elements of both streams.
78 changes: 78 additions & 0 deletions packages/d2ts/src/operators/buffer.ts
@@ -0,0 +1,78 @@
import {
  IStreamBuilder,
  PipedOperator,
  DataMessage,
  MessageType,
} from '../types.js'
import { DifferenceStreamWriter, UnaryOperator } from '../graph.js'
import { StreamBuilder } from '../d2.js'
import { MultiSet } from '../multiset.js'
import { Antichain, Version } from '../order.js'
import { DefaultMap } from '../utils.js'

/**
 * Operator that buffers collections at each version
 * Ensures that completed versions are sent to the output as a whole, and in order
 */
export class BufferOperator<T> extends UnaryOperator<T> {
  #collections = new DefaultMap<Version, MultiSet<T>>(() => new MultiSet<T>())

  run(): void {
    for (const message of this.inputMessages()) {
      if (message.type === MessageType.DATA) {
        const { version, collection } = message.data as DataMessage<T>
        this.#collections.update(version, (existing) => {
          existing.extend(collection)
          return existing
        })
      } else if (message.type === MessageType.FRONTIER) {
        const frontier = message.data as Antichain
        if (!this.inputFrontier().lessEqual(frontier)) {
          throw new Error('Invalid frontier update')
        }
        this.setInputFrontier(frontier)
      }
    }

    // Find versions that are complete (not covered by input frontier)
    const finishedVersions = Array.from(this.#collections.entries()).filter(
      ([version]) => !this.inputFrontier().lessEqualVersion(version),
    )

    // Process and remove finished versions
    for (const [version, collection] of finishedVersions) {
      this.#collections.delete(version)
      this.output.sendData(version, collection)
    }

    if (!this.outputFrontier.lessEqual(this.inputFrontier())) {
      throw new Error('Invalid frontier state')
    }
    if (this.outputFrontier.lessThan(this.inputFrontier())) {
      this.outputFrontier = this.inputFrontier()
      this.output.sendFrontier(this.outputFrontier)
    }
  }
}

/**
 * Buffers the elements in the stream
 * Ensures that completed versions are sent to the output as a whole, and in order
 */
export function buffer<T>(): PipedOperator<T, T> {
  return (stream: IStreamBuilder<T>): IStreamBuilder<T> => {
    const output = new StreamBuilder<T>(
      stream.graph,
      new DifferenceStreamWriter<T>(),
    )
    const operator = new BufferOperator<T>(
      stream.graph.getNextOperatorId(),
      stream.connectReader(),
      output.writer,
      stream.graph.frontier(),
    )
    stream.graph.addOperator(operator)
    stream.graph.addStream(output.connectReader())
    return output
  }
}
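
A note on the completeness check above: a buffered version is flushed once `inputFrontier().lessEqualVersion(version)` becomes false, i.e. no element of the frontier antichain is less than or equal to it. A minimal illustrative sketch (assuming the `Antichain` class and a `v()` version helper are exported from `d2ts`):

```typescript
import { Antichain, v } from 'd2ts'

const frontier = new Antichain([v([2, 0])])

frontier.lessEqualVersion(v([1, 0])) // false -> [1, 0] is complete, flush its buffered collection
frontier.lessEqualVersion(v([2, 5])) // true  -> [2, 5] is still open, keep buffering
```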
1 change: 1 addition & 0 deletions packages/d2ts/src/operators/index.ts
@@ -6,6 +6,7 @@ export * from './concat.js'
export * from './debug.js'
export * from './output.js'
export * from './consolidate.js'
export * from './buffer.js'
export * from './join.js'
export * from './reduce.js'
export * from './count.js'
164 changes: 164 additions & 0 deletions packages/d2ts/src/sqlite/operators/buffer.ts
@@ -0,0 +1,164 @@
import { StreamBuilder } from '../../d2.js'
import {
  DataMessage,
  MessageType,
  IStreamBuilder,
  PipedOperator,
} from '../../types.js'
import { MultiSet } from '../../multiset.js'
import {
  DifferenceStreamReader,
  DifferenceStreamWriter,
  UnaryOperator,
} from '../../graph.js'
import { Version, Antichain } from '../../order.js'
import { SQLiteDb, SQLiteStatement } from '../database.js'

interface CollectionRow {
  version: string
  collection: string
}

interface CollectionParams {
  version: string
  collection: string
}

/**
 * Operator that buffers collections at each version, persisting state to SQLite
 * Ensures that completed versions are sent to the output as a whole, and in order
 */
export class BufferOperatorSQLite<T> extends UnaryOperator<T> {
  #preparedStatements: {
    insert: SQLiteStatement<CollectionParams>
    update: SQLiteStatement<CollectionParams>
    get: SQLiteStatement<[string], CollectionRow>
    delete: SQLiteStatement<[string]>
    getAllVersions: SQLiteStatement<[], CollectionRow>
  }

  constructor(
    id: number,
    inputA: DifferenceStreamReader<T>,
    output: DifferenceStreamWriter<T>,
    initialFrontier: Antichain,
    db: SQLiteDb,
  ) {
    super(id, inputA, output, initialFrontier)

    // Initialize database
    db.exec(`
      CREATE TABLE IF NOT EXISTS buffer_collections_${this.id} (
        version TEXT PRIMARY KEY,
        collection TEXT NOT NULL
      )
    `)
    db.exec(`
      CREATE INDEX IF NOT EXISTS buffer_collections_${this.id}_version
      ON buffer_collections_${this.id}(version);
    `)

    // Prepare statements
    this.#preparedStatements = {
      insert: db.prepare(
        `INSERT INTO buffer_collections_${this.id} (version, collection) VALUES (@version, @collection)`,
      ),
      update: db.prepare(
        `UPDATE buffer_collections_${this.id} SET collection = @collection WHERE version = @version`,
      ),
      get: db.prepare(
        `SELECT collection FROM buffer_collections_${this.id} WHERE version = ?`,
      ),
      delete: db.prepare(
        `DELETE FROM buffer_collections_${this.id} WHERE version = ?`,
      ),
      getAllVersions: db.prepare(
        `SELECT version, collection FROM buffer_collections_${this.id}`,
      ),
    }
  }

  run(): void {
    for (const message of this.inputMessages()) {
      if (message.type === MessageType.DATA) {
        const { version, collection } = message.data as DataMessage<T>

        // Get existing collection or create new one
        const existingData = this.#preparedStatements.get.get(version.toJSON())
        const existingCollection = existingData
          ? MultiSet.fromJSON(existingData.collection)
          : new MultiSet<T>()

        // Merge collections
        existingCollection.extend(collection)

        // Store updated collection
        if (existingData) {
          this.#preparedStatements.update.run({
            version: version.toJSON(),
            collection: existingCollection.toJSON(),
          })
        } else {
          this.#preparedStatements.insert.run({
            version: version.toJSON(),
            collection: existingCollection.toJSON(),
          })
        }
      } else if (message.type === MessageType.FRONTIER) {
        const frontier = message.data as Antichain
        if (!this.inputFrontier().lessEqual(frontier)) {
          throw new Error('Invalid frontier update')
        }
        this.setInputFrontier(frontier)
      }
    }

    // Find versions that are complete (not covered by input frontier)
    const allVersions = this.#preparedStatements.getAllVersions.all()
    const finishedVersions = allVersions
      .map((row) => ({
        version: Version.fromJSON(row.version),
        collection: MultiSet.fromJSON<T>(row.collection),
      }))
      .filter(({ version }) => !this.inputFrontier().lessEqualVersion(version))

    // Process and remove finished versions
    for (const { version, collection } of finishedVersions) {
      this.#preparedStatements.delete.run(version.toJSON())
      this.output.sendData(version, collection)
    }

    if (!this.outputFrontier.lessEqual(this.inputFrontier())) {
      throw new Error('Invalid frontier state')
    }
    if (this.outputFrontier.lessThan(this.inputFrontier())) {
      this.outputFrontier = this.inputFrontier()
      this.output.sendFrontier(this.outputFrontier)
    }
  }
}

/**
 * Buffers the elements in the stream
 * Ensures that completed versions are sent to the output as a whole, and in order
 * Persists state to SQLite
 * @param db - The SQLite database
 */
export function buffer<T>(db: SQLiteDb): PipedOperator<T, T> {
  return (stream: IStreamBuilder<T>): IStreamBuilder<T> => {
    const output = new StreamBuilder<T>(
      stream.graph,
      new DifferenceStreamWriter<T>(),
    )
    const operator = new BufferOperatorSQLite<T>(
      stream.graph.getNextOperatorId(),
      stream.connectReader(),
      output.writer,
      stream.graph.frontier(),
      db,
    )
    stream.graph.addOperator(operator)
    stream.graph.addStream(output.connectReader())
    return output
  }
}
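
Finally, an illustrative sketch of using the SQLite-backed variant so that buffered-but-incomplete versions survive a process restart. The wrapper name `BetterSQLite3Wrapper` and the import paths are assumptions here — use whatever implements the `SQLiteDb` interface in your setup:

```typescript
import Database from 'better-sqlite3'
import { D2, MultiSet, Antichain, v } from 'd2ts'
// Assumed entry point for the SQLite-backed operators; buffer(db) is the
// BufferOperatorSQLite defined above.
import { BetterSQLite3Wrapper, buffer } from 'd2ts/sqlite'

const db = new BetterSQLite3Wrapper(new Database('./pipeline-state.db'))

const graph = new D2({ initialFrontier: v([0]) })
const input = graph.newInput<string>()
input.pipe(buffer(db)) // state is persisted in buffer_collections_<operatorId>
graph.finalize()

input.sendData(v([1]), new MultiSet([['hello', 1]]))
graph.run() // version [1] is still open; its rows now live in SQLite, not memory

// After a restart against the same database file (and the same graph shape),
// advancing the frontier past [1] flushes the persisted collection downstream.
input.sendFrontier(new Antichain([v([2])]))
graph.run()
```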