Skip to content

Commit 9ed309b

Browse files
committed
Fix failing tests for number of threads created by DatabasePool
1 parent 00d1a80 commit 9ed309b

File tree

2 files changed

+41
-30
lines changed

2 files changed

+41
-30
lines changed

GRDB/Core/DatabasePool.swift

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -330,22 +330,14 @@ extension DatabasePool: DatabaseReader {
330330
}
331331

332332
public func asyncRead(_ value: @escaping (Result<Database, Error>) -> Void) {
333-
// First async jump in order to grab a reader connection.
334-
// Honor configuration dispatching (qos/targetQueue).
335-
let label = configuration.identifier(
336-
defaultLabel: "GRDB.DatabasePool",
337-
purpose: "asyncRead")
338-
configuration
339-
.makeReaderDispatchQueue(label: label)
340-
.async {
333+
do {
334+
guard let readerPool = self.readerPool else {
335+
throw DatabaseError(resultCode: .SQLITE_MISUSE, message: "Connection is closed")
336+
}
337+
readerPool.async { result in
341338
do {
342-
guard let readerPool = self.readerPool else {
343-
throw DatabaseError.connectionIsClosed()
344-
}
345-
let (reader, releaseReader) = try readerPool.get()
346-
347-
// Second async jump because sync could deadlock if
348-
// configuration has a serial targetQueue.
339+
let (reader, releaseReader) = try result.get()
340+
// Second async jump because that's how `Pool.async` has to be used.
349341
reader.async { db in
350342
defer {
351343
try? db.commit() // Ignore commit error
@@ -364,6 +356,9 @@ extension DatabasePool: DatabaseReader {
364356
value(.failure(error))
365357
}
366358
}
359+
} catch {
360+
value(.failure(error))
361+
}
367362
}
368363

369364
@_disfavoredOverload // SR-15150 Async overloading in protocol implementation fails
@@ -381,22 +376,14 @@ extension DatabasePool: DatabaseReader {
381376
}
382377

383378
public func asyncUnsafeRead(_ value: @escaping (Result<Database, Error>) -> Void) {
384-
// First async jump in order to grab a reader connection.
385-
// Honor configuration dispatching (qos/targetQueue).
386-
let label = configuration.identifier(
387-
defaultLabel: "GRDB.DatabasePool",
388-
purpose: "asyncUnsafeRead")
389-
configuration
390-
.makeReaderDispatchQueue(label: label)
391-
.async {
379+
do {
380+
guard let readerPool = self.readerPool else {
381+
throw DatabaseError(resultCode: .SQLITE_MISUSE, message: "Connection is closed")
382+
}
383+
readerPool.async { result in
392384
do {
393-
guard let readerPool = self.readerPool else {
394-
throw DatabaseError(resultCode: .SQLITE_MISUSE, message: "Connection is closed")
395-
}
396-
let (reader, releaseReader) = try readerPool.get()
397-
398-
// Second async jump because sync could deadlock if
399-
// configuration has a serial targetQueue.
385+
let (reader, releaseReader) = try result.get()
386+
// Second async jump because that's how `Pool.async` has to be used.
400387
reader.async { db in
401388
defer {
402389
releaseReader()
@@ -413,6 +400,9 @@ extension DatabasePool: DatabaseReader {
413400
value(.failure(error))
414401
}
415402
}
403+
} catch {
404+
value(.failure(error))
405+
}
416406
}
417407

418408
public func unsafeReentrantRead<T>(_ value: (Database) throws -> T) throws -> T {

GRDB/Utils/Pool.swift

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,15 @@ final class Pool<T> {
5151
private let itemsSemaphore: DispatchSemaphore // limits the number of elements
5252
private let itemsGroup: DispatchGroup // knows when no element is used
5353
private let barrierQueue: DispatchQueue
54+
private let semaphoreWaitingQueue: DispatchQueue // Inspired by https://khanlou.com/2016/04/the-GCD-handbook/
5455

5556
init(maximumCount: Int, makeElement: @escaping () throws -> T) {
5657
GRDBPrecondition(maximumCount > 0, "Pool size must be at least 1")
5758
self.makeElement = makeElement
5859
self.itemsSemaphore = DispatchSemaphore(value: maximumCount)
5960
self.itemsGroup = DispatchGroup()
6061
self.barrierQueue = DispatchQueue(label: "GRDB.Pool.barrier", attributes: [.concurrent])
62+
self.semaphoreWaitingQueue = DispatchQueue(label: "GRDB.Pool.wait")
6163
}
6264

6365
/// Returns a tuple (element, release)
@@ -87,6 +89,25 @@ final class Pool<T> {
8789
}
8890
}
8991

92+
/// Eventually produces a tuple (element, release), where element is
93+
/// intended to be used asynchronously.
94+
///
95+
/// Client must call release(), only once, after the element has been used.
96+
///
97+
/// - important: The `execute` argument is executed in a serial dispatch
98+
/// queue, so make sure you use the element asynchronously.
99+
func async(_ execute: @escaping (Result<(element: T, release: () -> Void), Error>) -> Void) {
100+
// Inspired by https://khanlou.com/2016/04/the-GCD-handbook/
101+
// > We wait on the semaphore in the serial queue, which means that
102+
// > we’ll have at most one blocked thread when we reach maximum
103+
// > executing blocks on the concurrent queue. Any other tasks the user
104+
// > enqueues will sit inertly on the serial queue waiting to be
105+
// > executed, and won’t cause new threads to be started.
106+
semaphoreWaitingQueue.async {
107+
execute(Result { try self.get() })
108+
}
109+
}
110+
90111
/// Performs a synchronous block with an element. The element turns
91112
/// available after the block has executed.
92113
func get<U>(block: (T) throws -> U) throws -> U {

0 commit comments

Comments
 (0)