Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 106 additions & 106 deletions packages/d2ts-benchmark/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,53 +206,53 @@ function run({
},
})

// // Add D2TS join with frontier benchmark
// joinSuite.add({
// name: 'D2TS Join with Frontier',
// setup: () => {
// const graph = new D2({ initialFrontier: v([0]) })
// const usersStream = graph.newInput<[number, (typeof allUsers)[0]]>()
// const postsStream = graph.newInput<[number, (typeof allPosts)[0]]>()

// const joined = usersStream.pipe(
// join(postsStream),
// map(([_key, [user, post]]) => ({
// userName: user.name,
// postTitle: post.title,
// })),
// output((_data) => {
// // do nothing
// }),
// )

// graph.finalize()
// return { graph, usersStream, postsStream, joined }
// },
// firstRun: (ctx) => {
// ctx.usersStream.sendData(v([1]), initialUsersSet)
// ctx.postsStream.sendData(v([1]), initialPostsSet)
// ctx.usersStream.sendFrontier(v([2]))
// ctx.postsStream.sendFrontier(v([2]))
// ctx.graph.step()
// },
// incrementalRun: (ctx, i) => {
// const user = incrementalUsers[i]
// const post1 = incrementalPosts[i * 2]
// const post2 = incrementalPosts[i * 2 + 1]

// ctx.usersStream.sendData(v([i + 2]), new MultiSet([[[user.id, user], 1]]))
// ctx.postsStream.sendData(
// v([i + 2]),
// new MultiSet([
// [[post1.userId, post1], 1],
// [[post2.userId, post2], 1],
// ]),
// )
// ctx.usersStream.sendFrontier(v([i + 3]))
// ctx.postsStream.sendFrontier(v([i + 3]))
// ctx.graph.step()
// },
// })
// Add D2TS join with frontier benchmark: measures the in-memory join
// operator driven by explicit version/frontier advancement.
joinSuite.add({
  name: 'D2TS Join with Frontier',
  setup: () => {
    // Build the dataflow graph once; runs below reuse it.
    const graph = new D2({ initialFrontier: v([0]) })
    // Keyed input streams: [userId, user] and [userId, post].
    const usersStream = graph.newInput<[number, (typeof allUsers)[0]]>()
    const postsStream = graph.newInput<[number, (typeof allPosts)[0]]>()

    const joined = usersStream.pipe(
      join(postsStream),
      // join yields [key, [user, post]]; project down to a small record.
      map(([_key, [user, post]]) => ({
        userName: user.name,
        postTitle: post.title,
      })),
      output((_data) => {
        // do nothing — we only measure join/propagation cost here,
        // not output handling.
      }),
    )

    graph.finalize()
    return { graph, usersStream, postsStream, joined }
  },
  firstRun: (ctx) => {
    // Load the initial datasets at version 1, then advance both input
    // frontiers past it so results for version 1 can be produced.
    ctx.usersStream.sendData(v([1]), initialUsersSet)
    ctx.postsStream.sendData(v([1]), initialPostsSet)
    ctx.usersStream.sendFrontier(v([2]))
    ctx.postsStream.sendFrontier(v([2]))
    ctx.graph.step()
  },
  incrementalRun: (ctx, i) => {
    // Each incremental step adds one user and two posts; the indexing
    // assumes incrementalPosts is laid out as two posts per user.
    const user = incrementalUsers[i]
    const post1 = incrementalPosts[i * 2]
    const post2 = incrementalPosts[i * 2 + 1]

    // Versions continue where firstRun left off: step i sends data at
    // version i+2 and advances the frontier to i+3 to release it.
    ctx.usersStream.sendData(v([i + 2]), new MultiSet([[[user.id, user], 1]]))
    ctx.postsStream.sendData(
      v([i + 2]),
      new MultiSet([
        [[post1.userId, post1], 1],
        [[post2.userId, post2], 1],
      ]),
    )
    ctx.usersStream.sendFrontier(v([i + 3]))
    ctx.postsStream.sendFrontier(v([i + 3]))
    ctx.graph.step()
  },
})

