Skip to content

Commit 4edb9b0

Browse files
authored
Make compaction track modified keys (#5)
* Make compaction track modified keys * SQLite index: only compact modified keys, and other improvements * Improve compaction in sqlite index * Remove unused prepared statements * Remove early return for debugging
1 parent 51bc937 commit 4edb9b0

File tree

5 files changed

+462
-322
lines changed

5 files changed

+462
-322
lines changed

packages/d2ts-benchmark/src/index.ts

Lines changed: 106 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -206,53 +206,53 @@ function run({
206206
},
207207
})
208208

209-
// // Add D2TS join with frontier benchmark
210-
// joinSuite.add({
211-
// name: 'D2TS Join with Frontier',
212-
// setup: () => {
213-
// const graph = new D2({ initialFrontier: v([0]) })
214-
// const usersStream = graph.newInput<[number, (typeof allUsers)[0]]>()
215-
// const postsStream = graph.newInput<[number, (typeof allPosts)[0]]>()
216-
217-
// const joined = usersStream.pipe(
218-
// join(postsStream),
219-
// map(([_key, [user, post]]) => ({
220-
// userName: user.name,
221-
// postTitle: post.title,
222-
// })),
223-
// output((_data) => {
224-
// // do nothing
225-
// }),
226-
// )
227-
228-
// graph.finalize()
229-
// return { graph, usersStream, postsStream, joined }
230-
// },
231-
// firstRun: (ctx) => {
232-
// ctx.usersStream.sendData(v([1]), initialUsersSet)
233-
// ctx.postsStream.sendData(v([1]), initialPostsSet)
234-
// ctx.usersStream.sendFrontier(v([2]))
235-
// ctx.postsStream.sendFrontier(v([2]))
236-
// ctx.graph.step()
237-
// },
238-
// incrementalRun: (ctx, i) => {
239-
// const user = incrementalUsers[i]
240-
// const post1 = incrementalPosts[i * 2]
241-
// const post2 = incrementalPosts[i * 2 + 1]
242-
243-
// ctx.usersStream.sendData(v([i + 2]), new MultiSet([[[user.id, user], 1]]))
244-
// ctx.postsStream.sendData(
245-
// v([i + 2]),
246-
// new MultiSet([
247-
// [[post1.userId, post1], 1],
248-
// [[post2.userId, post2], 1],
249-
// ]),
250-
// )
251-
// ctx.usersStream.sendFrontier(v([i + 3]))
252-
// ctx.postsStream.sendFrontier(v([i + 3]))
253-
// ctx.graph.step()
254-
// },
255-
// })
209+
// Add D2TS join with frontier benchmark
210+
joinSuite.add({
211+
name: 'D2TS Join with Frontier',
212+
setup: () => {
213+
const graph = new D2({ initialFrontier: v([0]) })
214+
const usersStream = graph.newInput<[number, (typeof allUsers)[0]]>()
215+
const postsStream = graph.newInput<[number, (typeof allPosts)[0]]>()
216+
217+
const joined = usersStream.pipe(
218+
join(postsStream),
219+
map(([_key, [user, post]]) => ({
220+
userName: user.name,
221+
postTitle: post.title,
222+
})),
223+
output((_data) => {
224+
// do nothing
225+
}),
226+
)
227+
228+
graph.finalize()
229+
return { graph, usersStream, postsStream, joined }
230+
},
231+
firstRun: (ctx) => {
232+
ctx.usersStream.sendData(v([1]), initialUsersSet)
233+
ctx.postsStream.sendData(v([1]), initialPostsSet)
234+
ctx.usersStream.sendFrontier(v([2]))
235+
ctx.postsStream.sendFrontier(v([2]))
236+
ctx.graph.step()
237+
},
238+
incrementalRun: (ctx, i) => {
239+
const user = incrementalUsers[i]
240+
const post1 = incrementalPosts[i * 2]
241+
const post2 = incrementalPosts[i * 2 + 1]
242+
243+
ctx.usersStream.sendData(v([i + 2]), new MultiSet([[[user.id, user], 1]]))
244+
ctx.postsStream.sendData(
245+
v([i + 2]),
246+
new MultiSet([
247+
[[post1.userId, post1], 1],
248+
[[post2.userId, post2], 1],
249+
]),
250+
)
251+
ctx.usersStream.sendFrontier(v([i + 3]))
252+
ctx.postsStream.sendFrontier(v([i + 3]))
253+
ctx.graph.step()
254+
},
255+
})
256256

