Skip to content

Commit 18e2398

Browse files
authored
Update the project to address concurrency failures for 6.2 build modes. (#362)
* Correct build failures while updating to 6.2 swift build modes * Add a package definition for 5.8 builds * Add some brief explanation for the alteration of the Element in the buffer to an UnsafeTransfer * Fix broken merge for wasi threads
1 parent c537393 commit 18e2398

File tree

11 files changed

+110
-37
lines changed

11 files changed

+110
-37
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// swift-tools-version: 5.8
1+
// swift-tools-version: 6.2
22

33
import PackageDescription
44
import CompilerPluginSupport

[email protected]

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// swift-tools-version: 5.8
2+
3+
import PackageDescription
4+
import CompilerPluginSupport
5+
6+
// Availability Macros
7+
8+
let availabilityMacros: [SwiftSetting] = [
9+
.enableExperimentalFeature("AvailabilityMacro=AsyncAlgorithms 1.0:macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0"),
10+
.enableExperimentalFeature("AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0"),
11+
]
12+
13+
let package = Package(
14+
name: "swift-async-algorithms",
15+
products: [
16+
.library(name: "AsyncAlgorithms", targets: ["AsyncAlgorithms"])
17+
],
18+
targets: [
19+
.target(
20+
name: "AsyncAlgorithms",
21+
dependencies: [
22+
.product(name: "OrderedCollections", package: "swift-collections"),
23+
.product(name: "DequeModule", package: "swift-collections"),
24+
],
25+
swiftSettings: availabilityMacros + [
26+
.enableExperimentalFeature("StrictConcurrency=complete")
27+
]
28+
),
29+
.target(
30+
name: "AsyncSequenceValidation",
31+
dependencies: ["_CAsyncSequenceValidationSupport", "AsyncAlgorithms"],
32+
swiftSettings: availabilityMacros + [
33+
.enableExperimentalFeature("StrictConcurrency=complete")
34+
]
35+
),
36+
.systemLibrary(name: "_CAsyncSequenceValidationSupport"),
37+
.target(
38+
name: "AsyncAlgorithms_XCTest",
39+
dependencies: ["AsyncAlgorithms", "AsyncSequenceValidation"],
40+
swiftSettings: availabilityMacros + [
41+
.enableExperimentalFeature("StrictConcurrency=complete")
42+
]
43+
),
44+
.testTarget(
45+
name: "AsyncAlgorithmsTests",
46+
dependencies: ["AsyncAlgorithms", "AsyncSequenceValidation", "AsyncAlgorithms_XCTest"],
47+
swiftSettings: availabilityMacros + [
48+
.enableExperimentalFeature("StrictConcurrency=complete")
49+
]
50+
),
51+
]
52+
)
53+
54+
if Context.environment["SWIFTCI_USE_LOCAL_DEPS"] == nil {
55+
package.dependencies += [
56+
.package(url: "https://github.com/apple/swift-collections.git", from: "1.1.0"),
57+
]
58+
} else {
59+
package.dependencies += [
60+
.package(path: "../swift-collections")
61+
]
62+
}

Sources/AsyncAlgorithms/Buffer/AsyncBufferSequence.swift

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,14 @@ public struct AsyncBufferSequencePolicy: Sendable {
7474
/// An `AsyncSequence` that buffers elements in regard to a policy.
7575
@available(AsyncAlgorithms 1.0, *)
7676
public struct AsyncBufferSequence<Base: AsyncSequence & Sendable>: AsyncSequence {
77+
// Internal implementation note:
78+
// This type origianlly had no requirement that the element is actually Sendable. However,
79+
// that is technically an implementation detail hole in the safety of the system, it needs
80+
// to specify that the element is actually Sendable since the draining mechanism passes
81+
// through the isolation that is in nature sending but cannot be marked as such for the
82+
// isolated next method.
83+
// In practice the users of this type are safe from isolation crossing since the Element
84+
// is as sendable as it is required by the base sequences the buffer is constructed from.
7785
enum StorageType {
7886
case transparent(Base.AsyncIterator)
7987
case bounded(storage: BoundedBufferStorage<Base>)
@@ -121,9 +129,9 @@ public struct AsyncBufferSequence<Base: AsyncSequence & Sendable>: AsyncSequence
121129
self.storageType = .transparent(iterator)
122130
return element
123131
case .bounded(let storage):
124-
return try await storage.next()?._rethrowGet()
132+
return try await storage.next().wrapped?._rethrowGet()
125133
case .unbounded(let storage):
126-
return try await storage.next()?._rethrowGet()
134+
return try await storage.next().wrapped?._rethrowGet()
127135
}
128136
}
129137
}

Sources/AsyncAlgorithms/Buffer/BoundedBufferStateMachine.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import DequeModule
1515
struct BoundedBufferStateMachine<Base: AsyncSequence> {
1616
typealias Element = Base.Element
1717
typealias SuspendedProducer = UnsafeContinuation<Void, Never>
18-
typealias SuspendedConsumer = UnsafeContinuation<Result<Base.Element, Error>?, Never>
18+
typealias SuspendedConsumer = UnsafeContinuation<UnsafeTransfer<Result<Base.Element, Error>?>, Never>
1919

2020
// We are using UnsafeTransfer here since we have to get the elements from the task
2121
// into the consumer task. This is a transfer but we cannot prove this to the compiler at this point
@@ -137,7 +137,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
137137

138138
enum ElementProducedAction {
139139
case none
140-
case resumeConsumer(continuation: SuspendedConsumer, result: Result<Element, Error>)
140+
case resumeConsumer(continuation: SuspendedConsumer, result: UnsafeTransfer<Result<Base.Element, Error>?>)
141141
}
142142

143143
mutating func elementProduced(element: Element) -> ElementProducedAction {
@@ -161,7 +161,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
161161
// we have an awaiting consumer, we can resume it with the element and exit
162162
precondition(buffer.isEmpty, "Invalid state. The buffer should be empty.")
163163
self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: nil)
164-
return .resumeConsumer(continuation: suspendedConsumer, result: .success(element))
164+
return .resumeConsumer(continuation: suspendedConsumer, result: UnsafeTransfer(.success(element)))
165165

166166
case .buffering(_, _, .some, _):
167167
preconditionFailure("Invalid state. There should not be a suspended producer.")
@@ -177,7 +177,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
177177
enum FinishAction {
178178
case none
179179
case resumeConsumer(
180-
continuation: UnsafeContinuation<Result<Base.Element, Error>?, Never>?
180+
continuation: UnsafeContinuation<UnsafeTransfer<Result<Base.Element, Error>?>, Never>?
181181
)
182182
}
183183

@@ -295,7 +295,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
295295
case resumeProducerAndConsumer(
296296
task: Task<Void, Never>,
297297
producerContinuation: UnsafeContinuation<Void, Never>?,
298-
consumerContinuation: UnsafeContinuation<Result<Base.Element, Error>?, Never>?
298+
consumerContinuation: UnsafeContinuation<UnsafeTransfer<Result<Base.Element, Error>?>, Never>?
299299
)
300300
}
301301

Sources/AsyncAlgorithms/Buffer/BoundedBufferStorage.swift

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
1717
self.stateMachine = ManagedCriticalState(BoundedBufferStateMachine(base: base, limit: limit))
1818
}
1919