// Add SQLite join benchmark
joinSuite.add({
Expand Down Expand Up @@ -312,65 +312,65 @@ function run({
})

// Add SQLite join with frontier benchmark
// joinSuite.add({
// name: 'D2TS SQLite Join with Frontier',
// setup: () => {
// const sqlite = new Database(':memory:')
// const db = new BetterSQLite3Wrapper(sqlite)

// // Improve the sqlite performance
// db.exec(`PRAGMA journal_mode = WAL;`)
// db.exec(`PRAGMA synchronous = OFF;`)
// db.exec(`PRAGMA temp_store = MEMORY;`)
// db.exec(`PRAGMA cache_size = -100000;`) // 100MB

// const graph = new D2({ initialFrontier: v([0]) })
// const usersStream = graph.newInput<[number, (typeof allUsers)[0]]>()
// const postsStream = graph.newInput<[number, (typeof allPosts)[0]]>()

// const joined = usersStream.pipe(
// joinSql(postsStream, db),
// map(([_key, [user, post]]) => ({
// userName: user.name,
// postTitle: post.title,
// })),
// output((_data) => {
// // do nothing
// }),
// )

// graph.finalize()
// return { graph, usersStream, postsStream, joined, db, sqlite }
// },
// firstRun: (ctx) => {
// ctx.usersStream.sendData(v([1]), initialUsersSet)
// ctx.postsStream.sendData(v([1]), initialPostsSet)
// ctx.usersStream.sendFrontier(v([2]))
// ctx.postsStream.sendFrontier(v([2]))
// ctx.graph.step()
// },
// incrementalRun: (ctx, i) => {
// const user = incrementalUsers[i]
// const post1 = incrementalPosts[i * 2]
// const post2 = incrementalPosts[i * 2 + 1]

// ctx.usersStream.sendData(v([i + 2]), new MultiSet([[[user.id, user], 1]]))
// ctx.postsStream.sendData(
// v([i + 2]),
// new MultiSet([
// [[post1.userId, post1], 1],
// [[post2.userId, post2], 1],
// ]),
// )
// ctx.usersStream.sendFrontier(v([i + 3]))
// ctx.postsStream.sendFrontier(v([i + 3]))
// ctx.graph.step()
// },
// teardown: (ctx) => {
// ctx.db.close()
// ctx.sqlite.close()
// },
// })
joinSuite.add({
  name: 'D2TS SQLite Join with Frontier',
  setup: () => {
    // SQLite-backed variant of the frontier join benchmark: join state
    // lives in an in-memory SQLite database instead of JS structures.
    const sqlite = new Database(':memory:')
    const db = new BetterSQLite3Wrapper(sqlite)

    // Improve the sqlite performance
    // NOTE(review): journal_mode = WAL has no effect on ':memory:'
    // databases — confirm whether it is intentional or leftover from a
    // file-backed configuration.
    db.exec(`PRAGMA journal_mode = WAL;`)
    db.exec(`PRAGMA synchronous = OFF;`)
    db.exec(`PRAGMA temp_store = MEMORY;`)
    db.exec(`PRAGMA cache_size = -100000;`) // 100MB

    const graph = new D2({ initialFrontier: v([0]) })
    // Keyed input streams: [userId, user] and [userId, post].
    const usersStream = graph.newInput<[number, (typeof allUsers)[0]]>()
    const postsStream = graph.newInput<[number, (typeof allPosts)[0]]>()

    const joined = usersStream.pipe(
      // joinSql persists operator state via the provided db wrapper.
      joinSql(postsStream, db),
      map(([_key, [user, post]]) => ({
        userName: user.name,
        postTitle: post.title,
      })),
      output((_data) => {
        // do nothing — only the join/propagation cost is measured.
      }),
    )

    graph.finalize()
    // db and sqlite are returned so teardown can close them.
    return { graph, usersStream, postsStream, joined, db, sqlite }
  },
  firstRun: (ctx) => {
    // Load initial datasets at version 1, then advance both frontiers
    // past it so version-1 results can be produced.
    ctx.usersStream.sendData(v([1]), initialUsersSet)
    ctx.postsStream.sendData(v([1]), initialPostsSet)
    ctx.usersStream.sendFrontier(v([2]))
    ctx.postsStream.sendFrontier(v([2]))
    ctx.graph.step()
  },
  incrementalRun: (ctx, i) => {
    // One user plus their two posts per step; incrementalPosts is
    // assumed to hold two posts per user.
    const user = incrementalUsers[i]
    const post1 = incrementalPosts[i * 2]
    const post2 = incrementalPosts[i * 2 + 1]

    // Step i sends data at version i+2 and advances the frontier to
    // i+3, continuing the version sequence started in firstRun.
    ctx.usersStream.sendData(v([i + 2]), new MultiSet([[[user.id, user], 1]]))
    ctx.postsStream.sendData(
      v([i + 2]),
      new MultiSet([
        [[post1.userId, post1], 1],
        [[post2.userId, post2], 1],
      ]),
    )
    ctx.usersStream.sendFrontier(v([i + 3]))
    ctx.postsStream.sendFrontier(v([i + 3]))
    ctx.graph.step()
  },
  teardown: (ctx) => {
    // Close the wrapper first, then the underlying better-sqlite3 handle.
    ctx.db.close()
    ctx.sqlite.close()
  },
})

joinSuite.run()
joinSuite.printResults()
Expand Down
10 changes: 8 additions & 2 deletions packages/d2ts/src/sqlite/operators/join.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ export class JoinOperatorSQLite<K, V1, V2> extends BinaryOperator<
for (const message of this.inputAMessages()) {
if (message.type === MessageType.DATA) {
const { version, collection } = message.data as DataMessage<[K, V1]>
// Batch the inserts
const items: [K, Version, [V1, number]][] = []
for (const [item, multiplicity] of collection.getInner()) {
const [key, value] = item
deltaA.addValue(key, version, [value, multiplicity])
items.push([key, version, [value, multiplicity]])
}
deltaA.addValues(items)
} else if (message.type === MessageType.FRONTIER) {
const frontier = message.data as Antichain
if (!this.inputAFrontier().lessEqual(frontier)) {
Expand All @@ -64,10 +67,13 @@ export class JoinOperatorSQLite<K, V1, V2> extends BinaryOperator<
for (const message of this.inputBMessages()) {
if (message.type === MessageType.DATA) {
const { version, collection } = message.data as DataMessage<[K, V2]>
// Batch the inserts
const items: [K, Version, [V2, number]][] = []
for (const [item, multiplicity] of collection.getInner()) {
const [key, value] = item
deltaB.addValue(key, version, [value, multiplicity])
items.push([key, version, [value, multiplicity]])
}
deltaB.addValues(items)
} else if (message.type === MessageType.FRONTIER) {
const frontier = message.data as Antichain
if (!this.inputBFrontier().lessEqual(frontier)) {
Expand Down
Loading
Loading