Skip to content

Commit f0defd8

Browse files
committed
[Concurrency] Add CooperativeExecutor, use it.
Also tweak the sleep implementations to let the hooks run if there isn't a `SchedulableExecutor` (for hooked mode). rdar://141348916
1 parent 14b0e73 commit f0defd8

12 files changed

+585
-143
lines changed

stdlib/public/Concurrency/CMakeLists.txt

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -93,18 +93,6 @@ set(SWIFT_RUNTIME_CONCURRENCY_C_SOURCES
9393
linker-support/magic-symbols-for-install-name.c
9494
)
9595

96-
set(SWIFT_RUNTIME_CONCURRENCY_EXECUTOR_SOURCES)
97-
if("${SWIFT_CONCURRENCY_GLOBAL_EXECUTOR}" STREQUAL "dispatch")
98-
set(SWIFT_RUNTIME_CONCURRENCY_EXECUTOR_SOURCES
99-
DispatchGlobalExecutor.cpp
100-
)
101-
endif()
102-
103-
set(LLVM_OPTIONAL_SOURCES
104-
CooperativeGlobalExecutor.cpp
105-
DispatchGlobalExecutor.cpp
106-
)
107-
10896
set(SWIFT_RUNTIME_CONCURRENCY_SWIFT_SOURCES
10997
Actor.swift
11098
AsyncLet.swift
@@ -133,6 +121,7 @@ set(SWIFT_RUNTIME_CONCURRENCY_SWIFT_SOURCES
133121
GlobalActor.swift
134122
GlobalConcurrentExecutor.swift
135123
MainActor.swift
124+
PriorityQueue.swift
136125
SourceCompatibilityShims.swift
137126
Task.swift
138127
Task+PriorityEscalation.swift
@@ -169,21 +158,44 @@ set(SWIFT_RUNTIME_CONCURRENCY_SWIFT_SOURCES
169158
ContinuousClock.swift
170159
SuspendingClock.swift
171160
TaskSleepDuration.swift
172-
DispatchExecutor.swift
173-
CFExecutor.swift
174161
DummyExecutor.swift
162+
CooperativeExecutor.swift
175163
PlatformExecutorDarwin.swift
176164
PlatformExecutorLinux.swift
177165
PlatformExecutorWindows.swift
178-
PlatformExecutorWASI.swift
179166
)
180167

181-
set(SWIFT_RUNTIME_CONCURRENCY_NONEMBEDDED_SWIFT_SOURCES
182-
ExecutorImpl.swift
183-
)
168+
set(SWIFT_RUNTIME_CONCURRENCY_EXECUTOR_SOURCES)
169+
set(SWIFT_RUNTIME_CONCURRENCY_NONEMBEDDED_SWIFT_SOURCES)
170+
if("${SWIFT_CONCURRENCY_GLOBAL_EXECUTOR}" STREQUAL "dispatch")
171+
set(SWIFT_RUNTIME_CONCURRENCY_EXECUTOR_SOURCES
172+
DispatchGlobalExecutor.cpp
173+
)
174+
set(SWIFT_RUNTIME_CONCURRENCY_NONEMBEDDED_SWIFT_SOURCES
175+
DispatchExecutor.swift
176+
CFExecutor.swift
177+
ExecutorImpl.swift
178+
)
179+
elseif("${SWIFT_CONCURRENCY_GLOBAL_EXECUTOR}" STREQUAL "singlethreaded")
180+
set(SWIFT_RUNTIME_CONCURRENCYU_NONEMBEDDED_SWIFT_SOURCES
181+
ExecutorImpl.swift
182+
PlatformExecutorCooperative.swift
183+
)
184+
else()
185+
set(SWIFT_RUNTIME_CONCURRENCY_NONEMBEDDED_SWIFT_SOURCES
186+
ExecutorImpl.swift
187+
PlatformExecutorNone.swift
188+
)
189+
endif()
184190

185191
set(SWIFT_RUNTIME_CONCURRENCY_EMBEDDED_SWIFT_SOURCES
186-
PlatformExecutorEmbedded.swift
192+
PlatformExecutorNone.swift
193+
)
194+
195+
set(LLVM_OPTIONAL_SOURCES
196+
DispatchGlobalExecutor.cpp
197+
CooperativeGlobalExecutor.cpp
198+
DispatchGlobalExecutor.cpp
187199
)
188200

189201
add_swift_target_library(swift_Concurrency ${SWIFT_STDLIB_LIBRARY_BUILD_TYPES} IS_STDLIB

stdlib/public/Concurrency/Clock.cpp

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,16 @@
2121
#include <realtimeapiset.h>
2222
#endif
2323

24+
#if __has_include(<chrono>)
25+
#define WE_HAVE_STD_CHRONO 1
26+
#include <chrono>
27+
28+
#if __has_include(<thread>)
29+
#define WE_HAVE_STD_THIS_THREAD 1
30+
#include <thread>
31+
#endif
32+
#endif // __has_include(<chrono>)
33+
2434
#include "Error.h"
2535

2636
using namespace swift;
@@ -50,6 +60,14 @@ void swift_get_time(
5060
(void)QueryInterruptTimePrecise(&interruptTime);
5161
continuous.tv_sec = interruptTime / 10'000'000;
5262
continuous.tv_nsec = (interruptTime % 10'000'000) * 100;
63+
#elif WE_HAVE_STD_CHRONO
64+
auto now = std::chrono::steady_clock::now();
65+
auto epoch = std::chrono::steady_clock::min();
66+
auto timeSinceEpoch = now - epoch;
67+
auto sec = std::chrono::duration_cast<std::chrono::seconds>(timeSinceEpoch);
68+
auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(timeSinceEpoch - sec);
69+
continuous.tv_sec = sec;
70+
continuous.tv_nsec = ns;
5371
#else
5472
#error Missing platform continuous time definition
5573
#endif
@@ -77,6 +95,14 @@ void swift_get_time(
7795
(void)QueryUnbiasedInterruptTimePrecise(&unbiasedTime);
7896
suspending.tv_sec = unbiasedTime / 10'000'000;
7997
suspending.tv_nsec = (unbiasedTime % 10'000'000) * 100;
98+
#elif WE_HAVE_STD_CHRONO
99+
auto now = std::chrono::steady_clock::now();
100+
auto epoch = std::chrono::steady_clock::min();
101+
auto timeSinceEpoch = now - epoch;
102+
auto sec = std::chrono::duration_cast<std::chrono::seconds>(timeSinceEpoch);
103+
auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(timeSinceEpoch - sec);
104+
suspending.tv_sec = sec;
105+
suspending.tv_nsec = ns;
80106
#else
81107
#error Missing platform suspending time definition
82108
#endif
@@ -107,6 +133,11 @@ switch (clock_id) {
107133
#elif defined(_WIN32)
108134
continuous.tv_sec = 0;
109135
continuous.tv_nsec = 100;
136+
#elif WE_HAVE_STD_CHRONO
137+
auto num = std::chrono::steady_clock::period::num;
138+
auto den = std::chrono::steady_clock::period::den;
139+
continuous.tv_sec = num / den;
140+
continuous.tv_nsec = (num * 1000000000ll) % den
110141
#else
111142
#error Missing platform continuous time definition
112143
#endif
@@ -127,6 +158,11 @@ switch (clock_id) {
127158
#elif defined(_WIN32)
128159
suspending.tv_sec = 0;
129160
suspending.tv_nsec = 100;
161+
#elif WE_HAVE_STD_CHRONO
162+
auto num = std::chrono::steady_clock::period::num;
163+
auto den = std::chrono::steady_clock::period::den;
164+
continuous.tv_sec = num / den;
165+
continuous.tv_nsec = (num * 1'000'000'000ll) % den
130166
#else
131167
#error Missing platform suspending time definition
132168
#endif
@@ -138,3 +174,43 @@ switch (clock_id) {
138174
swift_Concurrency_fatalError(0, "Fatal error: invalid clock ID %d\n",
139175
clock_id);
140176
}
177+
178+
SWIFT_EXPORT_FROM(swift_Concurrency)
179+
SWIFT_CC(swift)
180+
void swift_sleep(
181+
long long seconds,
182+
long long nanoseconds) {
183+
#if defined(_WIN32)
184+
ULONGLONG now;
185+
(void)QueryInterruptTimePrecise(&now);
186+
ULONGLONG delay = seconds * 10'000'000 + nanoseconds / 100;
187+
ULONGLONG deadline = now + delay;
188+
while (deadline > now) {
189+
DWORD dwMsec = delay / 10'000;
190+
191+
// For sleeps over 15ms, Windows may return up to 15ms early(!);
192+
// for sleeps less than 15ms, Windows does a delay koop internally,
193+
// which is acceptable here.
194+
if (dwMsec > 15)
195+
deMsec += 15;
196+
197+
(void)SleepEx(dwMsec, TRUE);
198+
(void)QueryInterruptTimePrecise(&now);
199+
delay = deadline - now;
200+
}
201+
#elif defined(__linux__) || defined(__APPLE__) || defined(__wasi__) \
202+
|| defined(__OpenBSD) || defined(__FreeBSD__)
203+
struct timespec ts;
204+
ts.tv_sec = seconds;
205+
ts.tv_nsec = nanoseconds;
206+
while (nanosleep(&ts, &ts) == -1 && errno == EINTR);
207+
#elif WE_HAVE_STD_THIS_THREAD && !defined(SWIFT_THREADING_NONE)
208+
auto duration
209+
= std::chrono::duration_cast<std::chrono::steady_clock::duration>(
210+
std::chrono::seconds(seconds) + std::chrono::nanoseconds(nanoseconds)
211+
);
212+
std::this_thread::sleep_for(duration);
213+
#else
214+
#error Missing platform sleep definition
215+
#endif
216+
}

stdlib/public/Concurrency/Clock.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,3 +243,9 @@ internal func _getClockRes(
243243
seconds: UnsafeMutablePointer<Int64>,
244244
nanoseconds: UnsafeMutablePointer<Int64>,
245245
clock: CInt)
246+
247+
@available(SwiftStdlib 6.2, *)
248+
@_silgen_name("swift_sleep")
249+
internal func _sleep(
250+
seconds: Int64,
251+
nanoseconds: Int64)
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift.org open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
10+
//
11+
//===----------------------------------------------------------------------===//
12+
13+
import Swift
14+
15+
extension ExecutorJob {
16+
fileprivate var cooperativeExecutorTimestamp: CooperativeExecutor.Timestamp {
17+
get {
18+
return unsafe withUnsafeExecutorPrivateData {
19+
return unsafe $0.assumingMemoryBound(
20+
to: CooperativeExecutor.Timestamp.self
21+
)[0]
22+
}
23+
}
24+
set {
25+
unsafe withUnsafeExecutorPrivateData {
26+
unsafe $0.withMemoryRebound(to: CooperativeExecutor.Timestamp.self) {
27+
unsafe $0[0] = newValue
28+
}
29+
}
30+
}
31+
}
32+
}
33+
34+
/// A co-operative executor that can be used as the main executor or as a
35+
/// task executor.
36+
class CooperativeExecutor: Executor, @unchecked Sendable {
37+
var runQueue: PriorityQueue<UnownedJob>
38+
var waitQueue: PriorityQueue<UnownedJob>
39+
var shouldStop: Bool = false
40+
41+
/// Internal representation of a duration for CooperativeExecutor
42+
struct Duration {
43+
var seconds: Int64
44+
var nanoseconds: Int64
45+
46+
init(seconds: Int64, nanoseconds: Int64) {
47+
self.seconds = seconds
48+
self.nanoseconds = nanoseconds
49+
}
50+
51+
init(from duration: Swift.Duration) {
52+
let (seconds, attoseconds) = duration.components
53+
self.seconds = seconds
54+
self.nanoseconds = attoseconds / 1_000_000_000
55+
}
56+
}
57+
58+
/// Internal representation of a timestamp for CooperativeExecutor
59+
struct Timestamp: Comparable {
60+
var seconds: Int64
61+
var nanoseconds: Int64
62+
63+
static var zero: Timestamp {
64+
return Timestamp(seconds: 0, nanoseconds: 0)
65+
}
66+
67+
static func == (lhs: Timestamp, rhs: Timestamp) -> Bool {
68+
return lhs.seconds == rhs.seconds && lhs.nanoseconds == rhs.nanoseconds
69+
}
70+
static func < (lhs: Timestamp, rhs: Timestamp) -> Bool {
71+
return lhs.seconds < rhs.seconds || (
72+
lhs.seconds == rhs.seconds
73+
&& lhs.nanoseconds < rhs.nanoseconds
74+
)
75+
}
76+
static func - (lhs: Timestamp, rhs: Timestamp) -> Duration {
77+
if lhs.nanoseconds < rhs.nanoseconds {
78+
return Duration(seconds: lhs.seconds - rhs.seconds - 1,
79+
nanoseconds: 1_000_000_000 + lhs.nanoseconds
80+
- rhs.nanoseconds)
81+
}
82+
return Duration(seconds: lhs.seconds - rhs.seconds,
83+
nanoseconds: lhs.nanoseconds - rhs.nanoseconds)
84+
}
85+
static func + (lhs: Timestamp, rhs: Duration) -> Timestamp {
86+
var seconds = lhs.seconds + rhs.seconds
87+
var nanoseconds = lhs.nanoseconds + rhs.nanoseconds
88+
// Normally will run only once
89+
while nanoseconds > 1_000_000_000 {
90+
seconds += 1
91+
nanoseconds -= 1_000_000_000
92+
}
93+
return Timestamp(seconds: seconds, nanoseconds: nanoseconds)
94+
}
95+
}
96+
97+
public init() {
98+
runQueue = PriorityQueue(compare: { $0.priority > $1.priority })
99+
waitQueue =
100+
PriorityQueue(compare: {
101+
ExecutorJob($0).cooperativeExecutorTimestamp
102+
< ExecutorJob($1).cooperativeExecutorTimestamp
103+
})
104+
}
105+
106+
public func enqueue(_ job: consuming ExecutorJob) {
107+
runQueue.push(UnownedJob(job))
108+
}
109+
110+
public var isMainExecutor: Bool { true }
111+
112+
public var asSchedulable: any SchedulableExecutor { self }
113+
}
114+
115+
extension CooperativeExecutor: SchedulableExecutor {
116+
var currentTime: Timestamp {
117+
var now: Timestamp = .zero
118+
unsafe _getTime(seconds: &now.seconds,
119+
nanoseconds: &now.nanoseconds,
120+
clock: _ClockID.suspending.rawValue)
121+
return now
122+
}
123+
124+
public func enqueue<C: Clock>(_ job: consuming ExecutorJob,
125+
after delay: C.Duration,
126+
tolerance: C.Duration? = nil,
127+
clock: C) {
128+
let duration = Duration(from: clock.convert(from: delay)!)
129+
let deadline = self.currentTime + duration
130+
131+
job.cooperativeExecutorTimestamp = deadline
132+
waitQueue.push(UnownedJob(job))
133+
}
134+
}
135+
136+
extension CooperativeExecutor: RunLoopExecutor {
137+
public func run() throws {
138+
try runUntil { false }
139+
}
140+
141+
public func runUntil(_ condition: () -> Bool) throws {
142+
shouldStop = false
143+
while !shouldStop && !condition() {
144+
// Process the timer queue
145+
let now = currentTime
146+
while let job = waitQueue.pop(when: {
147+
ExecutorJob($0).cooperativeExecutorTimestamp <= now
148+
}) {
149+
runQueue.push(job)
150+
}
151+
152+
// Now run any queued jobs
153+
while let job = runQueue.pop() {
154+
unsafe ExecutorJob(job).runSynchronously(
155+
on: self.asUnownedSerialExecutor()
156+
)
157+
}
158+
159+
// Finally, wait until the next deadline
160+
if let job = waitQueue.top {
161+
let deadline = ExecutorJob(job).cooperativeExecutorTimestamp
162+
let now = self.currentTime
163+
if deadline > now {
164+
let toWait = deadline - now
165+
_sleep(seconds: toWait.seconds,
166+
nanoseconds: toWait.nanoseconds)
167+
}
168+
} else {
169+
// Stop if no more jobs are available
170+
break
171+
}
172+
}
173+
}
174+
175+
public func stop() {
176+
shouldStop = true
177+
}
178+
}
179+
180+
extension CooperativeExecutor: SerialExecutor {}
181+
182+
extension CooperativeExecutor: TaskExecutor {}
183+
184+
extension CooperativeExecutor: MainExecutor {}

0 commit comments

Comments
 (0)