20-
func next() async -> Result<Base.Element, Error>? {
20+
func next() async -> UnsafeTransfer<Result<Base.Element, Error>?> {
2121
return await withTaskCancellationHandler {
2222
let action: BoundedBufferStateMachine<Base>.NextAction? = self.stateMachine.withCriticalRegion {
2323
stateMachine in
@@ -45,14 +45,14 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
4545

4646
case .returnResult(let producerContinuation, let result):
4747
producerContinuation?.resume()
48-
return result
48+
return UnsafeTransfer(result)
4949

5050
case .none:
5151
break
5252
}
5353

5454
return await withUnsafeContinuation {
55-
(continuation: UnsafeContinuation<Result<Base.Element, Error>?, Never>) in
55+
(continuation: UnsafeContinuation<UnsafeTransfer<Result<Base.Element, Error>?>, Never>) in
5656
let action = self.stateMachine.withCriticalRegion { stateMachine in
5757
stateMachine.nextSuspended(continuation: continuation)
5858
}
@@ -61,7 +61,7 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
6161
break
6262
case .returnResult(let producerContinuation, let result):
6363
producerContinuation?.resume()
64-
continuation.resume(returning: result)
64+
continuation.resume(returning: UnsafeTransfer(result))
6565
}
6666
}
6767
} onCancel: {
@@ -109,6 +109,7 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
109109
case .none:
110110
break
111111
case .resumeConsumer(let continuation, let result):
112+
112113
continuation.resume(returning: result)
113114
}
114115
}
@@ -120,7 +121,7 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
120121
case .none:
121122
break
122123
case .resumeConsumer(let continuation):
123-
continuation?.resume(returning: nil)
124+
continuation?.resume(returning: UnsafeTransfer(nil))
124125
}
125126
} catch {
126127
let action = self.stateMachine.withCriticalRegion { stateMachine in
@@ -130,7 +131,7 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
130131
case .none:
131132
break
132133
case .resumeConsumer(let continuation):
133-
continuation?.resume(returning: .failure(error))
134+
continuation?.resume(returning: UnsafeTransfer(Result<Base.Element, Error>.failure(error)))
134135
}
135136
}
136137
}
@@ -148,7 +149,7 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
148149
case .resumeProducerAndConsumer(let task, let producerContinuation, let consumerContinuation):
149150
task.cancel()
150151
producerContinuation?.resume()
151-
consumerContinuation?.resume(returning: nil)
152+
consumerContinuation?.resume(returning: UnsafeTransfer(nil))
152153
}
153154
}
154155