257257
// Add SQLite join benchmark
258258
joinSuite.add({
@@ -312,65 +312,65 @@ function run({
312312
})
313313

314314
// Add SQLite join with frontier benchmark
315-
// joinSuite.add({
316-
// name: 'D2TS SQLite Join with Frontier',
317-
// setup: () => {
318-
// const sqlite = new Database(':memory:')
319-
// const db = new BetterSQLite3Wrapper(sqlite)
320-
321-
// // Improve the sqlite performance
322-
// db.exec(`PRAGMA journal_mode = WAL;`)
323-
// db.exec(`PRAGMA synchronous = OFF;`)
324-
// db.exec(`PRAGMA temp_store = MEMORY;`)
325-
// db.exec(`PRAGMA cache_size = -100000;`) // 100MB
326-
327-
// const graph = new D2({ initialFrontier: v([0]) })
328-
// const usersStream = graph.newInput<[number, (typeof allUsers)[0]]>()
329-
// const postsStream = graph.newInput<[number, (typeof allPosts)[0]]>()
330-
331-
// const joined = usersStream.pipe(
332-
// joinSql(postsStream, db),
333-
// map(([_key, [user, post]]) => ({
334-
// userName: user.name,
335-
// postTitle: post.title,
336-
// })),
337-
// output((_data) => {
338-
// // do nothing
339-
// }),
340-
// )
341-
342-
// graph.finalize()
343-
// return { graph, usersStream, postsStream, joined, db, sqlite }
344-
// },
345-
// firstRun: (ctx) => {
346-
// ctx.usersStream.sendData(v([1]), initialUsersSet)
347-
// ctx.postsStream.sendData(v([1]), initialPostsSet)
348-
// ctx.usersStream.sendFrontier(v([2]))
349-
// ctx.postsStream.sendFrontier(v([2]))
350-
// ctx.graph.step()
351-
// },
352-
// incrementalRun: (ctx, i) => {
353-
// const user = incrementalUsers[i]
354-
// const post1 = incrementalPosts[i * 2]
355-
// const post2 = incrementalPosts[i * 2 + 1]
356-
357-
// ctx.usersStream.sendData(v([i + 2]), new MultiSet([[[user.id, user], 1]]))
358-
// ctx.postsStream.sendData(
359-
// v([i + 2]),
360-
// new MultiSet([
361-
// [[post1.userId, post1], 1],
362-
// [[post2.userId, post2], 1],
363-
// ]),
364-
// )
365-
// ctx.usersStream.sendFrontier(v([i + 3]))
366-
// ctx.postsStream.sendFrontier(v([i + 3]))
367-
// ctx.graph.step()
368-
// },
369-
// teardown: (ctx) => {
370-
// ctx.db.close()
371-
// ctx.sqlite.close()
372-
// },
373-
// })
315+
joinSuite.add({
316+
name: 'D2TS SQLite Join with Frontier',
317+
setup: () => {
318+
const sqlite = new Database(':memory:')
319+
const db = new BetterSQLite3Wrapper(sqlite)
320+
321+
// Improve the sqlite performance
322+
db.exec(`PRAGMA journal_mode = WAL;`)
323+
db.exec(`PRAGMA synchronous = OFF;`)
324+
db.exec(`PRAGMA temp_store = MEMORY;`)
325+
db.exec(`PRAGMA cache_size = -100000;`) // 100MB
326+
327+
const graph = new D2({ initialFrontier: v([0]) })
328+
const usersStream = graph.newInput<[number, (typeof allUsers)[0]]>()
329+
const postsStream = graph.newInput<[number, (typeof allPosts)[0]]>()
330+
331+
const joined = usersStream.pipe(
332+
joinSql(postsStream, db),
333+
map(([_key, [user, post]]) => ({
334+
userName: user.name,
335+
postTitle: post.title,
336+
})),
337+
output((_data) => {
338+
// do nothing
339+
}),
340+
)
341+
342+
graph.finalize()
343+
return { graph, usersStream, postsStream, joined, db, sqlite }
344+
},
345+
firstRun: (ctx) => {
346+
ctx.usersStream.sendData(v([1]), initialUsersSet)
347+
ctx.postsStream.sendData(v([1]), initialPostsSet)
348+
ctx.usersStream.sendFrontier(v([2]))
349+
ctx.postsStream.sendFrontier(v([2]))
350+
ctx.graph.step()
351+
},
352+
incrementalRun: (ctx, i) => {
353+
const user = incrementalUsers[i]
354+
const post1 = incrementalPosts[i * 2]
355+
const post2 = incrementalPosts[i * 2 + 1]
356+
357+
ctx.usersStream.sendData(v([i + 2]), new MultiSet([[[user.id, user], 1]]))
358+
ctx.postsStream.sendData(
359+
v([i + 2]),
360+
new MultiSet([
361+
[[post1.userId, post1], 1],
362+
[[post2.userId, post2], 1],
363+
]),
364+
)
365+
ctx.usersStream.sendFrontier(v([i + 3]))
366+
ctx.postsStream.sendFrontier(v([i + 3]))
367+
ctx.graph.step()
368+
},
369+
teardown: (ctx) => {
370+
ctx.db.close()
371+
ctx.sqlite.close()
372+
},
373+
})
374374

375375
joinSuite.run()
376376
joinSuite.printResults()

packages/d2ts/src/sqlite/operators/join.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,13 @@ export class JoinOperatorSQLite<K, V1, V2> extends BinaryOperator<
4747
for (const message of this.inputAMessages()) {
4848
if (message.type === MessageType.DATA) {
4949
const { version, collection } = message.data as DataMessage<[K, V1]>
50+
// Batch the inserts
51+
const items: [K, Version, [V1, number]][] = []
5052
for (const [item, multiplicity] of collection.getInner()) {
5153
const [key, value] = item
52-
deltaA.addValue(key, version, [value, multiplicity])
54+
items.push([key, version, [value, multiplicity]])
5355
}
56+
deltaA.addValues(items)
5457
} else if (message.type === MessageType.FRONTIER) {
5558
const frontier = message.data as Antichain
5659
if (!this.inputAFrontier().lessEqual(frontier)) {
@@ -64,10 +67,13 @@ export class JoinOperatorSQLite<K, V1, V2> extends BinaryOperator<
6467
for (const message of this.inputBMessages()) {
6568
if (message.type === MessageType.DATA) {
6669
const { version, collection } = message.data as DataMessage<[K, V2]>
70+
// Batch the inserts
71+
const items: [K, Version, [V2, number]][] = []
6772
for (const [item, multiplicity] of collection.getInner()) {
6873
const [key, value] = item
69-
deltaB.addValue(key, version, [value, multiplicity])
74+
items.push([key, version, [value, multiplicity]])
7075
}
76+
deltaB.addValues(items)
7177
} else if (message.type === MessageType.FRONTIER) {
7278
const frontier = message.data as Antichain
7379
if (!this.inputBFrontier().lessEqual(frontier)) {

0 commit comments

Comments
 (0)