Skip to content

Commit f7bcc41

Browse files
committed
Improve performance of join by reusing delta index and prepared statements
1 parent 211acc2 commit f7bcc41

File tree

2 files changed

+97
-55
lines changed

2 files changed

+97
-55
lines changed

packages/d2ts/src/sqlite/operators/join.ts

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ export class JoinOperatorSQLite<K, V1, V2> extends BinaryOperator<
2020
> {
2121
#indexA: SQLIndex<K, V1>
2222
#indexB: SQLIndex<K, V2>
23-
#db: SQLiteDb
23+
#deltaA: SQLIndex<K, V1>
24+
#deltaB: SQLIndex<K, V2>
2425

2526
constructor(
2627
id: number,
@@ -31,18 +32,15 @@ export class JoinOperatorSQLite<K, V1, V2> extends BinaryOperator<
3132
db: SQLiteDb,
3233
) {
3334
super(id, inputA, inputB, output, initialFrontier)
34-
this.#db = db
3535
this.#indexA = new SQLIndex<K, V1>(db, `join_a_${id}`)
3636
this.#indexB = new SQLIndex<K, V2>(db, `join_b_${id}`)
37+
this.#deltaA = new SQLIndex<K, V1>(db, `join_delta_a_${id}`)
38+
this.#deltaB = new SQLIndex<K, V2>(db, `join_delta_b_${id}`)
3739
}
3840

3941
run(): void {
40-
const db = this.#db
41-
const id = this.id
42-
43-
// Create temporary indexes for this iteration
44-
const deltaA = new SQLIndex<K, V1>(db, `join_delta_a_${id}`, true)
45-
const deltaB = new SQLIndex<K, V2>(db, `join_delta_b_${id}`, true)
42+
const deltaA = this.#deltaA
43+
const deltaB = this.#deltaB
4644

4745
try {
4846
// Process input A
@@ -120,8 +118,8 @@ export class JoinOperatorSQLite<K, V1, V2> extends BinaryOperator<
120118
}
121119
} finally {
122120
// Clean up temporary indexes
123-
deltaA.destroy()
124-
deltaB.destroy()
121+
deltaA.truncate()
122+
deltaB.truncate()
125123
}
126124
}
127125
}

packages/d2ts/src/sqlite/version-index.ts

Lines changed: 89 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,15 @@ export class SQLIndex<K, V> {
6161
>
6262
moveDataToNewVersion: SQLiteStatement<[number, string, number]>
6363
deleteOldVersionData: SQLiteStatement<[string, number]>
64+
truncate: SQLiteStatement
65+
truncateMeta: SQLiteStatement
66+
truncateVersions: SQLiteStatement
6467
}
6568

