Skip to content

Commit b255fba

Browse files
authored
feat: new buffer operator (#13)
* Add a `buffer` operator * update readme
1 parent 74e9c99 commit b255fba

File tree

6 files changed

+525
-0
lines changed

6 files changed

+525
-0
lines changed

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ A D2TS pipe is also fully type safe, inferring the types at each step of the pip
1212

1313
- **Incremental Processing**: Efficiently process changes to input data without recomputing everything
1414
- **Rich Operators**: Supports common operations with a pipeline API:
15+
- `buffer()`: Buffers versions and emits them when they are complete
1516
- `concat()`: Concatenate two streams
1617
- `consolidate()`: Consolidates the elements in the stream at each version
1718
- `count()`: Count elements by key
@@ -142,6 +143,14 @@ const multiSet = new MultiSet<[string, Comment]>([
142143

143144
### Operators
144145

146+
#### `buffer()`
147+
148+
Buffers the elements of the stream, emitting each version as a whole once it is complete.
149+
150+
```typescript
151+
const output = input.pipe(buffer())
152+
```
153+
145154
#### `concat(other: IStreamBuilder<T>)`
146155

147156
Concatenates two input streams - the output stream will contain the elements of both streams.
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import {
2+
IStreamBuilder,
3+
PipedOperator,
4+
DataMessage,
5+
MessageType,
6+
} from '../types.js'
7+
import { DifferenceStreamWriter, UnaryOperator } from '../graph.js'
8+
import { StreamBuilder } from '../d2.js'
9+
import { MultiSet } from '../multiset.js'
10+
import { Antichain, Version } from '../order.js'
11+
import { DefaultMap } from '../utils.js'
12+
13+
/**
14+
* Operator that buffers collections at each version
15+
* Ensured that completed versions are sent to the output as a whole, and in order
16+
*/
17+
export class BufferOperator<T> extends UnaryOperator<T> {
18+
#collections = new DefaultMap<Version, MultiSet<T>>(() => new MultiSet<T>())
19+
20+
run(): void {
21+
for (const message of this.inputMessages()) {
22+
if (message.type === MessageType.DATA) {
23+
const { version, collection } = message.data as DataMessage<T>
24+
this.#collections.update(version, (existing) => {
25+
existing.extend(collection)
26+
return existing
27+
})
28+
} else if (message.type === MessageType.FRONTIER) {
29+
const frontier = message.data as Antichain
30+
if (!this.inputFrontier().lessEqual(frontier)) {
31+
throw new Error('Invalid frontier update')
32+
}
33+
this.setInputFrontier(frontier)
34+
}
35+
}
36+
37+
// Find versions that are complete (not covered by input frontier)
38+
const finishedVersions = Array.from(this.#collections.entries()).filter(
39+
([version]) => !this.inputFrontier().lessEqualVersion(version),
40+
)
41+
42+
// Process and remove finished versions
43+
for (const [version, collection] of finishedVersions) {
44+
this.#collections.delete(version)
45+
this.output.sendData(version, collection)
46+
}
47+
48+
if (!this.outputFrontier.lessEqual(this.inputFrontier())) {
49+
throw new Error('Invalid frontier state')
50+
}
51+
if (this.outputFrontier.lessThan(this.inputFrontier())) {
52+
this.outputFrontier = this.inputFrontier()
53+
this.output.sendFrontier(this.outputFrontier)
54+
}
55+
}
56+
}
57+
58+
/**
59+
* Buffers the elements in the stream
60+
* Ensured that completed versions are sent to the output as a whole, and in order
61+
*/
62+
export function buffer<T>(): PipedOperator<T, T> {
63+
return (stream: IStreamBuilder<T>): IStreamBuilder<T> => {
64+
const output = new StreamBuilder<T>(
65+
stream.graph,
66+
new DifferenceStreamWriter<T>(),
67+
)
68+
const operator = new BufferOperator<T>(
69+
stream.graph.getNextOperatorId(),
70+
stream.connectReader(),
71+
output.writer,
72+
stream.graph.frontier(),
73+
)
74+
stream.graph.addOperator(operator)
75+
stream.graph.addStream(output.connectReader())
76+
return output
77+
}
78+
}

packages/d2ts/src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ export * from './concat.js'
66
export * from './debug.js'
77
export * from './output.js'
88
export * from './consolidate.js'
9+
export * from './buffer.js'
910
export * from './join.js'
1011
export * from './reduce.js'
1112
export * from './count.js'
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
import { StreamBuilder } from '../../d2.js'
2+
import {
3+
DataMessage,
4+
MessageType,
5+
IStreamBuilder,
6+
PipedOperator,
7+
} from '../../types.js'
8+
import { MultiSet } from '../../multiset.js'
9+
import {
10+
DifferenceStreamReader,
11+
DifferenceStreamWriter,
12+
UnaryOperator,
13+
} from '../../graph.js'
14+
import { Version, Antichain } from '../../order.js'
15+
import { SQLiteDb, SQLiteStatement } from '../database.js'
16+
17+
interface CollectionRow {
18+
version: string
19+
collection: string
20+
}
21+
22+
interface CollectionParams {
23+
version: string
24+
collection: string
25+
}
26+
27+
/**
28+
* Operator that buffers collections at each version, persisting state to SQLite
29+
* Ensures that completed versions are sent to the output as a whole, and in order
30+
*/
31+
export class BufferOperatorSQLite<T> extends UnaryOperator<T> {
32+
#preparedStatements: {
33+
insert: SQLiteStatement<CollectionParams>
34+
update: SQLiteStatement<CollectionParams>
35+
get: SQLiteStatement<[string], CollectionRow>
36+
delete: SQLiteStatement<[string]>
37+
getAllVersions: SQLiteStatement<[], CollectionRow>
38+
}
39+
40+
constructor(
41+
id: number,
42+
inputA: DifferenceStreamReader<T>,
43+
output: DifferenceStreamWriter<T>,
44+
initialFrontier: Antichain,
45+
db: SQLiteDb,
46+
) {
47+
super(id, inputA, output, initialFrontier)
48+
49+
// Initialize database
50+
db.exec(`
51+
CREATE TABLE IF NOT EXISTS buffer_collections_${this.id} (
52+
version TEXT PRIMARY KEY,
53+
collection TEXT NOT NULL
54+
)
55+
`)
56+
db.exec(`
57+
CREATE INDEX IF NOT EXISTS buffer_collections_${this.id}_version
58+
ON buffer_collections_${this.id}(version);
59+
`)
60+
61+
// Prepare statements
62+
this.#preparedStatements = {
63+
insert: db.prepare(
64+
`INSERT INTO buffer_collections_${this.id} (version, collection) VALUES (@version, @collection)`,
65+
),
66+
update: db.prepare(
67+
`UPDATE buffer_collections_${this.id} SET collection = @collection WHERE version = @version`,
68+
),
69+
get: db.prepare(
70+
`SELECT collection FROM buffer_collections_${this.id} WHERE version = ?`,
71+
),
72+
delete: db.prepare(
73+
`DELETE FROM buffer_collections_${this.id} WHERE version = ?`,
74+
),
75+
getAllVersions: db.prepare(
76+
`SELECT version, collection FROM buffer_collections_${this.id}`,
77+
),
78+
}
79+
}
80+
81+
run(): void {
82+
for (const message of this.inputMessages()) {
83+
if (message.type === MessageType.DATA) {
84+
const { version, collection } = message.data as DataMessage<T>
85+
86+
// Get existing collection or create new one
87+
const existingData = this.#preparedStatements.get.get(version.toJSON())
88+
const existingCollection = existingData
89+
? MultiSet.fromJSON(existingData.collection)
90+
: new MultiSet<T>()
91+
92+
// Merge collections
93+
existingCollection.extend(collection)
94+
95+
// Store updated collection
96+
if (existingData) {
97+
this.#preparedStatements.update.run({
98+
version: version.toJSON(),
99+
collection: existingCollection.toJSON(),
100+
})
101+
} else {
102+
this.#preparedStatements.insert.run({
103+
version: version.toJSON(),
104+
collection: existingCollection.toJSON(),
105+
})
106+
}
107+
} else if (message.type === MessageType.FRONTIER) {
108+
const frontier = message.data as Antichain
109+
if (!this.inputFrontier().lessEqual(frontier)) {
110+
throw new Error('Invalid frontier update')
111+
}
112+
this.setInputFrontier(frontier)
113+
}
114+
}
115+
116+
// Find versions that are complete (not covered by input frontier)
117+
const allVersions = this.#preparedStatements.getAllVersions.all()
118+
const finishedVersions = allVersions
119+
.map((row) => ({
120+
version: Version.fromJSON(row.version),
121+
collection: MultiSet.fromJSON<T>(row.collection),
122+
}))
123+
.filter(({ version }) => !this.inputFrontier().lessEqualVersion(version))
124+
125+
// Process and remove finished versions
126+
for (const { version, collection } of finishedVersions) {
127+
this.#preparedStatements.delete.run(version.toJSON())
128+
this.output.sendData(version, collection)
129+
}
130+
131+
if (!this.outputFrontier.lessEqual(this.inputFrontier())) {
132+
throw new Error('Invalid frontier state')
133+
}
134+
if (this.outputFrontier.lessThan(this.inputFrontier())) {
135+
this.outputFrontier = this.inputFrontier()
136+
this.output.sendFrontier(this.outputFrontier)
137+
}
138+
}
139+
}
140+
141+
/**
142+
* Buffers the elements in the stream
143+
* Ensures that completed versions are sent to the output as a whole, and in order
144+
* Persists state to SQLite
145+
* @param db - The SQLite database
146+
*/
147+
export function buffer<T>(db: SQLiteDb): PipedOperator<T, T> {
148+
return (stream: IStreamBuilder<T>): IStreamBuilder<T> => {
149+
const output = new StreamBuilder<T>(
150+
stream.graph,
151+
new DifferenceStreamWriter<T>(),
152+
)
153+
const operator = new BufferOperatorSQLite<T>(
154+
stream.graph.getNextOperatorId(),
155+
stream.connectReader(),
156+
output.writer,
157+
stream.graph.frontier(),
158+
db,
159+
)
160+
stream.graph.addOperator(operator)
161+
stream.graph.addStream(output.connectReader())
162+
return output
163+
}
164+
}

0 commit comments

Comments
 (0)