Skip to content

Commit 3c85b86

Browse files
committed
feat: improve concurrency features
1 parent ee33215 commit 3c85b86

File tree

5 files changed

+206
-123
lines changed

5 files changed

+206
-123
lines changed

package/src/DatabaseQueue.ts

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
import NitroSQLiteError from './NitroSQLiteError'
2+
3+
export interface QueuedOperation {
4+
/**
5+
* Starts the operation
6+
*/
7+
start: () => void
8+
}
9+
10+
export type DatabaseQueue = {
11+
queue: QueuedOperation[]
12+
inProgress: boolean
13+
}
14+
15+
const databaseQueues = new Map<string, DatabaseQueue>()
16+
17+
export function openDatabaseQueue(dbName: string) {
18+
if (isDatabaseOpen(dbName)) {
19+
throw new NitroSQLiteError(
20+
`Database ${dbName} is already open. There is already a connection to the database.`
21+
)
22+
}
23+
24+
databaseQueues.set(dbName, { queue: [], inProgress: false })
25+
}
26+
27+
export function closeDatabaseQueue(dbName: string) {
28+
const queue = getDatabaseQueue(dbName)
29+
30+
if (queue.inProgress || queue.queue.length > 0) {
31+
console.warn(
32+
`Database queue for ${dbName} has operations in the queue. Closing anyway.`
33+
)
34+
}
35+
36+
databaseQueues.delete(dbName)
37+
}
38+
39+
export function isDatabaseOpen(dbName: string) {
40+
return databaseQueues.has(dbName)
41+
}
42+
43+
export function throwIfDatabaseIsNotOpen(dbName: string) {
44+
if (!isDatabaseOpen(dbName))
45+
throw new NitroSQLiteError(
46+
`Database ${dbName} is not open. There is no connection to the database.`
47+
)
48+
}
49+
50+
function getDatabaseQueue(dbName: string) {
51+
throwIfDatabaseIsNotOpen(dbName)
52+
53+
const queue = databaseQueues.get(dbName)!
54+
return queue
55+
}
56+
57+
export function openDatabase(dbName: string) {
58+
databaseQueues.set(dbName, { queue: [], inProgress: false })
59+
}
60+
61+
export function closeDatabase(dbName: string) {
62+
databaseQueues.delete(dbName)
63+
}
64+
65+
export function queueOperationAsync<
66+
OperationCallback extends () => Promise<Result>,
67+
Result = void,
68+
>(dbName: string, callback: OperationCallback) {
69+
const queue = getDatabaseQueue(dbName)
70+
71+
return new Promise<Result>((resolve, reject) => {
72+
const operation: QueuedOperation = {
73+
start: async () => {
74+
try {
75+
const result = await callback()
76+
resolve(result)
77+
} catch (error) {
78+
reject(error)
79+
} finally {
80+
queue.inProgress = false
81+
startOperationAsync(dbName)
82+
}
83+
},
84+
}
85+
86+
queue.queue.push(operation)
87+
startOperationAsync(dbName)
88+
})
89+
}
90+
91+
function startOperationAsync(dbName: string) {
92+
const queue = getDatabaseQueue(dbName)
93+
94+
if (queue.inProgress) {
95+
// Operation is already in process bail out
96+
return
97+
}
98+
99+
if (queue.queue.length > 0) {
100+
queue.inProgress = true
101+
102+
const operation = queue.queue.shift()!
103+
setImmediate(() => {
104+
operation.start()
105+
})
106+
}
107+
}
108+
109+
export function startOperationSync<
110+
OperationCallback extends () => Result,
111+
Result = void,
112+
>(dbName: string, callback: OperationCallback) {
113+
const queue = getDatabaseQueue(dbName)
114+
115+
// Database is busy - cannot execute synchronously
116+
if (queue.inProgress || queue.queue.length > 0) {
117+
throw new NitroSQLiteError(
118+
`Cannot run synchronous operation on database. Database ${dbName} is busy with another operation.`
119+
)
120+
}
121+
122+
// Execute synchronously
123+
queue.inProgress = true
124+
const result = callback()
125+
queue.inProgress = false
126+
127+
return result
128+
}

package/src/concurrency.ts

Lines changed: 0 additions & 8 deletions
This file was deleted.

package/src/operations/executeBatch.ts

Lines changed: 27 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,78 +3,62 @@ import {
33
replaceWithNativeNullValue,
44
} from '../nullHandling'
55
import { HybridNitroSQLite } from '../nitro'
6-
import { locks, type QueuedOperation } from '../concurrency'
6+
import {
7+
queueOperationAsync,
8+
startOperationSync,
9+
throwIfDatabaseIsNotOpen,
10+
} from '../DatabaseQueue'
711
import type {
812
NativeSQLiteQueryParams,
913
BatchQueryResult,
1014
BatchQueryCommand,
1115
NativeBatchQueryCommand,
1216
} from '../types'
13-
import { startNextOperation } from './transaction'
17+
import NitroSQLiteError from '../NitroSQLiteError'
1418

