Skip to content

Commit d0cd697

Browse files
committed
Create a sqlite database wrapper as a route to supporting other sqlite drivers
1 parent 95c78af commit d0cd697

File tree

14 files changed

+175
-82
lines changed

14 files changed

+175
-82
lines changed
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/**
2+
* Represents a prepared statement in SQLite
3+
*/
4+
export interface SQLiteStatement<Params = unknown, Result = unknown> {
5+
/**
6+
* Run the prepared statement with parameters
7+
* Accepts either an array of parameters or an object of named parameters
8+
*/
9+
run(...params: Params extends any[] ? Params : [Params?]): void
10+
11+
/**
12+
* Get a single row from the prepared statement
13+
* Accepts either an array of parameters or an object of named parameters
14+
*/
15+
get(...params: Params extends any[] ? Params : [Params?]): Result | undefined
16+
17+
/**
18+
* Get all rows from the prepared statement
19+
* Accepts either an array of parameters or an object of named parameters
20+
*/
21+
all(...params: Params extends any[] ? Params : [Params?]): Result[]
22+
}
23+
24+
/**
25+
* Interface for SQLite database operations
26+
*/
27+
export interface SQLiteDb {
28+
/**
29+
* Execute raw SQL
30+
*/
31+
exec(sql: string): void
32+
33+
/**
34+
* Prepare a statement
35+
*/
36+
prepare<Params = unknown, Result = unknown>(
37+
sql: string,
38+
): SQLiteStatement<Params, Result>
39+
40+
/**
41+
* Close the database connection
42+
*/
43+
close(): void
44+
}
45+
46+
/**
47+
* Wrapper for better-sqlite3 to implement SQLiteDb interface
48+
*/
49+
export class BetterSQLite3Wrapper implements SQLiteDb {
50+
#db: import('better-sqlite3').Database
51+
52+
constructor(db: import('better-sqlite3').Database) {
53+
this.#db = db
54+
}
55+
56+
exec(sql: string): void {
57+
this.#db.exec(sql)
58+
}
59+
60+
prepare<Params = unknown, Result = unknown>(
61+
sql: string,
62+
): SQLiteStatement<Params, Result> {
63+
const stmt = this.#db.prepare(sql)
64+
return {
65+
run: (...params: Params extends any[] ? Params : [Params?]) =>
66+
stmt.run(...params),
67+
get: (...params: Params extends any[] ? Params : [Params?]) =>
68+
stmt.get(...params) as Result | undefined,
69+
all: (...params: Params extends any[] ? Params : [Params?]) =>
70+
stmt.all(...params) as Result[],
71+
}
72+
}
73+
74+
close(): void {
75+
this.#db.close()
76+
}
77+
}

packages/d2ts/src/sqlite/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
export * from './version-index.js'
22
export * from './operators/index.js'
3+
export * from './database.js'

packages/d2ts/src/sqlite/operators/consolidate.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
UnaryOperator,
1313
} from '../../graph.js'
1414
import { Version, Antichain } from '../../order.js'
15-
import Database from 'better-sqlite3'
15+
import { SQLiteDb, SQLiteStatement } from '../database.js'
1616