69+
// Cache for frequently used queries
70+
static #appendQueryCache = new Map<string, string>()
71+
static #joinQueryCache = new Map<string, string>()
72+
6673
constructor(db: SQLiteDb, name: string, isTemp = false) {
6774
this.#db = db
6875
this.#tableName = `index_${name}`
@@ -251,6 +258,18 @@ export class SQLIndex<K, V> {
251258
DELETE FROM ${this.#tableName}
252259
WHERE key = ? AND version_id = ?
253260
`),
261+
262+
truncate: this.#db.prepare(`
263+
DELETE FROM ${this.#tableName}
264+
`),
265+
266+
truncateMeta: this.#db.prepare(`
267+
DELETE FROM ${this.#tableName}_meta
268+
`),
269+
270+
truncateVersions: this.#db.prepare(`
271+
DELETE FROM ${this.#versionTableName}
272+
`),
254273
}
255274
}
256275

@@ -356,56 +375,67 @@ export class SQLIndex<K, V> {
356375
`
357376
this.#db.prepare(copyVersionsQuery).run()
358377

359-
// Now copy all data from the other index into this one
360-
const insertQuery = `
361-
INSERT OR REPLACE INTO ${this.#tableName} (key, version_id, value, multiplicity)
362-
SELECT
363-
o.key,
364-
v2.id as version_id,
365-
o.value,
366-
COALESCE(t.multiplicity, 0) + o.multiplicity as multiplicity
367-
FROM ${other.tableName} o
368-
JOIN ${other.#versionTableName} v1 ON v1.id = o.version_id
369-
JOIN ${this.#versionTableName} v2 ON v2.version = v1.version
370-
LEFT JOIN ${this.#tableName} t
371-
ON t.key = o.key
372-
AND t.version_id = v2.id
373-
AND t.value = o.value
374-
`
375-
this.#db.prepare(insertQuery).run()
378+
// Now use the cached query for the data copy
379+
const cacheKey = `${this.#tableName}_${other.tableName}`
380+
381+
let query = SQLIndex.#appendQueryCache.get(cacheKey)
382+
if (!query) {
383+
query = `
384+
INSERT OR REPLACE INTO ${this.#tableName} (key, version_id, value, multiplicity)
385+
SELECT
386+
o.key,
387+
v2.id as version_id,
388+
o.value,
389+
COALESCE(t.multiplicity, 0) + o.multiplicity as multiplicity
390+
FROM ${other.tableName} o
391+
JOIN ${other.#versionTableName} v1 ON v1.id = o.version_id
392+
JOIN ${this.#versionTableName} v2 ON v2.version = v1.version
393+
LEFT JOIN ${this.#tableName} t
394+
ON t.key = o.key
395+
AND t.version_id = v2.id
396+
AND t.value = o.value
397+
`
398+
SQLIndex.#appendQueryCache.set(cacheKey, query)
399+
}
400+
401+
this.#db.prepare(query).run()
376402
}
377403

378404
join<V2>(other: SQLIndex<K, V2>): [Version, MultiSet<[K, [V, V2]]>][] {
379-
// Create the join query dynamically with the actual table names
380-
const joinQuery = `
381-
SELECT
382-
a.key,
383-
(
384-
WITH RECURSIVE numbers(i) AS (
385-
SELECT 0
386-
UNION ALL
387-
SELECT i + 1 FROM numbers
388-
WHERE i < json_array_length(va.version) - 1
389-
)
390-
SELECT json_group_array(
391-
MAX(
392-
json_extract(va.version, '$[' || i || ']'),
393-
json_extract(vb.version, '$[' || i || ']')
405+
const cacheKey = `${this.#tableName}_${other.tableName}`
406+
407+
let query = SQLIndex.#joinQueryCache.get(cacheKey)
408+
if (!query) {
409+
query = `
410+
SELECT
411+
a.key,
412+
(
413+
WITH RECURSIVE numbers(i) AS (
414+
SELECT 0
415+
UNION ALL
416+
SELECT i + 1 FROM numbers
417+
WHERE i < json_array_length(va.version) - 1
394418
)
395-
)
396-
FROM numbers
397-
) as version,
398-
json_array(json(a.value), json(b.value)) as joined_value,
399-
a.multiplicity * b.multiplicity as multiplicity
400-
FROM ${this.#tableName} a
401-
JOIN ${this.#versionTableName} va ON va.id = a.version_id
402-
JOIN ${other.tableName} b ON a.key = b.key
403-
JOIN ${other.#versionTableName} vb ON vb.id = b.version_id
404-
GROUP BY a.key, a.value, b.value
405-
`
419+
SELECT json_group_array(
420+
MAX(
421+
json_extract(va.version, '$[' || i || ']'),
422+
json_extract(vb.version, '$[' || i || ']')
423+
)
424+
)
425+
FROM numbers
426+
) as version,
427+
json_array(json(a.value), json(b.value)) as joined_value,
428+
a.multiplicity * b.multiplicity as multiplicity
429+
FROM ${this.#tableName} a
430+
JOIN ${this.#versionTableName} va ON va.id = a.version_id
431+
JOIN ${other.tableName} b ON a.key = b.key
432+
JOIN ${other.#versionTableName} vb ON vb.id = b.version_id
433+
GROUP BY a.key, a.value, b.value
434+
`
435+
SQLIndex.#joinQueryCache.set(cacheKey, query)
436+
}
406437

407-
// Execute the query directly
408-
const results = this.#db.prepare(joinQuery).all() as JoinResult[]
438+
const results = this.#db.prepare(query).all() as JoinResult[]
409439

410440
const versionMap = new Map<string, MultiSet<[K, [V, V2]]>>()
411441

@@ -431,6 +461,9 @@ export class SQLIndex<K, V> {
431461
this.#preparedStatements.deleteMeta.run()
432462
this.#preparedStatements.deleteAll.run()
433463
this.#preparedStatements.deleteAllVersions.run()
464+
// Clear the query caches
465+
SQLIndex.#appendQueryCache.clear()
466+
SQLIndex.#joinQueryCache.clear()
434467
}
435468

436469
compact(compactionFrontier: Antichain, keys: K[] = []): void {
@@ -536,4 +569,15 @@ export class SQLIndex<K, V> {
536569
multiplicity: number
537570
}[]
538571
}
572+
573+
static clearStatementCaches(): void {
574+
SQLIndex.#appendQueryCache.clear()
575+
SQLIndex.#joinQueryCache.clear()
576+
}
577+
578+
truncate(): void {
579+
this.#preparedStatements.truncate.run()
580+
this.#preparedStatements.truncateMeta.run()
581+
this.#preparedStatements.truncateVersions.run()
582+
}
539583
}

0 commit comments

Comments
 (0)