@@ -85,22 +85,34 @@ public actor PostgresConnectionPool {
8585// }
8686
8787 /// Takes one connection from the pool and dishes it out to the caller.
88+ ///
89+ /// - Parameters:
90+ /// - batchId: An optional integer value associated with the connection. See also ``abortBatch(_:)``.
91+ /// - callback: A closure with a connection to the database server.
8892 @discardableResult
8993 public func connection< T> (
94+ batchId: Int ? = nil ,
9095 _ callback: ( PostgresConnectionWrapper ) async throws -> T )
9196 async throws -> T
9297 {
9398 var poolConnection : PoolConnection ?
9499
95100 do {
96- poolConnection = try await getConnection ( )
101+ poolConnection = try await getConnection ( batchId : batchId )
97102
98103 if Task . isCancelled {
99104 await releaseConnection ( poolConnection!)
105+
106+ if let batchId {
107+ await abortBatch ( batchId)
108+ }
109+
100110 throw PoolError . cancelled
101111 }
102112
103- let result = try await PostgresConnectionWrapper . distribute ( poolConnection: poolConnection, callback: callback)
113+ let result = try await PostgresConnectionWrapper . distribute (
114+ poolConnection: poolConnection,
115+ callback: callback)
104116
105117 await releaseConnection ( poolConnection!)
106118
@@ -118,11 +130,11 @@ public actor PostgresConnectionPool {
118130 }
119131
120132 /// Adds a connection placeholder to the list of waiting connections.
121- private func getConnection( ) async throws -> PoolConnection {
133+ private func getConnection( batchId : Int ? = nil ) async throws -> PoolConnection {
122134 guard !isShutdown else { throw PoolError . poolDestroyed ( shutdownError) }
123135
124136 return try await withCheckedThrowingContinuation ( { ( continuation: PostgresCheckedContinuation ) in
125- self . continuations. append ( PoolContinuation ( continuation: continuation) )
137+ self . continuations. append ( PoolContinuation ( batchId : batchId , continuation: continuation) )
126138
127139 Task . detached { [ weak self] in
128140 await self ? . handleNextContinuation ( )
@@ -154,11 +166,31 @@ public actor PostgresConnectionPool {
154166 assert ( available. contains ( connection) , " Connections in state 'available' should be available " )
155167 }
156168
169+ connection. query = nil
170+ connection. batchId = nil
171+
157172 Task . detached { [ weak self] in
158173 await self ? . handleNextContinuation ( )
159174 }
160175 }
161176
177+ /// Aborts all waiting queries with the given `batchId`.
178+ public func abortBatch( _ batchId: Int ) async {
179+ let countBefore = continuations. count
180+
181+ continuations. removeAll ( where: { poolContinuation in
182+ guard poolContinuation. batchId == batchId else { return false }
183+
184+ poolContinuation. continuation. resume ( throwing: PoolError . cancelled)
185+ return true
186+ } )
187+
188+ let countRemoved = countBefore - continuations. count
189+ if countRemoved > 0 {
190+ logger. debug ( " [ \( poolName) ] Removed \( countRemoved) continuations for batch \( batchId) " )
191+ }
192+ }
193+
162194 /// Releases all resources in the pool and shuts down the event loop.
163195 /// All further uses of the pool will throw an error.
164196 ///
@@ -220,12 +252,17 @@ public actor PostgresConnectionPool {
220252 }
221253
222254 /// Information about the pool and its open connections.
223- public func poolInfo( ) async -> PoolInfo {
255+ public func poolInfo( batchId : Int ? = nil ) async -> PoolInfo {
224256 let connections = connections. compactMap { connection -> PoolInfo . ConnectionInfo ? in
225- PoolInfo . ConnectionInfo (
257+ if let batchId, connection. batchId != batchId {
258+ return nil
259+ }
260+
261+ return PoolInfo . ConnectionInfo (
226262 id: connection. id,
227263 name: nameForConnection ( id: connection. id) ,
228264 usageCounter: connection. usageCounter,
265+ batchId: connection. batchId,
229266 query: connection. query,
230267 queryRuntime: connection. queryRuntime,
231268 state: connection. state)
@@ -236,7 +273,6 @@ public actor PostgresConnectionPool {
236273 openConnections: connections. count,
237274 activeConnections: connections. count - available. count,
238275 availableConnections: available. count,
239- usageCounter: PoolConnection . globalUsageCounter,
240276 connections: connections,
241277 isShutdown: isShutdown,
242278 shutdownError: shutdownError)
@@ -388,6 +424,7 @@ public actor PostgresConnectionPool {
388424 }
389425
390426 poolConnection. state = . active( Date ( ) )
427+ poolConnection. batchId = poolContinuation. batchId
391428
392429 do {
393430 // Connection check, etc.
0 commit comments