1717
interface CollectionRow {
1818
version: string
@@ -29,19 +29,19 @@ interface CollectionParams {
2929
*/
3030
export class ConsolidateOperatorSQLite<T> extends UnaryOperator<T> {
3131
#preparedStatements: {
32-
insert: Database.Statement<CollectionParams>
33-
update: Database.Statement<CollectionParams>
34-
get: Database.Statement<string, CollectionRow>
35-
delete: Database.Statement<string>
36-
getAllVersions: Database.Statement<[], CollectionRow>
32+
insert: SQLiteStatement<CollectionParams>
33+
update: SQLiteStatement<CollectionParams>
34+
get: SQLiteStatement<[string], CollectionRow>
35+
delete: SQLiteStatement<[string]>
36+
getAllVersions: SQLiteStatement<[], CollectionRow>
3737
}
3838

3939
constructor(
4040
id: number,
4141
inputA: DifferenceStreamReader<T>,
4242
output: DifferenceStreamWriter<T>,
4343
initialFrontier: Antichain,
44-
db: Database.Database,
44+
db: SQLiteDb,
4545
) {
4646
super(id, inputA, output, initialFrontier)
4747

@@ -143,7 +143,7 @@ export class ConsolidateOperatorSQLite<T> extends UnaryOperator<T> {
143143
* Persists state to SQLite
144144
* @param db - The SQLite database
145145
*/
146-
export function consolidate<T>(db: Database.Database): PipedOperator<T, T> {
146+
export function consolidate<T>(db: SQLiteDb): PipedOperator<T, T> {
147147
return (stream: IStreamBuilder<T>): IStreamBuilder<T> => {
148148
const output = new StreamBuilder<T>(
149149
stream.graph,

packages/d2ts/src/sqlite/operators/count.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ import { StreamBuilder } from '../../d2.js'
22
import { IStreamBuilder, KeyValue } from '../../types.js'
33
import { DifferenceStreamReader, DifferenceStreamWriter } from '../../graph.js'
44
import { Antichain } from '../../order.js'
5-
6-
import Database from 'better-sqlite3'
5+
import { SQLiteDb } from '../database.js'
76
import { ReduceOperatorSQLite } from './reduce.js'
87

98
export class CountOperatorSQLite<K, V> extends ReduceOperatorSQLite<
@@ -16,7 +15,7 @@ export class CountOperatorSQLite<K, V> extends ReduceOperatorSQLite<
1615
inputA: DifferenceStreamReader<[K, V]>,
1716
output: DifferenceStreamWriter<[K, number]>,
1817
initialFrontier: Antichain,
19-
db: Database.Database,
18+
db: SQLiteDb,
2019
) {
2120
const countInner = (vals: [V, number][]): [number, number][] => {
2221
let count = 0
@@ -39,7 +38,7 @@ export function count<
3938
K extends T extends KeyValue<infer K, infer _V> ? K : never,
4039
V extends T extends KeyValue<K, infer V> ? V : never,
4140
T,
42-
>(db: Database.Database) {
41+
>(db: SQLiteDb) {
4342
return (stream: IStreamBuilder<T>): IStreamBuilder<KeyValue<K, number>> => {
4443
const output = new StreamBuilder<KeyValue<K, number>>(
4544
stream.graph,

packages/d2ts/src/sqlite/operators/distinct.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { StreamBuilder } from '../../d2.js'
22
import { IStreamBuilder, KeyValue } from '../../types.js'
33
import { DifferenceStreamReader, DifferenceStreamWriter } from '../../graph.js'
44
import { Antichain } from '../../order.js'
5-
import Database from 'better-sqlite3'
5+
import { SQLiteDb } from '../database.js'
66
import { ReduceOperatorSQLite } from './reduce.js'
77

88
export class DistinctOperatorSQLite<K, V> extends ReduceOperatorSQLite<
@@ -15,7 +15,7 @@ export class DistinctOperatorSQLite<K, V> extends ReduceOperatorSQLite<
1515
inputA: DifferenceStreamReader<[K, V]>,
1616
output: DifferenceStreamWriter<[K, V]>,
1717
initialFrontier: Antichain,
18-
db: Database.Database,
18+
db: SQLiteDb,
1919
) {
2020
const distinctInner = (vals: [V, number][]): [V, number][] => {
2121
const consolidated = new Map<string, number>()
@@ -43,7 +43,7 @@ export function distinct<
4343
K extends T extends KeyValue<infer K, infer _V> ? K : never,
4444
V extends T extends KeyValue<K, infer V> ? V : never,
4545
T,
46-
>(db: Database.Database) {
46+
>(db: SQLiteDb) {
4747
return (stream: IStreamBuilder<T>): IStreamBuilder<KeyValue<K, V>> => {
4848
const output = new StreamBuilder<KeyValue<K, V>>(
4949
stream.graph,

packages/d2ts/src/sqlite/operators/join.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,23 @@ import {
1212
BinaryOperator,
1313
} from '../../graph.js'
1414
import { Version, Antichain } from '../../order.js'
15-
import Database from 'better-sqlite3'
15+
import { SQLiteDb } from '../database.js'
1616
import { SQLIndex } from '../version-index.js'
1717

1818
export class JoinOperatorSQLite<K, V1, V2> extends BinaryOperator<
1919
[K, unknown]
2020
> {
2121
#indexA: SQLIndex<K, V1>
2222
#indexB: SQLIndex<K, V2>
23-
#db: Database.Database
23+
#db: SQLiteDb
2424

2525
constructor(
2626
id: number,
2727
inputA: DifferenceStreamReader<[K, V1]>,
2828
inputB: DifferenceStreamReader<[K, V2]>,
2929
output: DifferenceStreamWriter<[K, [V1, V2]]>,
3030
initialFrontier: Antichain,
31-
db: Database.Database,
31+
db: SQLiteDb,
3232
) {
3333
super(id, inputA, inputB, output, initialFrontier)
3434
this.#db = db
@@ -117,7 +117,7 @@ export class JoinOperatorSQLite<K, V1, V2> extends BinaryOperator<
117117
this.output.sendFrontier(this.outputFrontier)
118118
this.#indexA.compact(this.outputFrontier)
119119
this.#indexB.compact(this.outputFrontier)
120-
}
120+
}
121121
} finally {
122122
// Clean up temporary indexes
123123
deltaA.destroy()
@@ -137,7 +137,7 @@ export function join<
137137
V1 extends T extends KeyValue<infer _KT, infer VT> ? VT : never,
138138
V2,
139139
T,
140-
>(other: IStreamBuilder<KeyValue<K, V2>>, db: Database.Database) {
140+
>(other: IStreamBuilder<KeyValue<K, V2>>, db: SQLiteDb) {
141141
return (stream: IStreamBuilder<T>): IStreamBuilder<KeyValue<K, [V1, V2]>> => {
142142
if (stream.graph !== other.graph) {
143143
throw new Error('Cannot join streams from different graphs')

packages/d2ts/src/sqlite/operators/reduce.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
UnaryOperator,
1313
} from '../../graph.js'
1414
import { Version, Antichain } from '../../order.js'
15-
import Database from 'better-sqlite3'
15+
import { SQLiteDb, SQLiteStatement } from '../database.js'
1616
import { SQLIndex } from '../version-index.js'
1717

1818
interface KeysTodoRow {
@@ -29,11 +29,11 @@ export class ReduceOperatorSQLite<K, V1, V2> extends UnaryOperator<
2929
#index: SQLIndex<K, V1>
3030
#indexOut: SQLIndex<K, V2>
3131
#preparedStatements: {
32-
insertKeyTodo: Database.Statement<[string, string]>
33-
getKeysTodo: Database.Statement<[], KeysTodoRow>
34-
deleteKeysTodo: Database.Statement<[string]>
35-
createKeysTodoTable: Database.Statement
36-
dropKeysTodoTable: Database.Statement
32+
insertKeyTodo: SQLiteStatement<[string, string]>
33+
getKeysTodo: SQLiteStatement<[], KeysTodoRow>
34+
deleteKeysTodo: SQLiteStatement<[string]>
35+
createKeysTodoTable: SQLiteStatement
36+
dropKeysTodoTable: SQLiteStatement
3737
}
3838
#f: (values: [V1, number][]) => [V2, number][]
3939

@@ -43,7 +43,7 @@ export class ReduceOperatorSQLite<K, V1, V2> extends UnaryOperator<
4343
output: DifferenceStreamWriter<[K, V2]>,
4444
f: (values: [V1, number][]) => [V2, number][],
4545
initialFrontier: Antichain,
46-
db: Database.Database,
46+
db: SQLiteDb,
4747
) {
4848
super(id, inputA, output, initialFrontier)
4949
this.#f = f
@@ -211,7 +211,7 @@ export function reduce<
211211
V1 extends T extends KeyValue<K, infer V> ? V : never,
212212
R,
213213
T,
214-
>(f: (values: [V1, number][]) => [R, number][], db: Database.Database) {
214+
>(f: (values: [V1, number][]) => [R, number][], db: SQLiteDb) {
215215
return (stream: IStreamBuilder<T>): IStreamBuilder<KeyValue<K, R>> => {
216216
const output = new StreamBuilder<KeyValue<K, R>>(
217217
stream.graph,

packages/d2ts/src/sqlite/version-index.ts

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Version, Antichain, v } from '../order.js'
22
import { MultiSet } from '../multiset.js'
3-
import Database from 'better-sqlite3'
3+
import { SQLiteDb, SQLiteStatement } from './database.js'
44

55
interface IndexRow {
66
key: string
@@ -33,37 +33,37 @@ interface JoinResult {
3333
}
3434

3535
export class SQLIndex<K, V> {
36-
#db: Database.Database
36+
#db: SQLiteDb
3737
#tableName: string
3838
#versionTableName: string
3939
#isTemp: boolean
4040
#preparedStatements: {
41-
insert: Database.Statement<InsertParams>
42-
get: Database.Statement<GetParams, IndexRow>
43-
getVersions: Database.Statement<[string], { version: string }>
44-
getAllForKey: Database.Statement<[string, GetAllForKeyParams], IndexRow>
45-
delete: Database.Statement<[string]>
46-
deleteAll: Database.Statement
47-
deleteAllVersions: Database.Statement
48-
getForCompaction: Database.Statement<[string], IndexRow>
49-
consolidateVersions: Database.Statement<[string, string]>
50-
insertVersion: Database.Statement<[string], { id: number }>
51-
updateVersionMapping: Database.Statement<[string, string]>
52-
deleteZeroMultiplicity: Database.Statement
53-
getVersionId: Database.Statement<[string], { id: number }>
54-
setCompactionFrontier: Database.Statement<[string]>
55-
getCompactionFrontier: Database.Statement<[], { value: string }>
56-
deleteMeta: Database.Statement
57-
getAllKeys: Database.Statement<[], { key: string }>
58-
getVersionsForKey: Database.Statement<
41+
insert: SQLiteStatement<InsertParams>
42+
get: SQLiteStatement<GetParams, IndexRow>
43+
getVersions: SQLiteStatement<[string], { version: string }>
44+
getAllForKey: SQLiteStatement<[string, GetAllForKeyParams], IndexRow>
45+
delete: SQLiteStatement<[string]>
46+
deleteAll: SQLiteStatement
47+
deleteAllVersions: SQLiteStatement
48+
getForCompaction: SQLiteStatement<[string], IndexRow>
49+
consolidateVersions: SQLiteStatement<[string, string]>
50+
insertVersion: SQLiteStatement<[string], { id: number }>
51+
updateVersionMapping: SQLiteStatement<[string, string]>
52+
deleteZeroMultiplicity: SQLiteStatement
53+
getVersionId: SQLiteStatement<[string], { id: number }>
54+
setCompactionFrontier: SQLiteStatement<[string]>
55+
getCompactionFrontier: SQLiteStatement<[], { value: string }>
56+
deleteMeta: SQLiteStatement
57+
getAllKeys: SQLiteStatement<[], { key: string }>
58+
getVersionsForKey: SQLiteStatement<
5959
[string],
6060
{ version: string; version_id: number }
6161
>
62-
moveDataToNewVersion: Database.Statement<[number, string, number]>
63-
deleteOldVersionData: Database.Statement<[string, number]>
62+
moveDataToNewVersion: SQLiteStatement<[number, string, number]>
63+
deleteOldVersionData: SQLiteStatement<[string, number]>
6464
}
6565

66-
constructor(db: Database.Database, name: string, isTemp = false) {
66+
constructor(db: SQLiteDb, name: string, isTemp = false) {
6767
this.#db = db
6868
this.#tableName = `index_${name}`
6969
this.#versionTableName = `${this.#tableName}_versions`

packages/d2ts/tests/operators-sqlite/consolidate.test.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,19 @@ import { DataMessage, MessageType } from '../../src/types.js'
66
import { consolidate } from '../../src/sqlite/operators/consolidate.js'
77
import { output } from '../../src/operators/index.js'
88
import Database from 'better-sqlite3'
9+
import { BetterSQLite3Wrapper } from '../../src/sqlite/database.js'
910
import fs from 'fs'
1011
import path from 'path'
1112

1213
const DB_FILENAME = 'test-consolidate.db'
1314

1415
describe('SQLite Operators', () => {
1516
describe('Consolidate operation', () => {
16-
let db: Database.Database
17+
let db: BetterSQLite3Wrapper
1718

1819
beforeEach(() => {
19-
db = new Database(':memory:')
20+
const sqlite = new Database(':memory:')
21+
db = new BetterSQLite3Wrapper(sqlite)
2022
})
2123

2224
afterEach(() => {
@@ -126,13 +128,14 @@ describe('SQLite Operators', () => {
126128

127129
describe('Consolidate operation with persistence', () => {
128130
const dbPath = path.join(import.meta.dirname, DB_FILENAME)
129-
let db: Database.Database
131+
let db: BetterSQLite3Wrapper
130132

131133
beforeEach(() => {
132134
if (fs.existsSync(dbPath)) {
133135
fs.unlinkSync(dbPath)
134136
}
135-
db = new Database(dbPath)
137+
const sqlite = new Database(dbPath)
138+
db = new BetterSQLite3Wrapper(sqlite)
136139
})
137140

138141
afterEach(() => {
@@ -175,7 +178,7 @@ describe('SQLite Operators', () => {
175178

176179
// Create new graph instance with same database
177180
messages = []
178-
db = new Database(dbPath)
181+
db = new BetterSQLite3Wrapper(new Database(dbPath))
179182
graph = new D2({ initialFrontier: v([1, 0]) })
180183
const newInput = graph.newInput<number>()
181184

0 commit comments

Comments
 (0)