11import NIOCore
22import Logging
33
4- final class PSQLRowStream {
4+ // Thread safety is guaranteed in the RowStream through dispatching onto the NIO EventLoop.
5+ final class PSQLRowStream : @unchecked Sendable {
56 private typealias AsyncSequenceSource = NIOThrowingAsyncSequenceProducer < DataRow , Error , AdaptiveRowBuffer , PSQLRowStream > . Source
67
78 enum RowSource {
@@ -23,10 +24,7 @@ final class PSQLRowStream {
2324 case iteratingRows( onRow: ( PostgresRow ) throws -> ( ) , EventLoopPromise < Void > , PSQLRowsDataSource )
2425 case waitingForAll( [ PostgresRow ] , EventLoopPromise < [ PostgresRow ] > , PSQLRowsDataSource )
2526 case consumed( Result < String , Error > )
26-
27- #if canImport(_Concurrency)
2827 case asyncSequence( AsyncSequenceSource , PSQLRowsDataSource )
29- #endif
3028 }
3129
3230 internal let rowDescription : [ RowDescription . Column ]
@@ -64,8 +62,7 @@ final class PSQLRowStream {
6462 }
6563
6664 // MARK: Async Sequence
67-
68- #if canImport(_Concurrency)
65+
6966 func asyncSequence( ) -> PostgresRowSequence {
7067 self . eventLoop. preconditionInEventLoop ( )
7168
@@ -150,7 +147,6 @@ final class PSQLRowStream {
150147 preconditionFailure ( " Invalid state: \( self . downstreamState) " )
151148 }
152149 }
153- #endif
154150
155151 // MARK: Consume in array
156152
@@ -312,12 +308,10 @@ final class PSQLRowStream {
312308 self . downstreamState = . waitingForAll( rows, promise, dataSource)
313309 // immediately request more
314310 dataSource. request ( for: self )
315-
316- #if canImport(_Concurrency)
311+
317312 case . asyncSequence( let consumer, let source) :
318313 let yieldResult = consumer. yield ( contentsOf: newRows)
319314 self . executeActionBasedOnYieldResult ( yieldResult, source: source)
320- #endif
321315
322316 case . consumed( . success) :
323317 preconditionFailure ( " How can we receive further rows, if we are supposed to be done " )
@@ -353,12 +347,10 @@ final class PSQLRowStream {
353347 case . waitingForAll( let rows, let promise, _) :
354348 self . downstreamState = . consumed( . success( commandTag) )
355349 promise. succeed ( rows)
356-
357- #if canImport(_Concurrency)
350+
358351 case . asyncSequence( let source, _) :
359352 source. finish ( )
360353 self . downstreamState = . consumed( . success( commandTag) )
361- #endif
362354
363355 case . consumed:
364356 break
@@ -380,13 +372,11 @@ final class PSQLRowStream {
380372 case . waitingForAll( _, let promise, _) :
381373 self . downstreamState = . consumed( . failure( error) )
382374 promise. fail ( error)
383-
384- #if canImport(_Concurrency)
375+
385376 case . asyncSequence( let consumer, _) :
386377 consumer. finish ( error)
387378 self . downstreamState = . consumed( . failure( error) )
388- #endif
389-
379+
390380 case . consumed:
391381 break
392382 }
@@ -432,8 +422,3 @@ protocol PSQLRowsDataSource {
432422 func cancel( for stream: PSQLRowStream )
433423
434424}
435-
436- #if swift(>=5.5)
437- // Thread safety is guaranteed in the RowStream through dispatching onto the NIO EventLoop.
438- extension PSQLRowStream : @unchecked Sendable { }
439- #endif
0 commit comments