1519
export function executeBatch(
1620
dbName: string,
1721
commands: BatchQueryCommand[]
1822
): BatchQueryResult {
19-
if (locks[dbName] == null)
20-
throw Error(`Nitro SQLite Error: No lock found on db: ${dbName}`)
23+
throwIfDatabaseIsNotOpen(dbName)
2124

2225
const transformedCommands = isSimpleNullHandlingEnabled()
2326
? toNativeBatchQueryCommands(commands)
2427
: (commands as NativeBatchQueryCommand[])
2528

26-
// If lock is immediately available, execute synchronously
27-
if (!locks[dbName].inProgress && locks[dbName].queue.length === 0) {
28-
locks[dbName].inProgress = true
29-
try {
30-
const result = HybridNitroSQLite.executeBatch(dbName, transformedCommands)
31-
return result
32-
} finally {
33-
locks[dbName].inProgress = false
34-
startNextOperation(dbName)
35-
}
29+
try {
30+
return startOperationSync(dbName, () =>
31+
HybridNitroSQLite.executeBatch(dbName, transformedCommands)
32+
)
33+
} catch (error) {
34+
throw NitroSQLiteError.fromError(error)
3635
}
37-
38-
// Lock is busy - cannot execute synchronously
39-
throw Error(
40-
`Nitro SQLite Error: Database ${dbName} is busy with another operation. Use executeBatchAsync for queued execution.`
41-
)
4236
}
4337

4438
export async function executeBatchAsync(
4539
dbName: string,
4640
commands: BatchQueryCommand[]
4741
): Promise<BatchQueryResult> {
48-
if (locks[dbName] == null)
49-
throw Error(`Nitro SQLite Error: No lock found on db: ${dbName}`)
42+
throwIfDatabaseIsNotOpen(dbName)
5043

5144
const transformedCommands = isSimpleNullHandlingEnabled()
5245
? toNativeBatchQueryCommands(commands)
5346
: (commands as NativeBatchQueryCommand[])
5447

55-
async function run() {
56-
try {
57-
const result = await HybridNitroSQLite.executeBatchAsync(
58-
dbName,
59-
transformedCommands
60-
)
61-
return result
62-
} finally {
63-
locks[dbName]!.inProgress = false
64-
startNextOperation(dbName)
65-
}
48+
try {
49+
return queueOperationAsync(dbName, async () => {
50+
try {
51+
return await HybridNitroSQLite.executeBatchAsync(
52+
dbName,
53+
transformedCommands
54+
)
55+
} catch (error) {
56+
throw NitroSQLiteError.fromError(error)
57+
}
58+
})
59+
} catch (error) {
60+
throw NitroSQLiteError.fromError(error)
6661
}
67-
68-
return new Promise<BatchQueryResult>((resolve, reject) => {
69-
const operation: QueuedOperation = {
70-
start: () => {
71-
run().then(resolve).catch(reject)
72-
},
73-
}
74-
75-
locks[dbName]?.queue.push(operation)
76-
startNextOperation(dbName)
77-
})
7862
}
7963

8064
function toNativeBatchQueryCommands(

package/src/operations/session.ts

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { locks, HybridNitroSQLite } from '../nitro'
1+
import { HybridNitroSQLite } from '../nitro'
22
import { transaction } from './transaction'
33
import type {
44
BatchQueryCommand,
@@ -11,6 +11,8 @@ import type {
1111
} from '../types'
1212
import { execute, executeAsync } from './execute'
1313
import { executeBatch, executeBatchAsync } from './executeBatch'
14+
import NitroSQLiteError from '../NitroSQLiteError'
15+
import { closeDatabaseQueue, openDatabaseQueue } from '../DatabaseQueue'
1416

1517
export function open(
1618
options: NitroSQLiteConnectionOptions
@@ -45,15 +47,19 @@ export function open(
4547
}
4648

4749
export function openDb(dbName: string, location?: string) {
48-
HybridNitroSQLite.open(dbName, location)
49-
50-
locks[dbName] = {
51-
queue: [],
52-
inProgress: false,
50+
try {
51+
HybridNitroSQLite.open(dbName, location)
52+
openDatabaseQueue(dbName)
53+
} catch (error) {
54+
throw NitroSQLiteError.fromError(error)
5355
}
5456
}
5557

5658
export function close(dbName: string) {
57-
HybridNitroSQLite.close(dbName)
58-
delete locks[dbName]
59+
try {
60+
HybridNitroSQLite.close(dbName)
61+
closeDatabaseQueue(dbName)
62+
} catch (error) {
63+
throw NitroSQLiteError.fromError(error)
64+
}
5965
}

0 commit comments

Comments
 (0)