Skip to content

Commit a0c542b

Browse files
orobioglbrnttLukasa
authored
Fix NIOAsyncWriter test on concurrency thread pool with single thread (#3135)
### Motivation: The testSuspendingBufferedYield_whenWriterFinished test fails on the Android emulator. See also the discussion in #3044. ### Modifications: The test requires at least two threads in the concurrency thread pool because it blocks one task, which waits for another task to set a condition. This PR adds support for running a task executor based on a NIOThreadPool and uses it for the test. Using a custom task executor guarantees that at least two threads are available for the test. Additionally, the test has been renamed to testWriterFinish_AndSuspendBufferedYield, which is more in line with the other test names. ### Result: The test will pass regardless of the width of the global concurrency thread pool. --------- Co-authored-by: George Barnett <[email protected]> Co-authored-by: Cory Benfield <[email protected]>
1 parent 34d486b commit a0c542b

File tree

4 files changed

+282
-40
lines changed

4 files changed

+282
-40
lines changed

Package.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ let package = Package(
446446
"NIOCore",
447447
"NIOEmbedded",
448448
"NIOFoundationCompat",
449+
"NIOTestUtils",
449450
swiftAtomics,
450451
],
451452
swiftSettings: strictConcurrencySettings
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftNIO open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
#if compiler(>=6)
16+
17+
import DequeModule
18+
import Synchronization
19+
20+
/// Provide a `ManualTaskExecutor` for the duration of the given `body`.
21+
///
22+
/// The executor can be used for setting the executor preference of tasks and fully control
23+
/// when execution of the tasks is performed.
24+
///
25+
/// Example usage:
26+
/// ```swift
27+
/// await withDiscardingTaskGroup { group in
28+
/// await withManualTaskExecutor { taskExecutor in
29+
/// group.addTask(executorPreference: taskExecutor) {
30+
/// print("Running")
31+
/// }
32+
/// taskExecutor.runUntilQueueIsEmpty() // Run the task synchronously
33+
/// }
34+
/// }
35+
/// ```
36+
///
37+
/// - warning: Do not escape the task executor from the closure for later use and make sure that
38+
/// all tasks running on the executor are completely finished before `body` returns.
39+
/// It is highly recommended to use structured concurrency with this task executor.
40+
///
41+
/// - Parameters:
42+
/// - body: The closure that will accept the task executor.
43+
///
44+
/// - Throws: When `body` throws.
45+
///
46+
/// - Returns: The value returned by `body`.
47+
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
48+
@inlinable
49+
package func withManualTaskExecutor<T, Failure>(
50+
body: (ManualTaskExecutor) async throws(Failure) -> T
51+
) async throws(Failure) -> T {
52+
let taskExecutor = ManualTaskExecutor()
53+
defer { taskExecutor.shutdown() }
54+
return try await body(taskExecutor)
55+
}
56+
57+
/// Provide two `ManualTaskExecutor`s for the duration of the given `body`.
58+
///
59+
/// The executors can be used for setting the executor preference of tasks and fully control
60+
/// when execution of the tasks is performed.
61+
///
62+
/// Example usage:
63+
/// ```swift
64+
/// await withDiscardingTaskGroup { group in
65+
/// await withManualTaskExecutor { taskExecutor1, taskExecutor2 in
66+
/// group.addTask(executorPreference: taskExecutor1) {
67+
/// print("Running 1")
68+
/// }
69+
/// group.addTask(executorPreference: taskExecutor2) {
70+
/// print("Running 2")
71+
/// }
72+
/// taskExecutor2.runUntilQueueIsEmpty() // Run second task synchronously
73+
/// taskExecutor1.runUntilQueueIsEmpty() // Run first task synchronously
74+
/// }
75+
/// }
76+
/// ```
77+
///
78+
/// - warning: Do not escape the task executors from the closure for later use and make sure that
79+
/// all tasks running on the executors are completely finished before `body` returns.
80+
/// It is highly recommended to use structured concurrency with these task executors.
81+
///
82+
/// - Parameters:
83+
/// - body: The closure that will accept the task executors.
84+
///
85+
/// - Throws: When `body` throws.
86+
///
87+
/// - Returns: The value returned by `body`.
88+
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
89+
@inlinable
90+
package func withManualTaskExecutor<T, Failure>(
91+
body: (ManualTaskExecutor, ManualTaskExecutor) async throws(Failure) -> T
92+
) async throws(Failure) -> T {
93+
let taskExecutor1 = ManualTaskExecutor()
94+
defer { taskExecutor1.shutdown() }
95+
96+
let taskExecutor2 = ManualTaskExecutor()
97+
defer { taskExecutor2.shutdown() }
98+
99+
return try await body(taskExecutor1, taskExecutor2)
100+
}
101+
102+
/// Manual task executor.
103+
///
104+
/// A `TaskExecutor` that does not use any threadpool or similar mechanism to run the jobs.
105+
/// Jobs are manually run by calling the `runUntilQueueIsEmpty` method.
106+
///
107+
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
108+
@usableFromInline
109+
package final class ManualTaskExecutor: TaskExecutor {
110+
struct Storage {
111+
var isShutdown = false
112+
var jobs = Deque<UnownedJob>()
113+
}
114+
115+
private let storage = Mutex<Storage>(.init())
116+
117+
@usableFromInline
118+
init() {}
119+
120+
/// Run jobs until queue is empty.
121+
///
122+
/// Synchronously runs all enqueued jobs, including any jobs that are enqueued while running.
123+
/// When this function returns, it means that each task running on this executor is either:
124+
/// - suspended
125+
/// - moved (temporarily) to a different executor
126+
/// - finished
127+
///
128+
/// If not all tasks are finished, this function must be called again.
129+
package func runUntilQueueIsEmpty() {
130+
while let job = self.storage.withLock({ $0.jobs.popFirst() }) {
131+
job.runSynchronously(on: self.asUnownedTaskExecutor())
132+
}
133+
}
134+
135+
/// Enqueue a job.
136+
///
137+
/// Called by the concurrency runtime.
138+
///
139+
/// - Parameter job: The job to enqueue.
140+
@usableFromInline
141+
package func enqueue(_ job: UnownedJob) {
142+
self.storage.withLock { storage in
143+
if storage.isShutdown {
144+
fatalError("A job is enqueued after manual executor shutdown")
145+
}
146+
storage.jobs.append(job)
147+
}
148+
}
149+
150+
/// Shutdown.
151+
///
152+
/// Since the manual task executor is not running anything in the background, this is purely to catch
153+
/// any issues due to incorrect usage of the executor. The shutdown verifies that the queue is empty
154+
/// and makes sure that no new jobs can be enqueued.
155+
@usableFromInline
156+
func shutdown() {
157+
self.storage.withLock { storage in
158+
if !storage.jobs.isEmpty {
159+
fatalError("Shutdown of manual executor with jobs in queue")
160+
}
161+
storage.isShutdown = true
162+
}
163+
}
164+
}
165+
166+
#endif // compiler(>=6)

Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift

Lines changed: 46 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import DequeModule
1616
import NIOConcurrencyHelpers
17+
import NIOTestUtils
1718
import XCTest
1819

1920
@testable import NIOCore
@@ -606,49 +607,54 @@ final class NIOAsyncWriterTests: XCTestCase {
606607
self.assert(suspendCallCount: 1, yieldCallCount: 1, terminateCallCount: 1)
607608
}
608609

609-
func testSuspendingBufferedYield_whenWriterFinished() async throws {
610-
self.sink.setWritability(to: false)
611-
612-
let bothSuspended = expectation(description: "suspended on both yields")
613-
let suspendedAgain = ConditionLock(value: false)
614-
self.delegate.didSuspendHandler = {
615-
if self.delegate.didSuspendCallCount == 2 {
616-
bothSuspended.fulfill()
617-
} else if self.delegate.didSuspendCallCount > 2 {
618-
suspendedAgain.lock()
619-
suspendedAgain.unlock(withValue: true)
620-
}
621-
}
622-
623-
self.delegate.didYieldHandler = { _ in
624-
if self.delegate.didYieldCallCount == 1 {
625-
// Delay this yield until the other yield is suspended again.
626-
suspendedAgain.lock(whenValue: true)
627-
suspendedAgain.unlock()
610+
#if compiler(>=6)
611+
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
612+
func testWriterFinish_AndSuspendBufferedYield() async throws {
613+
try await withThrowingTaskGroup(of: Void.self) { group in
614+
try await withManualTaskExecutor { taskExecutor1, taskExecutor2 in
615+
self.sink.setWritability(to: false)
616+
617+
self.delegate.didYieldHandler = { _ in
618+
if self.delegate.didYieldCallCount == 1 {
619+
// This is the yield of the first task. Run the second task until it suspends again
620+
self.assert(suspendCallCount: 2, yieldCallCount: 1, terminateCallCount: 0)
621+
taskExecutor2.runUntilQueueIsEmpty()
622+
self.assert(suspendCallCount: 3, yieldCallCount: 1, terminateCallCount: 0)
623+
}
624+
}
625+
626+
group.addTask(executorPreference: taskExecutor1) { [writer] in
627+
try await writer!.yield("message1")
628+
}
629+
group.addTask(executorPreference: taskExecutor2) { [writer] in
630+
try await writer!.yield("message2")
631+
}
632+
633+
// Run tasks until they are both suspended
634+
taskExecutor1.runUntilQueueIsEmpty()
635+
taskExecutor2.runUntilQueueIsEmpty()
636+
self.assert(suspendCallCount: 2, yieldCallCount: 0, terminateCallCount: 0)
637+
638+
self.writer.finish()
639+
640+
// We have to become writable again to unbuffer the yields
641+
self.sink.setWritability(to: true)
642+
643+
// Run the first task, which will complete its yield
644+
// During this yield, didYieldHandler will run the second task, which will suspend again
645+
taskExecutor1.runUntilQueueIsEmpty()
646+
self.assert(suspendCallCount: 3, yieldCallCount: 1, terminateCallCount: 0)
647+
648+
// Run the second task to complete its yield
649+
taskExecutor2.runUntilQueueIsEmpty()
650+
self.assert(suspendCallCount: 3, yieldCallCount: 2, terminateCallCount: 1)
651+
652+
await XCTAssertNoThrow(try await group.next())
653+
await XCTAssertNoThrow(try await group.next())
628654
}
629655
}
630-
631-
let task1 = Task { [writer] in
632-
try await writer!.yield("message1")
633-
}
634-
let task2 = Task { [writer] in
635-
try await writer!.yield("message2")
636-
}
637-
638-
await fulfillment(of: [bothSuspended], timeout: 1)
639-
self.writer.finish()
640-
641-
self.assert(suspendCallCount: 2, yieldCallCount: 0, terminateCallCount: 0)
642-
643-
// We have to become writable again to unbuffer the yields
644-
// The first call to didYield will pause, so that the other yield will be suspended again.
645-
self.sink.setWritability(to: true)
646-
647-
await XCTAssertNoThrow(try await task1.value)
648-
await XCTAssertNoThrow(try await task2.value)
649-
650-
self.assert(suspendCallCount: 3, yieldCallCount: 2, terminateCallCount: 1)
651656
}
657+
#endif // compiler(>=6)
652658

653659
func testWriterFinish_whenFinished() {
654660
// This tests just checks that finishing again is a no-op
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftNIO open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
#if compiler(>=6)
16+
17+
import NIOTestUtils
18+
import Synchronization
19+
import XCTest
20+
21+
class ManualTaskExecutorTest: XCTestCase {
22+
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
23+
func testManualTaskExecutor() async {
24+
await withDiscardingTaskGroup { group in
25+
await withManualTaskExecutor { taskExecutor in
26+
let taskDidRun = Mutex(false)
27+
28+
group.addTask(executorPreference: taskExecutor) {
29+
taskDidRun.withLock { $0 = true }
30+
}
31+
32+
// Run task
33+
XCTAssertFalse(taskDidRun.withLock { $0 })
34+
taskExecutor.runUntilQueueIsEmpty()
35+
XCTAssertTrue(taskDidRun.withLock { $0 })
36+
}
37+
}
38+
}
39+
40+
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
41+
func testTwoManualTaskExecutors() async {
42+
await withDiscardingTaskGroup { group in
43+
await withManualTaskExecutor { taskExecutor1, taskExecutor2 in
44+
let task1DidRun = Mutex(false)
45+
let task2DidRun = Mutex(false)
46+
47+
group.addTask(executorPreference: taskExecutor1) {
48+
task1DidRun.withLock { $0 = true }
49+
}
50+
51+
group.addTask(executorPreference: taskExecutor2) {
52+
task2DidRun.withLock { $0 = true }
53+
}
54+
55+
// Run task 1
56+
XCTAssertFalse(task1DidRun.withLock { $0 })
57+
taskExecutor1.runUntilQueueIsEmpty()
58+
XCTAssertTrue(task1DidRun.withLock { $0 })
59+
60+
// Run task 2
61+
XCTAssertFalse(task2DidRun.withLock { $0 })
62+
taskExecutor2.runUntilQueueIsEmpty()
63+
XCTAssertTrue(task2DidRun.withLock { $0 })
64+
}
65+
}
66+
}
67+
}
68+
69+
#endif // compiler(>=6)

0 commit comments

Comments
 (0)