Sources/AsyncAlgorithms/Buffer/UnboundedBufferStateMachine.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import DequeModule
1414
@available(AsyncAlgorithms 1.0, *)
1515
struct UnboundedBufferStateMachine<Base: AsyncSequence> {
1616
typealias Element = Base.Element
17-
typealias SuspendedConsumer = UnsafeContinuation<Result<Element, Error>?, Never>
17+
typealias SuspendedConsumer = UnsafeContinuation<UnsafeTransfer<Result<Element, Error>?>, Never>
1818

1919
enum Policy {
2020
case unlimited
@@ -73,7 +73,7 @@ struct UnboundedBufferStateMachine<Base: AsyncSequence> {
7373
case none
7474
case resumeConsumer(
7575
continuation: SuspendedConsumer,
76-
result: Result<Element, Error>
76+
result: UnsafeTransfer<Result<Element, Error>?>
7777
)
7878
}
7979

@@ -108,7 +108,7 @@ struct UnboundedBufferStateMachine<Base: AsyncSequence> {
108108
self.state = .buffering(task: task, buffer: buffer, suspendedConsumer: nil)
109109
return .resumeConsumer(
110110
continuation: suspendedConsumer,
111-
result: .success(element)
111+
result: UnsafeTransfer(.success(element))
112112
)
113113

114114
case .modifying:

Sources/AsyncAlgorithms/Buffer/UnboundedBufferStorage.swift

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ final class UnboundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Se
1717
self.stateMachine = ManagedCriticalState(UnboundedBufferStateMachine<Base>(base: base, policy: policy))
1818
}
1919

20-
func next() async -> Result<Base.Element, Error>? {
20+
func next() async -> UnsafeTransfer<Result<Base.Element, Error>?> {
2121
return await withTaskCancellationHandler {
2222

2323
let action: UnboundedBufferStateMachine<Base>.NextAction? = self.stateMachine.withCriticalRegion {
@@ -42,21 +42,21 @@ final class UnboundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Se
4242
case .suspend:
4343
break
4444
case .returnResult(let result):
45-
return result
45+
return UnsafeTransfer(result)
4646
case .none:
4747
break
4848
}
4949

5050
return await withUnsafeContinuation {
51-
(continuation: UnsafeContinuation<Result<Base.Element, Error>?, Never>) in
51+
(continuation: UnsafeContinuation<UnsafeTransfer<Result<Base.Element, Error>?>, Never>) in
5252
let action = self.stateMachine.withCriticalRegion { stateMachine in
5353
stateMachine.nextSuspended(continuation: continuation)
5454
}
5555
switch action {
5656
case .none:
5757
break
5858
case .resumeConsumer(let result):
59-
continuation.resume(returning: result)
59+
continuation.resume(returning: UnsafeTransfer(result))
6060
}
6161
}
6262
} onCancel: {
@@ -89,7 +89,7 @@ final class UnboundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Se
8989
case .none:
9090
break
9191
case .resumeConsumer(let continuation):
92-
continuation?.resume(returning: nil)
92+
continuation?.resume(returning: UnsafeTransfer(nil))
9393
}
9494
} catch {
9595
let action = self.stateMachine.withCriticalRegion { stateMachine in
@@ -99,7 +99,7 @@ final class UnboundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Se
9999
case .none:
100100
break
101101
case .resumeConsumer(let continuation):
102-
continuation?.resume(returning: .failure(error))
102+
continuation?.resume(returning: UnsafeTransfer(Result<Base.Element, Error>.failure(error)))
103103
}
104104
}
105105
}
@@ -116,7 +116,7 @@ final class UnboundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Se
116116
break
117117
case .resumeConsumer(let task, let continuation):
118118
task.cancel()
119-
continuation?.resume(returning: nil)
119+
continuation?.resume(returning: UnsafeTransfer(nil))
120120
}
121121
}
122122

