diff --git a/packages/d2ts-benchmark/src/index.ts b/packages/d2ts-benchmark/src/index.ts index 7c5f281..e9f63eb 100644 --- a/packages/d2ts-benchmark/src/index.ts +++ b/packages/d2ts-benchmark/src/index.ts @@ -206,53 +206,53 @@ function run({ }, }) - // // Add D2TS join with frontier benchmark - // joinSuite.add({ - // name: 'D2TS Join with Frontier', - // setup: () => { - // const graph = new D2({ initialFrontier: v([0]) }) - // const usersStream = graph.newInput<[number, (typeof allUsers)[0]]>() - // const postsStream = graph.newInput<[number, (typeof allPosts)[0]]>() - - // const joined = usersStream.pipe( - // join(postsStream), - // map(([_key, [user, post]]) => ({ - // userName: user.name, - // postTitle: post.title, - // })), - // output((_data) => { - // // do nothing - // }), - // ) - - // graph.finalize() - // return { graph, usersStream, postsStream, joined } - // }, - // firstRun: (ctx) => { - // ctx.usersStream.sendData(v([1]), initialUsersSet) - // ctx.postsStream.sendData(v([1]), initialPostsSet) - // ctx.usersStream.sendFrontier(v([2])) - // ctx.postsStream.sendFrontier(v([2])) - // ctx.graph.step() - // }, - // incrementalRun: (ctx, i) => { - // const user = incrementalUsers[i] - // const post1 = incrementalPosts[i * 2] - // const post2 = incrementalPosts[i * 2 + 1] - - // ctx.usersStream.sendData(v([i + 2]), new MultiSet([[[user.id, user], 1]])) - // ctx.postsStream.sendData( - // v([i + 2]), - // new MultiSet([ - // [[post1.userId, post1], 1], - // [[post2.userId, post2], 1], - // ]), - // ) - // ctx.usersStream.sendFrontier(v([i + 3])) - // ctx.postsStream.sendFrontier(v([i + 3])) - // ctx.graph.step() - // }, - // }) + // Add D2TS join with frontier benchmark + joinSuite.add({ + name: 'D2TS Join with Frontier', + setup: () => { + const graph = new D2({ initialFrontier: v([0]) }) + const usersStream = graph.newInput<[number, (typeof allUsers)[0]]>() + const postsStream = graph.newInput<[number, (typeof allPosts)[0]]>() + + const joined = usersStream.pipe( + join(postsStream), + map(([_key, [user, post]]) => ({ + userName: user.name, + postTitle: post.title, + })), + output((_data) => { + // do nothing + }), + ) + + graph.finalize() + return { graph, usersStream, postsStream, joined } + }, + firstRun: (ctx) => { + ctx.usersStream.sendData(v([1]), initialUsersSet) + ctx.postsStream.sendData(v([1]), initialPostsSet) + ctx.usersStream.sendFrontier(v([2])) + ctx.postsStream.sendFrontier(v([2])) + ctx.graph.step() + }, + incrementalRun: (ctx, i) => { + const user = incrementalUsers[i] + const post1 = incrementalPosts[i * 2] + const post2 = incrementalPosts[i * 2 + 1] + + ctx.usersStream.sendData(v([i + 2]), new MultiSet([[[user.id, user], 1]])) + ctx.postsStream.sendData( + v([i + 2]), + new MultiSet([ + [[post1.userId, post1], 1], + [[post2.userId, post2], 1], + ]), + ) + ctx.usersStream.sendFrontier(v([i + 3])) + ctx.postsStream.sendFrontier(v([i + 3])) + ctx.graph.step() + }, + }) // Add SQLite join benchmark joinSuite.add({ @@ -312,65 +312,65 @@ function run({ }) // Add SQLite join with frontier benchmark - // joinSuite.add({ - // name: 'D2TS SQLite Join with Frontier', - // setup: () => { - // const sqlite = new Database(':memory:') - // const db = new BetterSQLite3Wrapper(sqlite) - - // // Improve the sqlite performance - // db.exec(`PRAGMA journal_mode = WAL;`) - // db.exec(`PRAGMA synchronous = OFF;`) - // db.exec(`PRAGMA temp_store = MEMORY;`) - // db.exec(`PRAGMA cache_size = -100000;`) // 100MB - - // const graph = new D2({ initialFrontier: v([0]) }) - // const usersStream = graph.newInput<[number, (typeof allUsers)[0]]>() - // const postsStream = graph.newInput<[number, (typeof allPosts)[0]]>() - - // const joined = usersStream.pipe( - // joinSql(postsStream, db), - // map(([_key, [user, post]]) => ({ - // userName: user.name, - // postTitle: post.title, - // })), - // output((_data) => { - // // do nothing - // }), - // ) - - // graph.finalize() - // return { graph, usersStream, postsStream, joined, db, sqlite } - // }, - // firstRun: (ctx) => { - // ctx.usersStream.sendData(v([1]), initialUsersSet) - // ctx.postsStream.sendData(v([1]), initialPostsSet) - // ctx.usersStream.sendFrontier(v([2])) - // ctx.postsStream.sendFrontier(v([2])) - // ctx.graph.step() - // }, - // incrementalRun: (ctx, i) => { - // const user = incrementalUsers[i] - // const post1 = incrementalPosts[i * 2] - // const post2 = incrementalPosts[i * 2 + 1] - - // ctx.usersStream.sendData(v([i + 2]), new MultiSet([[[user.id, user], 1]])) - // ctx.postsStream.sendData( - // v([i + 2]), - // new MultiSet([ - // [[post1.userId, post1], 1], - // [[post2.userId, post2], 1], - // ]), - // ) - // ctx.usersStream.sendFrontier(v([i + 3])) - // ctx.postsStream.sendFrontier(v([i + 3])) - // ctx.graph.step() - // }, - // teardown: (ctx) => { - // ctx.db.close() - // ctx.sqlite.close() - // }, - // }) + joinSuite.add({ + name: 'D2TS SQLite Join with Frontier', + setup: () => { + const sqlite = new Database(':memory:') + const db = new BetterSQLite3Wrapper(sqlite) + + // Improve the sqlite performance + db.exec(`PRAGMA journal_mode = WAL;`) + db.exec(`PRAGMA synchronous = OFF;`) + db.exec(`PRAGMA temp_store = MEMORY;`) + db.exec(`PRAGMA cache_size = -100000;`) // 100MB + + const graph = new D2({ initialFrontier: v([0]) }) + const usersStream = graph.newInput<[number, (typeof allUsers)[0]]>() + const postsStream = graph.newInput<[number, (typeof allPosts)[0]]>() + + const joined = usersStream.pipe( + joinSql(postsStream, db), + map(([_key, [user, post]]) => ({ + userName: user.name, + postTitle: post.title, + })), + output((_data) => { + // do nothing + }), + ) + + graph.finalize() + return { graph, usersStream, postsStream, joined, db, sqlite } + }, + firstRun: (ctx) => { + ctx.usersStream.sendData(v([1]), initialUsersSet) + ctx.postsStream.sendData(v([1]), initialPostsSet) + ctx.usersStream.sendFrontier(v([2])) + ctx.postsStream.sendFrontier(v([2])) + ctx.graph.step() + }, + incrementalRun: (ctx, i) => { + const user = incrementalUsers[i] + const post1 = incrementalPosts[i * 2] + const post2 = incrementalPosts[i * 2 + 1] + + ctx.usersStream.sendData(v([i + 2]), new MultiSet([[[user.id, user], 1]])) + ctx.postsStream.sendData( + v([i + 2]), + new MultiSet([ + [[post1.userId, post1], 1], + [[post2.userId, post2], 1], + ]), + ) + ctx.usersStream.sendFrontier(v([i + 3])) + ctx.postsStream.sendFrontier(v([i + 3])) + ctx.graph.step() + }, + teardown: (ctx) => { + ctx.db.close() + ctx.sqlite.close() + }, + }) joinSuite.run() joinSuite.printResults() diff --git a/packages/d2ts/src/sqlite/operators/join.ts b/packages/d2ts/src/sqlite/operators/join.ts index 98ee29e..0052329 100644 --- a/packages/d2ts/src/sqlite/operators/join.ts +++ b/packages/d2ts/src/sqlite/operators/join.ts @@ -47,10 +47,13 @@ export class JoinOperatorSQLite extends BinaryOperator< for (const message of this.inputAMessages()) { if (message.type === MessageType.DATA) { const { version, collection } = message.data as DataMessage<[K, V1]> + // Batch the inserts + const items: [K, Version, [V1, number]][] = [] for (const [item, multiplicity] of collection.getInner()) { const [key, value] = item - deltaA.addValue(key, version, [value, multiplicity]) + items.push([key, version, [value, multiplicity]]) } + deltaA.addValues(items) } else if (message.type === MessageType.FRONTIER) { const frontier = message.data as Antichain if (!this.inputAFrontier().lessEqual(frontier)) { @@ -64,10 +67,13 @@ export class JoinOperatorSQLite extends BinaryOperator< for (const message of this.inputBMessages()) { if (message.type === MessageType.DATA) { const { version, collection } = message.data as DataMessage<[K, V2]> + // Batch the inserts + const items: [K, Version, [V2, number]][] = [] for (const [item, multiplicity] of collection.getInner()) { const [key, value] = item - deltaB.addValue(key, version, [value, multiplicity]) + items.push([key, version, [value, multiplicity]]) } + deltaB.addValues(items) } else if (message.type === MessageType.FRONTIER) { const frontier = message.data as Antichain if (!this.inputBFrontier().lessEqual(frontier)) { diff --git a/packages/d2ts/src/sqlite/version-index.ts b/packages/d2ts/src/sqlite/version-index.ts index 35071b9..4729b50 100644 --- a/packages/d2ts/src/sqlite/version-index.ts +++ b/packages/d2ts/src/sqlite/version-index.ts @@ -1,6 +1,9 @@ import { Version, Antichain, v } from '../order.js' import { MultiSet } from '../multiset.js' import { SQLiteDb, SQLiteStatement } from './database.js' +import { DefaultMap } from '../utils.js' + +type VersionMap = DefaultMap interface IndexRow { key: string @@ -21,10 +24,6 @@ interface GetParams { version: string } -interface GetAllForKeyParams { - requestedVersion: string -} - interface JoinResult { key: string this_version: string @@ -35,35 +34,36 @@ interface JoinResult { other_multiplicity: number } +interface PreparedStatements { + insert: SQLiteStatement + get: SQLiteStatement + getVersions: SQLiteStatement<[string], { version: string }> + getAllForKey: SQLiteStatement<[string], IndexRow> + deleteAll: SQLiteStatement + setCompactionFrontier: SQLiteStatement<[string]> + getCompactionFrontier: SQLiteStatement<[], { value: string }> + deleteMeta: SQLiteStatement + getAllKeys: SQLiteStatement<[], { key: string }> + getVersionsForKey: SQLiteStatement<[string], { version: string }> + truncate: SQLiteStatement + truncateMeta: SQLiteStatement + getModifiedKeys: SQLiteStatement<[], { key: string }> + addModifiedKey: SQLiteStatement<[string]> + clearModifiedKey: SQLiteStatement<[string]> + clearAllModifiedKeys: SQLiteStatement + compactKey: SQLiteStatement<[string, string, string]> + deleteOldVersionsData: SQLiteStatement<[string, string]> + getKeysNeedingCompaction: SQLiteStatement<[], { key: string }> + clearModifiedKeys: SQLiteStatement<[string]> +} + export class SQLIndex { #db: SQLiteDb #tableName: string #isTemp: boolean #compactionFrontierCache: Antichain | null = null - #preparedStatements: { - insert: SQLiteStatement - get: SQLiteStatement - getVersions: SQLiteStatement<[string], { version: string }> - getAllForKey: SQLiteStatement<[string, GetAllForKeyParams], IndexRow> - delete: SQLiteStatement<[string]> - deleteAll: SQLiteStatement - getForCompaction: SQLiteStatement<[string], IndexRow> - consolidateVersions: SQLiteStatement<[string, string]> - deleteZeroMultiplicity: SQLiteStatement - setCompactionFrontier: SQLiteStatement<[string]> - getCompactionFrontier: SQLiteStatement<[], { value: string }> - deleteMeta: SQLiteStatement - getAllKeys: SQLiteStatement<[], { key: string }> - getVersionsForKey: SQLiteStatement<[string], { version: string }> - moveDataToNewVersion: SQLiteStatement<[string, string, string]> - deleteOldVersionData: SQLiteStatement<[string, string]> - truncate: SQLiteStatement - truncateMeta: SQLiteStatement - } - - // Change cache to store queries instead of statements - static #appendQueryCache = new Map() - static #joinQueryCache = new Map() + #statementCache = new Map() + #preparedStatements: PreparedStatements constructor(db: SQLiteDb, name: string, isTemp = false) { this.#db = db @@ -88,6 +88,12 @@ export class SQLIndex { ) `) + this.#db.exec(` + CREATE ${isTemp ? 'TEMP' : ''} TABLE IF NOT EXISTS ${this.#tableName}_modified_keys ( + key TEXT PRIMARY KEY + ) + `) + // Create indexes if (!isTemp) { this.#db.exec(` @@ -124,50 +130,13 @@ export class SQLIndex { getAllForKey: this.#db.prepare(` SELECT version, value, multiplicity FROM ${this.#tableName} - WHERE key = ? AND json_array_length(version) <= json_array_length(@requestedVersion) - `), - - delete: this.#db.prepare(` - DELETE FROM ${this.#tableName} - WHERE version = ? + WHERE key = ? `), deleteAll: this.#db.prepare(` DROP TABLE IF EXISTS ${this.#tableName} `), - getForCompaction: this.#db.prepare(` - SELECT key, version, value, multiplicity - FROM ${this.#tableName} - WHERE version = ? - `), - - consolidateVersions: this.#db.prepare(` - WITH consolidated AS ( - SELECT - key, - value, - CAST(SUM(CAST(multiplicity AS BIGINT)) AS INTEGER) as new_multiplicity - FROM ${this.#tableName} - WHERE version = ? - GROUP BY key, value - ) - UPDATE ${this.#tableName} - SET multiplicity = ( - SELECT new_multiplicity - FROM consolidated c - WHERE - c.key = ${this.#tableName}.key AND - c.value = ${this.#tableName}.value - ) - WHERE version = ? - `), - - deleteZeroMultiplicity: this.#db.prepare(` - DELETE FROM ${this.#tableName} - WHERE multiplicity = 0 - `), - setCompactionFrontier: this.#db.prepare(` INSERT OR REPLACE INTO ${this.#tableName}_meta (key, value) VALUES ('compaction_frontier', ?) @@ -192,26 +161,75 @@ export class SQLIndex { WHERE key = ? `), - moveDataToNewVersion: this.#db.prepare(` + truncate: this.#db.prepare(` + DELETE FROM ${this.#tableName} + `), + + truncateMeta: this.#db.prepare(` + DELETE FROM ${this.#tableName}_meta + `), + + getModifiedKeys: this.#db.prepare(` + SELECT key FROM ${this.#tableName}_modified_keys + `), + + addModifiedKey: this.#db.prepare(` + INSERT OR IGNORE INTO ${this.#tableName}_modified_keys (key) + VALUES (?) + `), + + clearModifiedKey: this.#db.prepare(` + DELETE FROM ${this.#tableName}_modified_keys + WHERE key = ? + `), + + clearAllModifiedKeys: this.#db.prepare(` + DELETE FROM ${this.#tableName}_modified_keys + `), + + compactKey: this.#db.prepare(` + WITH moved_data AS ( + -- Move data to new versions and sum multiplicities + SELECT + key, + ? as new_version, -- Parameter 1: New version JSON + value, + SUM(multiplicity) as multiplicity + FROM ${this.#tableName} + WHERE key = ? -- Parameter 2: Key JSON + AND version IN ( -- Parameter 3: Old versions JSON array + SELECT value FROM json_each(?) + ) + GROUP BY key, value + ) INSERT INTO ${this.#tableName} (key, version, value, multiplicity) - SELECT key, ?, value, multiplicity - FROM ${this.#tableName} - WHERE key = ? AND version = ? + SELECT key, new_version, value, multiplicity + FROM moved_data + WHERE multiplicity != 0 ON CONFLICT(key, version, value) DO UPDATE SET - multiplicity = multiplicity + excluded.multiplicity + multiplicity = multiplicity + excluded.multiplicity `), - deleteOldVersionData: this.#db.prepare(` + deleteOldVersionsData: this.#db.prepare(` DELETE FROM ${this.#tableName} - WHERE key = ? AND version = ? + WHERE key = ? + AND version IN (SELECT value FROM json_each(?)) `), - truncate: this.#db.prepare(` - DELETE FROM ${this.#tableName} + getKeysNeedingCompaction: this.#db.prepare(` + SELECT key, COUNT(*) as version_count + FROM ( + SELECT DISTINCT key, version + FROM ${this.#tableName} + WHERE key IN (SELECT key FROM ${this.#tableName}_modified_keys) + ) + GROUP BY key + HAVING version_count > 1 `), - truncateMeta: this.#db.prepare(` - DELETE FROM ${this.#tableName}_meta + clearModifiedKeys: this.#db.prepare(` + DELETE FROM ${this.#tableName}_modified_keys + WHERE key IN (SELECT value FROM json_each(?)) `), } } @@ -262,12 +280,7 @@ export class SQLIndex { reconstructAt(key: K, requestedVersion: Version): [V, number][] { this.#validate(requestedVersion) - const rows = this.#preparedStatements.getAllForKey.all( - JSON.stringify(key), - { - requestedVersion: requestedVersion.toJSON(), - }, - ) + const rows = this.#preparedStatements.getAllForKey.all(JSON.stringify(key)) const result = rows .filter((row) => { @@ -278,6 +291,28 @@ export class SQLIndex { return result as [V, number][] } + get(key: K): VersionMap<[V, number]> { + const rows = this.#preparedStatements.getAllForKey.all(JSON.stringify(key)) + const result = new DefaultMap(() => []) + const compactionFrontier = this.getCompactionFrontier() + for (const row of rows) { + let version = Version.fromJSON(row.version) + if (compactionFrontier && !compactionFrontier.lessEqualVersion(version)) { + version = version.advanceBy(compactionFrontier) + } + result.set(version, [[JSON.parse(row.value) as V, row.multiplicity]]) + } + return result + } + + entries(): [K, VersionMap<[V, number]>][] { + // TODO: This is inefficient, we should use a query to get the entries + const keys = this.#preparedStatements.getAllKeys + .all() + .map((row) => JSON.parse(row.key)) + return keys.map((key) => [key, this.get(key)]) + } + versions(key: K): Version[] { const rows = this.#preparedStatements.getVersions.all(JSON.stringify(key)) const result = rows.map(({ version }) => Version.fromJSON(version)) @@ -287,21 +322,83 @@ export class SQLIndex { addValue(key: K, version: Version, value: [V, number]): void { this.#validate(version) const versionJson = version.toJSON() + const keyJson = JSON.stringify(key) this.#preparedStatements.insert.run({ - key: JSON.stringify(key), + key: keyJson, version: versionJson, value: JSON.stringify(value[0]), multiplicity: value[1], }) + + this.#preparedStatements.addModifiedKey.run(keyJson) + } + + addValues(items: [K, Version, [V, number]][]): void { + // SQLite has a limit of 32766 parameters per query + // Each item uses 4 parameters (key, version, value, multiplicity) + const BATCH_SIZE = Math.floor(32766 / 4) + + for (let i = 0; i < items.length; i += BATCH_SIZE) { + const batch = items.slice(i, i + BATCH_SIZE) + + // Build the parameterized query for this batch + const placeholders = batch.map(() => '(?, ?, ?, ?)').join(',') + + const query = ` + INSERT INTO ${this.#tableName} (key, version, value, multiplicity) + VALUES ${placeholders} + ON CONFLICT(key, version, value) DO + UPDATE SET multiplicity = multiplicity + excluded.multiplicity + ` + + // Create flattened parameters array + const params: (string | number)[] = [] + const modifiedKeys: K[] = [] + + batch.forEach(([key, version, [value, multiplicity]]) => { + this.#validate(version) + params.push( + JSON.stringify(key), + version.toJSON(), + JSON.stringify(value), + multiplicity, + ) + modifiedKeys.push(key) + }) + + // Execute the batch insert + this.#db.prepare(query).run(params) + + // Track modified keys in batch + this.addModifiedKeys(modifiedKeys) + } + } + + addModifiedKeys(keys: K[]): void { + // SQLite has a limit of 32766 parameters per query + const BATCH_SIZE = 32766 + + for (let i = 0; i < keys.length; i += BATCH_SIZE) { + const batch = keys.slice(i, i + BATCH_SIZE) + + const placeholders = batch.map(() => '(?)').join(',') + const query = ` + INSERT OR IGNORE INTO ${this.#tableName}_modified_keys (key) + VALUES ${placeholders} + ` + + const params = batch.map((key) => JSON.stringify(key)) + this.#db.prepare(query).run(params) + } } append(other: SQLIndex): void { - const cacheKey = `${this.#tableName}_${other.tableName}` + const cacheKey = `append_${this.#tableName}_${other.tableName}` - let query = SQLIndex.#appendQueryCache.get(cacheKey) - if (!query) { - query = ` + let stmt = this.#statementCache.get(cacheKey) + if (!stmt) { + const query = ` INSERT OR REPLACE INTO ${this.#tableName} (key, version, value, multiplicity) SELECT o.key, @@ -314,18 +411,31 @@ export class SQLIndex { AND t.version = o.version AND t.value = o.value ` - SQLIndex.#appendQueryCache.set(cacheKey, query) + stmt = this.#db.prepare(query) + this.#statementCache.set(cacheKey, stmt) + } + + stmt.run() + + const modifiedKeysCacheKey = `append_modified_keys_${this.#tableName}_${other.tableName}` + let modifiedKeysStmt = this.#statementCache.get(modifiedKeysCacheKey) + if (!modifiedKeysStmt) { + modifiedKeysStmt = this.#db.prepare(` + INSERT OR IGNORE INTO ${this.#tableName}_modified_keys (key) + SELECT DISTINCT key FROM ${other.tableName} + `) + this.#statementCache.set(modifiedKeysCacheKey, modifiedKeysStmt) } - this.#db.prepare(query).run() + modifiedKeysStmt.run() } join(other: SQLIndex): [Version, MultiSet<[K, [V, V2]]>][] { - const cacheKey = `${this.#tableName}_${other.tableName}` + const cacheKey = `join_${this.#tableName}_${other.tableName}` - let query = SQLIndex.#joinQueryCache.get(cacheKey) - if (!query) { - query = ` + let stmt = this.#statementCache.get(cacheKey) + if (!stmt) { + const query = ` SELECT a.key, a.version as this_version, @@ -337,10 +447,11 @@ export class SQLIndex { FROM ${this.#tableName} a JOIN ${other.tableName} b ON a.key = b.key ` - SQLIndex.#joinQueryCache.set(cacheKey, query) + stmt = this.#db.prepare(query) + this.#statementCache.set(cacheKey, stmt) } - const results = this.#db.prepare(query).all() as JoinResult[] + const results = stmt.all() as JoinResult[] const collections = new Map() @@ -353,6 +464,21 @@ export class SQLIndex { const mul1 = row.this_multiplicity const mul2 = row.other_multiplicity + const compactionFrontier1 = this.getCompactionFrontier() + const compactionFrontier2 = other.getCompactionFrontier() + if ( + compactionFrontier1 && + compactionFrontier1.lessEqualVersion(version1) + ) { + version1.advanceBy(compactionFrontier1) + } + if ( + compactionFrontier2 && + compactionFrontier2.lessEqualVersion(version2) + ) { + version2.advanceBy(compactionFrontier2) + } + const resultVersion = version1.join(version2) const versionKey = resultVersion.toJSON() @@ -373,20 +499,6 @@ export class SQLIndex { return result as [Version, MultiSet<[K, [V, V2]]>][] } - destroy(): void { - this.#preparedStatements.deleteMeta.run() - this.#preparedStatements.deleteAll.run() - // Just clear the query caches - SQLIndex.#appendQueryCache.clear() - SQLIndex.#joinQueryCache.clear() - this.#compactionFrontierCache = null - } - - static clearStatementCaches(): void { - SQLIndex.#appendQueryCache.clear() - SQLIndex.#joinQueryCache.clear() - } - compact(compactionFrontier: Antichain, keys: K[] = []): void { const existingFrontier = this.getCompactionFrontier() if (existingFrontier && !existingFrontier.lessEqual(compactionFrontier)) { @@ -395,63 +507,69 @@ export class SQLIndex { this.#validate(compactionFrontier) - // Get all keys if none provided - const keysToProcess = + // Get all keys that were modified + const allKeysToProcess = keys.length > 0 ? keys - : (() => { - const rows = this.#preparedStatements.getAllKeys.all() - return rows.map((row) => JSON.parse(row.key)) - })() + : this.#preparedStatements.getModifiedKeys + .all() + .map((row) => JSON.parse(row.key)) - // Process each key - for (const key of keysToProcess) { - // Get versions for this key that need compaction - const versionsToCompact = this.#preparedStatements.getVersionsForKey - .all(JSON.stringify(key)) - .map((row) => row.version) + if (allKeysToProcess.length === 0) return - const toCompact = versionsToCompact.filter((versionJson) => { - const version = Version.fromJSON(versionJson) - return !compactionFrontier.lessEqualVersion(version) - }) + // Get keys that actually need compaction (have multiple versions) + const keysToProcessWithMultipleVersions = + this.#preparedStatements.getKeysNeedingCompaction + .all() + .map((row) => JSON.parse(row.key) as K) + .filter((key) => allKeysToProcess.includes(key)) - // Track versions that need consolidation - const toConsolidate = new Set() + // Process each key that needs compaction + for (const key of keysToProcessWithMultipleVersions) { + const keyJson = JSON.stringify(key) - // Process each version that needs compaction - for (const oldVersionJson of toCompact) { + // Get versions for this key that need compaction + const versionsToCompact = this.#preparedStatements.getVersionsForKey + .all(keyJson) + .map((row) => Version.fromJSON(row.version)) + .filter((version) => !compactionFrontier.lessEqualVersion(version)) + .map((version) => version.toJSON()) + + // Group versions by their target version after compaction + const versionGroups = new Map() + for (const oldVersionJson of versionsToCompact) { const oldVersion = Version.fromJSON(oldVersionJson) const newVersion = oldVersion.advanceBy(compactionFrontier) const newVersionJson = newVersion.toJSON() - // Move data to new version - this.#preparedStatements.moveDataToNewVersion.run( + if (!versionGroups.has(newVersionJson)) { + versionGroups.set(newVersionJson, []) + } + versionGroups.get(newVersionJson)!.push(oldVersionJson) + } + + // Process each group in a single query + for (const [newVersionJson, oldVersionJsons] of versionGroups) { + // Compact all versions in this group to the new version + this.#preparedStatements.compactKey.run( newVersionJson, - JSON.stringify(key), - oldVersionJson, + keyJson, + JSON.stringify(oldVersionJsons), ) - // Delete old version data - this.#preparedStatements.deleteOldVersionData.run( - JSON.stringify(key), - oldVersionJson, + // Delete all old versions data at once + this.#preparedStatements.deleteOldVersionsData.run( + keyJson, + JSON.stringify(oldVersionJsons), ) - - toConsolidate.add(newVersionJson) } + } - // Consolidate values for each affected version - for (const versionJson of toConsolidate) { - // Consolidate by summing multiplicities - this.#preparedStatements.consolidateVersions.run( - versionJson, - versionJson, - ) - - // Remove entries with zero multiplicity - this.#preparedStatements.deleteZeroMultiplicity.run() - } + // Clear processed keys from modified keys table in a single query + if (allKeysToProcess.length > 0) { + this.#preparedStatements.clearModifiedKeys.run( + JSON.stringify(allKeysToProcess.map((k) => JSON.stringify(k))), + ) } this.setCompactionFrontier(compactionFrontier) @@ -472,6 +590,15 @@ export class SQLIndex { truncate(): void { this.#preparedStatements.truncate.run() this.#preparedStatements.truncateMeta.run() + this.#preparedStatements.clearAllModifiedKeys.run() + this.#compactionFrontierCache = null + } + + destroy(): void { + this.#preparedStatements.deleteMeta.run() + this.#preparedStatements.deleteAll.run() + this.#db.exec(`DROP TABLE IF EXISTS ${this.#tableName}_modified_keys`) + this.#statementCache.clear() this.#compactionFrontierCache = null } } diff --git a/packages/d2ts/src/version-index.ts b/packages/d2ts/src/version-index.ts index 9cde34b..1ca2e5c 100644 --- a/packages/d2ts/src/version-index.ts +++ b/packages/d2ts/src/version-index.ts @@ -28,12 +28,20 @@ export interface IndexType { export class Index implements IndexType { #inner: IndexMap #compactionFrontier: Antichain | null + #modifiedKeys: Set constructor() { this.#inner = new DefaultMap>( () => new DefaultMap(() => []), ) + // #inner is as map of: + // { + // [key]: { + // [version]: [value, multiplicity] + // } + // } this.#compactionFrontier = null + this.#modifiedKeys = new Set() } toString(indent = false): string { @@ -73,8 +81,29 @@ export class Index implements IndexType { return out } + get(key: K): VersionMap<[V, number]> { + if (!this.#compactionFrontier) return this.#inner.get(key) + // versions may be older than the compaction frontier, so we need to + // advance them to it. This is due to not rewriting the whole version index + // to the compaction frontier as part of the compact operation. + const versions = this.#inner.get(key).entries() + const out = new DefaultMap(() => []) + for (const [rawVersion, values] of versions) { + let version = rawVersion + if (!this.#compactionFrontier.lessEqualVersion(rawVersion)) { + version = rawVersion.advanceBy(this.#compactionFrontier) + } + out.set(version, values) + } + return out + } + + entries(): [K, VersionMap<[V, number]>][] { + return this.keys().map((key) => [key, this.get(key)]) + } + versions(key: K): Version[] { - const result = Array.from(this.#inner.get(key).keys()) + const result = Array.from(this.get(key).keys()) return result } @@ -85,17 +114,19 @@ export class Index implements IndexType { values.push(value) return values }) + this.#modifiedKeys.add(key) } append(other: Index): void { - for (const [key, versions] of other.#inner) { - const thisVersions = this.#inner.get(key) + for (const [key, versions] of other.entries()) { + const thisVersions = this.get(key) for (const [version, data] of versions) { thisVersions.update(version, (values) => { chunkedArrayPush(values, data) return values }) } + this.#modifiedKeys.add(key) } } @@ -106,45 +137,43 @@ export class Index implements IndexType { // We want to iterate over the smaller of the two indexes to reduce the // number of operations we need to do. - let inner1 - let inner2 - let direction: 'left' | 'right' if (this.#inner.size <= other.#inner.size) { - inner1 = this.#inner - inner2 = other.#inner - direction = 'left' + for (const [key, versions] of this.#inner) { + if (!other.has(key)) continue + const otherVersions = other.get(key) + for (const [rawVersion1, data1] of versions) { + const version1 = + this.#compactionFrontier && + this.#compactionFrontier.lessEqualVersion(rawVersion1) + ? rawVersion1.advanceBy(this.#compactionFrontier) + : rawVersion1 + for (const [version2, data2] of otherVersions) { + for (const [val1, mul1] of data1) { + for (const [val2, mul2] of data2) { + const resultVersion = version1.join(version2) + collections.update(resultVersion, (existing) => { + existing.push([key, [val1, val2], mul1 * mul2]) + return existing + }) + } + } + } + } + } } else { - inner1 = other.#inner - inner2 = this.#inner - direction = 'right' - } - - for (const [key, versions1] of inner1) { - if (!inner2.has(key)) continue - - const versions2 = inner2.get(key) - - for (const [version1, data1] of versions1) { - for (const [version2, data2] of versions2) { - for (const [val1, mul1] of data1) { + for (const [key, otherVersions] of other.entries()) { + if (!this.has(key)) continue + const versions = this.get(key) + for (const [version2, data2] of otherVersions) { + for (const [version1, data1] of versions) { for (const [val2, mul2] of data2) { - const resultVersion = version1.join(version2) - collections.update(resultVersion, (existing) => { - if (direction === 'left') { - existing.push([key, [val1, val2], mul1 * mul2] as [ - K, - [V, V2], - number, - ]) - } else { - existing.push([key, [val2, val1], mul1 * mul2] as [ - K, - [V, V2], - number, - ]) - } - return existing - }) + for (const [val1, mul1] of data1) { + const resultVersion = version1.join(version2) + collections.update(resultVersion, (existing) => { + existing.push([key, [val1, val2], mul1 * mul2]) + return existing + }) + } } } } @@ -190,7 +219,7 @@ export class Index implements IndexType { } const keysToProcess = - keys.length > 0 ? keys : Array.from(this.#inner.keys()) + keys.length > 0 ? keys : Array.from(this.#modifiedKeys) for (const key of keysToProcess) { const versions = this.#inner.get(key) @@ -221,6 +250,7 @@ export class Index implements IndexType { this.#inner.delete(key) } } + this.#modifiedKeys.delete(key) } this.#compactionFrontier = compactionFrontier diff --git a/packages/d2ts/tests/version-index.test.ts b/packages/d2ts/tests/version-index.test.ts index c31096d..90a0fe5 100644 --- a/packages/d2ts/tests/version-index.test.ts +++ b/packages/d2ts/tests/version-index.test.ts @@ -330,29 +330,6 @@ function createIndexTests< expect(result).toEqual([[10, 2]]) }) - test('should handle selective key compaction', () => { - const version1 = v([1]) - const version2 = v([2]) - const frontier = new Antichain([v([2])]) - - index.addValue('key1', version1, [10, 3]) - index.addValue('key2', version1, [20, 2]) - index.addValue('key1', version2, [10, -1]) - index.addValue('key2', version2, [20, 3]) - - // Only compact 'key1' - index.compact(frontier, ['key1']) - - // key1 should be compacted - expect(index.reconstructAt('key1', version2)).toEqual([[10, 2]]) - - // key2 should maintain original versions - const key2Versions = index.versions('key2') - expect(key2Versions).toHaveLength(2) - expect(key2Versions).toContainEqual(version1) - expect(key2Versions).toContainEqual(version2) - }) - test('should throw error for invalid compaction frontier', () => { const version = v([1]) const frontier1 = new Antichain([v([2])])