diff --git a/packages/d2ts/src/multiset.ts b/packages/d2ts/src/multiset.ts index 2eef6fd..b2c4cab 100644 --- a/packages/d2ts/src/multiset.ts +++ b/packages/d2ts/src/multiset.ts @@ -1,4 +1,4 @@ -import { DefaultMap } from './utils.js' +import { DefaultMap, chunkedArrayPush } from './utils.js' export type MultiSetArray = [T, number][] export type KeyedData = [key: string, value: T] @@ -55,8 +55,8 @@ export class MultiSet { */ concat(other: MultiSet): MultiSet { const out: MultiSetArray = [] - out.push(...this.#inner) - out.push(...other.#inner) + chunkedArrayPush(out, this.#inner) + chunkedArrayPush(out, other.getInner()) return new MultiSet(out) } @@ -93,7 +93,7 @@ export class MultiSet { const out: MultiSetArray> = [] for (const [[k1, v1], d1] of this.#inner as MultiSetArray>) { - for (const [[k2, v2], d2] of other.#inner) { + for (const [[k2, v2], d2] of other.getInner()) { if (k1 === k2) { out.push([[k1, [v1, v2]], d1 * d2]) } @@ -283,11 +283,8 @@ export class MultiSet { } extend(other: MultiSet | MultiSetArray): void { - if (other instanceof MultiSet) { - this.#inner.push(...other.getInner()) - } else { - this.#inner.push(...other) - } + const otherArray = other instanceof MultiSet ? other.getInner() : other + chunkedArrayPush(this.#inner, otherArray) } getInner(): MultiSetArray { diff --git a/packages/d2ts/src/sqlite/operators/join.ts b/packages/d2ts/src/sqlite/operators/join.ts index cf76889..98ee29e 100644 --- a/packages/d2ts/src/sqlite/operators/join.ts +++ b/packages/d2ts/src/sqlite/operators/join.ts @@ -34,8 +34,8 @@ export class JoinOperatorSQLite extends BinaryOperator< super(id, inputA, inputB, output, initialFrontier) this.#indexA = new SQLIndex(db, `join_a_${id}`) this.#indexB = new SQLIndex(db, `join_b_${id}`) - this.#deltaA = new SQLIndex(db, `join_delta_a_${id}`) - this.#deltaB = new SQLIndex(db, `join_delta_b_${id}`) + this.#deltaA = new SQLIndex(db, `join_delta_a_${id}`, true) + this.#deltaB = new SQLIndex(db, `join_delta_b_${id}`, true) } run(): void { diff --git a/packages/d2ts/src/sqlite/version-index.ts b/packages/d2ts/src/sqlite/version-index.ts index 80378b7..35071b9 100644 --- a/packages/d2ts/src/sqlite/version-index.ts +++ b/packages/d2ts/src/sqlite/version-index.ts @@ -11,7 +11,7 @@ interface IndexRow { interface InsertParams { key: string - version_id: number + version: string value: string multiplicity: number } @@ -27,16 +27,19 @@ interface GetAllForKeyParams { interface JoinResult { key: string - version: string - joined_value: string - multiplicity: number + this_version: string + other_version: string + this_value: string + other_value: string + this_multiplicity: number + other_multiplicity: number } export class SQLIndex { #db: SQLiteDb #tableName: string - #versionTableName: string #isTemp: boolean + #compactionFrontierCache: Antichain | null = null #preparedStatements: { insert: SQLiteStatement get: SQLiteStatement @@ -44,54 +47,37 @@ export class SQLIndex { getAllForKey: SQLiteStatement<[string, GetAllForKeyParams], IndexRow> delete: SQLiteStatement<[string]> deleteAll: SQLiteStatement - deleteAllVersions: SQLiteStatement getForCompaction: SQLiteStatement<[string], IndexRow> consolidateVersions: SQLiteStatement<[string, string]> - insertVersion: SQLiteStatement<[string], { id: number }> - updateVersionMapping: SQLiteStatement<[string, string]> deleteZeroMultiplicity: SQLiteStatement - getVersionId: SQLiteStatement<[string], { id: number }> setCompactionFrontier: SQLiteStatement<[string]> getCompactionFrontier: SQLiteStatement<[], { value: string }> deleteMeta: SQLiteStatement getAllKeys: SQLiteStatement<[], { key: string }> - getVersionsForKey: SQLiteStatement< - [string], - { version: string; version_id: number } - > - moveDataToNewVersion: SQLiteStatement<[number, string, number]> - deleteOldVersionData: SQLiteStatement<[string, number]> + getVersionsForKey: SQLiteStatement<[string], { version: string }> + moveDataToNewVersion: SQLiteStatement<[string, string, string]> + deleteOldVersionData: SQLiteStatement<[string, string]> truncate: SQLiteStatement truncateMeta: SQLiteStatement - truncateVersions: SQLiteStatement } - // Cache for frequently used queries + // Change cache to store queries instead of statements static #appendQueryCache = new Map() static #joinQueryCache = new Map() constructor(db: SQLiteDb, name: string, isTemp = false) { this.#db = db this.#tableName = `index_${name}` - this.#versionTableName = `${this.#tableName}_versions` this.#isTemp = isTemp - // Create tables - this.#db.exec(` - CREATE ${isTemp ? 'TEMP' : ''} TABLE IF NOT EXISTS ${this.#versionTableName} ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - version TEXT NOT NULL - ) - `) - + // Create single table with version string directly this.#db.exec(` CREATE ${isTemp ? 'TEMP' : ''} TABLE IF NOT EXISTS ${this.#tableName} ( key TEXT NOT NULL, - version_id INTEGER NOT NULL, + version TEXT NOT NULL, value TEXT NOT NULL, multiplicity INTEGER NOT NULL, - PRIMARY KEY (key, version_id, value), - FOREIGN KEY (version_id) REFERENCES ${this.#versionTableName}(id) + PRIMARY KEY (key, version, value) ) `) @@ -106,88 +92,65 @@ export class SQLIndex { if (!isTemp) { this.#db.exec(` CREATE INDEX IF NOT EXISTS ${this.#tableName}_version_idx - ON ${this.#tableName}(version_id) + ON ${this.#tableName}(version) `) this.#db.exec(` CREATE INDEX IF NOT EXISTS ${this.#tableName}_key_idx ON ${this.#tableName}(key) `) - this.#db.exec(` - CREATE INDEX IF NOT EXISTS ${this.#versionTableName}_version_idx - ON ${this.#versionTableName}(version) - `) } // Prepare statements this.#preparedStatements = { insert: this.#db.prepare(` - INSERT INTO ${this.#tableName} (key, version_id, value, multiplicity) - VALUES (@key, @version_id, @value, @multiplicity) - ON CONFLICT(key, version_id, value) DO + INSERT INTO ${this.#tableName} (key, version, value, multiplicity) + VALUES (@key, @version, @value, @multiplicity) + ON CONFLICT(key, version, value) DO UPDATE SET multiplicity = multiplicity + excluded.multiplicity `), - insertVersion: this.#db.prepare(` - INSERT INTO ${this.#versionTableName} (version) - VALUES (?) - RETURNING id - `), - get: this.#db.prepare(` SELECT value, multiplicity - FROM ${this.#tableName} t - JOIN ${this.#versionTableName} v ON v.id = t.version_id - WHERE key = @key AND v.version = @version + FROM ${this.#tableName} + WHERE key = @key AND version = @version `), getVersions: this.#db.prepare(` - SELECT DISTINCT v.version - FROM ${this.#tableName} t - JOIN ${this.#versionTableName} v ON v.id = t.version_id + SELECT DISTINCT version + FROM ${this.#tableName} WHERE key = ? `), getAllForKey: this.#db.prepare(` - SELECT v.version, t.value, t.multiplicity - FROM ${this.#tableName} t - JOIN ${this.#versionTableName} v ON v.id = t.version_id - WHERE t.key = ? AND json_array_length(v.version) <= json_array_length(@requestedVersion) + SELECT version, value, multiplicity + FROM ${this.#tableName} + WHERE key = ? AND json_array_length(version) <= json_array_length(@requestedVersion) `), delete: this.#db.prepare(` DELETE FROM ${this.#tableName} - WHERE version_id IN ( - SELECT id FROM ${this.#versionTableName} - WHERE version = ? - ) + WHERE version = ? `), deleteAll: this.#db.prepare(` DROP TABLE IF EXISTS ${this.#tableName} `), - deleteAllVersions: this.#db.prepare(` - DROP TABLE IF EXISTS ${this.#versionTableName} - `), - getForCompaction: this.#db.prepare(` - SELECT t.key, v.version, t.value, t.multiplicity - FROM ${this.#tableName} t - JOIN ${this.#versionTableName} v ON v.id = t.version_id - WHERE v.version = ? + SELECT key, version, value, multiplicity + FROM ${this.#tableName} + WHERE version = ? `), consolidateVersions: this.#db.prepare(` WITH consolidated AS ( SELECT - t1.key, - t1.version_id, - t1.value, - CAST(SUM(CAST(t1.multiplicity AS BIGINT)) AS INTEGER) as new_multiplicity - FROM ${this.#tableName} t1 - JOIN ${this.#versionTableName} v1 ON v1.id = t1.version_id - WHERE v1.version = ? - GROUP BY t1.key, t1.value, t1.version_id + key, + value, + CAST(SUM(CAST(multiplicity AS BIGINT)) AS INTEGER) as new_multiplicity + FROM ${this.#tableName} + WHERE version = ? + GROUP BY key, value ) UPDATE ${this.#tableName} SET multiplicity = ( @@ -195,13 +158,9 @@ export class SQLIndex { FROM consolidated c WHERE c.key = ${this.#tableName}.key AND - c.value = ${this.#tableName}.value AND - c.version_id = ${this.#tableName}.version_id - ) - WHERE version_id IN ( - SELECT id FROM ${this.#versionTableName} - WHERE version = ? + c.value = ${this.#tableName}.value ) + WHERE version = ? `), deleteZeroMultiplicity: this.#db.prepare(` @@ -209,17 +168,6 @@ export class SQLIndex { WHERE multiplicity = 0 `), - updateVersionMapping: this.#db.prepare(` - UPDATE ${this.#versionTableName} - SET version = ? - WHERE version = ? - `), - - getVersionId: this.#db.prepare(` - SELECT id FROM ${this.#versionTableName} - WHERE version = ? - `), - setCompactionFrontier: this.#db.prepare(` INSERT OR REPLACE INTO ${this.#tableName}_meta (key, value) VALUES ('compaction_frontier', ?) @@ -239,24 +187,23 @@ export class SQLIndex { `), getVersionsForKey: this.#db.prepare(` - SELECT DISTINCT v.version, v.id as version_id - FROM ${this.#tableName} t - JOIN ${this.#versionTableName} v ON v.id = t.version_id - WHERE t.key = ? + SELECT DISTINCT version + FROM ${this.#tableName} + WHERE key = ? `), moveDataToNewVersion: this.#db.prepare(` - INSERT INTO ${this.#tableName} (key, version_id, value, multiplicity) + INSERT INTO ${this.#tableName} (key, version, value, multiplicity) SELECT key, ?, value, multiplicity FROM ${this.#tableName} - WHERE key = ? AND version_id = ? - ON CONFLICT(key, version_id, value) DO UPDATE SET + WHERE key = ? AND version = ? + ON CONFLICT(key, version, value) DO UPDATE SET multiplicity = multiplicity + excluded.multiplicity `), deleteOldVersionData: this.#db.prepare(` DELETE FROM ${this.#tableName} - WHERE key = ? AND version_id = ? + WHERE key = ? AND version = ? `), truncate: this.#db.prepare(` @@ -266,10 +213,6 @@ export class SQLIndex { truncateMeta: this.#db.prepare(` DELETE FROM ${this.#tableName}_meta `), - - truncateVersions: this.#db.prepare(` - DELETE FROM ${this.#versionTableName} - `), } } @@ -281,20 +224,24 @@ export class SQLIndex { return this.#tableName } - get versionTableName(): string { - return this.#versionTableName - } - getCompactionFrontier(): Antichain | null { + if (this.#compactionFrontierCache !== null) { + return this.#compactionFrontierCache + } + const frontierRow = this.#preparedStatements.getCompactionFrontier.get() if (!frontierRow) return null const data = JSON.parse(frontierRow.value) as number[][] - return new Antichain(data.map((inner) => v(inner))) + const frontier = new Antichain(data.map((inner) => v(inner))) + + this.#compactionFrontierCache = frontier + return frontier } setCompactionFrontier(frontier: Antichain): void { const json = JSON.stringify(frontier.elements.map((v) => v.getInner())) this.#preparedStatements.setCompactionFrontier.run(json) + this.#compactionFrontierCache = frontier } #validate(requestedVersion: Version | Antichain): boolean { @@ -341,58 +288,30 @@ export class SQLIndex { this.#validate(version) const versionJson = version.toJSON() - // First try to get existing version id - let versionRow = this.#preparedStatements.getVersionId.get(versionJson) as - | { id: number } - | undefined - - // If version doesn't exist, insert it - if (!versionRow) { - versionRow = this.#preparedStatements.insertVersion.get(versionJson) as { - id: number - } - } - - // Insert the actual value this.#preparedStatements.insert.run({ key: JSON.stringify(key), - version_id: versionRow.id, + version: versionJson, value: JSON.stringify(value[0]), multiplicity: value[1], }) } append(other: SQLIndex): void { - // First, copy any missing versions from the other version table - const copyVersionsQuery = ` - INSERT INTO ${this.#versionTableName} (version) - SELECT DISTINCT o.version - FROM ${other.#versionTableName} o - WHERE NOT EXISTS ( - SELECT 1 FROM ${this.#versionTableName} t - WHERE t.version = o.version - ) - ` - this.#db.prepare(copyVersionsQuery).run() - - // Now use the cached query for the data copy const cacheKey = `${this.#tableName}_${other.tableName}` let query = SQLIndex.#appendQueryCache.get(cacheKey) if (!query) { query = ` - INSERT OR REPLACE INTO ${this.#tableName} (key, version_id, value, multiplicity) + INSERT OR REPLACE INTO ${this.#tableName} (key, version, value, multiplicity) SELECT o.key, - v2.id as version_id, + o.version, o.value, COALESCE(t.multiplicity, 0) + o.multiplicity as multiplicity FROM ${other.tableName} o - JOIN ${other.#versionTableName} v1 ON v1.id = o.version_id - JOIN ${this.#versionTableName} v2 ON v2.version = v1.version LEFT JOIN ${this.#tableName} t ON t.key = o.key - AND t.version_id = v2.id + AND t.version = o.version AND t.value = o.value ` SQLIndex.#appendQueryCache.set(cacheKey, query) @@ -409,65 +328,66 @@ export class SQLIndex { query = ` SELECT a.key, - ( - WITH RECURSIVE numbers(i) AS ( - SELECT 0 - UNION ALL - SELECT i + 1 FROM numbers - WHERE i < json_array_length(va.version) - 1 - ) - SELECT json_group_array( - MAX( - json_extract(va.version, '$[' || i || ']'), - json_extract(vb.version, '$[' || i || ']') - ) - ) - FROM numbers - ) as version, - json_array(json(a.value), json(b.value)) as joined_value, - a.multiplicity * b.multiplicity as multiplicity + a.version as this_version, + b.version as other_version, + a.value as this_value, + b.value as other_value, + a.multiplicity as this_multiplicity, + b.multiplicity as other_multiplicity FROM ${this.#tableName} a - JOIN ${this.#versionTableName} va ON va.id = a.version_id JOIN ${other.tableName} b ON a.key = b.key - JOIN ${other.#versionTableName} vb ON vb.id = b.version_id - GROUP BY a.key, a.value, b.value ` SQLIndex.#joinQueryCache.set(cacheKey, query) } const results = this.#db.prepare(query).all() as JoinResult[] - const versionMap = new Map>() + const collections = new Map() for (const row of results) { const key = JSON.parse(row.key) as K - const [v1, v2] = JSON.parse(row.joined_value) as [V, V2] - - if (!versionMap.has(row.version)) { - versionMap.set(row.version, new MultiSet()) + const version1 = Version.fromJSON(row.this_version) + const version2 = Version.fromJSON(row.other_version) + const val1 = JSON.parse(row.this_value) as V + const val2 = JSON.parse(row.other_value) as V2 + const mul1 = row.this_multiplicity + const mul2 = row.other_multiplicity + + const resultVersion = version1.join(version2) + const versionKey = resultVersion.toJSON() + + if (!collections.has(versionKey)) { + collections.set(versionKey, []) } - const collection = versionMap.get(row.version)! - collection.extend([[[key, [v1, v2]], row.multiplicity]]) + collections.get(versionKey)!.push([key, [val1, val2], mul1 * mul2]) } - const result = Array.from(versionMap.entries()).map( - ([versionStr, collection]) => [Version.fromJSON(versionStr), collection], - ) + const result = Array.from(collections.entries()) + .filter(([_v, c]) => c.length > 0) + .map(([versionJson, data]) => [ + Version.fromJSON(versionJson), + new MultiSet(data.map(([k, v, m]) => [[k, v], m])), + ]) + return result as [Version, MultiSet<[K, [V, V2]]>][] } destroy(): void { this.#preparedStatements.deleteMeta.run() this.#preparedStatements.deleteAll.run() - this.#preparedStatements.deleteAllVersions.run() - // Clear the query caches + // Just clear the query caches + SQLIndex.#appendQueryCache.clear() + SQLIndex.#joinQueryCache.clear() + this.#compactionFrontierCache = null + } + + static clearStatementCaches(): void { SQLIndex.#appendQueryCache.clear() SQLIndex.#joinQueryCache.clear() } compact(compactionFrontier: Antichain, keys: K[] = []): void { - // Check existing frontier const existingFrontier = this.getCompactionFrontier() if (existingFrontier && !existingFrontier.lessEqual(compactionFrontier)) { throw new Error('Invalid compaction frontier') @@ -480,21 +400,19 @@ export class SQLIndex { keys.length > 0 ? keys : (() => { - const rows = this.#preparedStatements.getAllKeys.all() as { - key: string - }[] + const rows = this.#preparedStatements.getAllKeys.all() return rows.map((row) => JSON.parse(row.key)) })() // Process each key for (const key of keysToProcess) { // Get versions for this key that need compaction - const versionsToCompact = this.#preparedStatements.getVersionsForKey.all( - JSON.stringify(key), - ) as { version: string; version_id: number }[] + const versionsToCompact = this.#preparedStatements.getVersionsForKey + .all(JSON.stringify(key)) + .map((row) => row.version) - const toCompact = versionsToCompact.filter((row) => { - const version = Version.fromJSON(row.version) + const toCompact = versionsToCompact.filter((versionJson) => { + const version = Version.fromJSON(versionJson) return !compactionFrontier.lessEqualVersion(version) }) @@ -502,35 +420,22 @@ export class SQLIndex { const toConsolidate = new Set() // Process each version that needs compaction - for (const versionRow of toCompact) { - const oldVersion = Version.fromJSON(versionRow.version) + for (const oldVersionJson of toCompact) { + const oldVersion = Version.fromJSON(oldVersionJson) const newVersion = oldVersion.advanceBy(compactionFrontier) const newVersionJson = newVersion.toJSON() - // Get or create the new version ID - let newVersionId = this.#preparedStatements.getVersionId.get( - newVersionJson, - ) as { id: number } | undefined - - if (!newVersionId) { - newVersionId = this.#preparedStatements.insertVersion.get( - newVersionJson, - ) as { - id: number - } - } - - // Move data to new version, handling conflicts by adding multiplicities + // Move data to new version this.#preparedStatements.moveDataToNewVersion.run( - newVersionId.id, + newVersionJson, JSON.stringify(key), - versionRow.version_id, + oldVersionJson, ) // Delete old version data this.#preparedStatements.deleteOldVersionData.run( JSON.stringify(key), - versionRow.version_id, + oldVersionJson, ) toConsolidate.add(newVersionJson) @@ -554,30 +459,19 @@ export class SQLIndex { showAll(): { key: K; version: Version; value: V; multiplicity: number }[] { const rows = this.#db - .prepare( - ` - SELECT i.*, v.version - FROM ${this.#tableName} i - JOIN ${this.#versionTableName} v ON i.version_id = v.id - `, - ) - .all() - return rows as { - key: K - version: Version - value: V - multiplicity: number - }[] - } - - static clearStatementCaches(): void { - SQLIndex.#appendQueryCache.clear() - SQLIndex.#joinQueryCache.clear() + .prepare(`SELECT * FROM ${this.#tableName}`) + .all() as IndexRow[] + return rows.map((row) => ({ + key: JSON.parse(row.key), + version: Version.fromJSON(row.version), + value: JSON.parse(row.value), + multiplicity: row.multiplicity, + })) } truncate(): void { this.#preparedStatements.truncate.run() this.#preparedStatements.truncateMeta.run() - this.#preparedStatements.truncateVersions.run() + this.#compactionFrontierCache = null } } diff --git a/packages/d2ts/src/utils.ts b/packages/d2ts/src/utils.ts index 7bd8c9c..33b345e 100644 --- a/packages/d2ts/src/utils.ts +++ b/packages/d2ts/src/utils.ts @@ -51,3 +51,19 @@ export class DefaultMap extends Map { return newValue } } + +// JS engines have various limits on how many args can be passed to a function +// with a spread operator, so we need to split the operation into chunks +// 32767 is the max for Chrome 14, all others are higher +// TODO: investigate the performance of this and other approaches +const chunkSize = 30000 +export function chunkedArrayPush(array: unknown[], other: unknown[]) { + if (other.length <= chunkSize) { + array.push(...other) + } else { + for (let i = 0; i < other.length; i += chunkSize) { + const chunk = other.slice(i, i + chunkSize) + array.push(...chunk) + } + } +} diff --git a/packages/d2ts/src/version-index.ts b/packages/d2ts/src/version-index.ts index 3e9bbe0..9cde34b 100644 --- a/packages/d2ts/src/version-index.ts +++ b/packages/d2ts/src/version-index.ts @@ -1,6 +1,6 @@ import { Version, Antichain } from './order.js' import { MultiSet } from './multiset.js' -import { DefaultMap } from './utils.js' +import { DefaultMap, chunkedArrayPush } from './utils.js' type VersionMap = DefaultMap type IndexMap = DefaultMap> @@ -66,7 +66,7 @@ export class Index implements IndexType { for (const [version, values] of versions.entries()) { if (version.lessEqual(requestedVersion)) { - out.push(...values) + chunkedArrayPush(out, values) } } @@ -92,7 +92,7 @@ export class Index implements IndexType { const thisVersions = this.#inner.get(key) for (const [version, data] of versions) { thisVersions.update(version, (values) => { - values.push(...data) + chunkedArrayPush(values, data) return values }) } @@ -207,7 +207,7 @@ export class Index implements IndexType { const newVersion = version.advanceBy(compactionFrontier) versions.update(newVersion, (existing) => { - existing.push(...values) + chunkedArrayPush(existing, values) return existing }) toConsolidate.add(newVersion)