Sources/AsyncSequenceValidation/Clock.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import AsyncAlgorithms
1313

1414
@available(AsyncAlgorithms 1.0, *)
1515
extension AsyncSequenceValidationDiagram {
16-
public struct Clock {
16+
public struct Clock: Sendable {
1717
let queue: WorkQueue
1818

1919
init(queue: WorkQueue) {

Sources/AsyncSequenceValidation/TaskDriver.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,18 @@ func start_thread(_ raw: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer {
5050
#endif
5151

5252
@available(AsyncAlgorithms 1.0, *)
53-
final class TaskDriver {
54-
let work: (TaskDriver) -> Void
53+
final class TaskDriver: Sendable {
54+
let work: @Sendable (TaskDriver) -> Void
5555
let queue: WorkQueue
5656
#if canImport(Darwin) || canImport(wasi_pthread)
57-
var thread: pthread_t?
57+
nonisolated(unsafe) var thread: pthread_t?
5858
#elseif canImport(Glibc) || canImport(Musl) || canImport(Bionic)
59-
var thread = pthread_t()
59+
nonisolated(unsafe) var thread = pthread_t()
6060
#elseif canImport(WinSDK)
6161
#error("TODO: Port TaskDriver threading to windows")
6262
#endif
6363

64-
init(queue: WorkQueue, _ work: @escaping (TaskDriver) -> Void) {
64+
init(queue: WorkQueue, _ work: @Sendable @escaping (TaskDriver) -> Void) {
6565
self.queue = queue
6666
self.work = work
6767
}

Sources/AsyncSequenceValidation/Test.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ extension AsyncSequenceValidationDiagram {
122122
}
123123
}
124124

125-
private static let _executor: AnyObject = {
125+
private static let _executor: AnyObject & Sendable = {
126126
guard #available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) else {
127127
return ClockExecutor_Pre5_9()
128128
}
@@ -134,13 +134,13 @@ extension AsyncSequenceValidationDiagram {
134134
}
135135
#endif
136136

137-
static var clock: Clock?
137+
nonisolated(unsafe) static var clock: Clock?
138138

139-
static var driver: TaskDriver?
139+
nonisolated(unsafe) static var driver: TaskDriver?
140140

141-
static var currentJob: Job?
141+
nonisolated(unsafe) static var currentJob: Job?
142142

143-
static var specificationFailures = [ExpectationFailure]()
143+
nonisolated(unsafe) static var specificationFailures = [ExpectationFailure]()
144144
}
145145

146146
enum ActualResult {

0 commit comments

Comments
 (0)