Skip to content

Commit 7ba01a2

Browse files
committed
[Concurrency] Updates after second SE pitch.
We no longer attempt to convert timestamps from the passed-in `Clock` in order to allow any clock to work with any executor. Instead, executors that do not recognise a clock should call the `enqueue` function on that `Clock`, which lets the `Clock` itself decide how to proceed. Additionally, rename `SchedulableExecutor` to `SchedulingExecutor`.
1 parent a552f73 commit 7ba01a2

18 files changed

+532
-250
lines changed

lib/SILGen/SILGen.cpp

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,36 @@ FuncDecl *SILGenModule::getExit() {
523523
Type SILGenModule::getConfiguredExecutorFactory() {
524524
auto &ctx = getASTContext();
525525

526-
// Look in the main module for a typealias
526+
// First look in the @main struct, if any
527+
NominalTypeDecl *mainType = ctx.MainModule->getMainTypeDecl();
528+
if (mainType) {
529+
SmallVector<ValueDecl *, 1> decls;
530+
auto identifier = ctx.getIdentifier("DefaultExecutorFactory");
531+
mainType->lookupQualified(mainType,
532+
DeclNameRef(identifier),
533+
SourceLoc(),
534+
NL_RemoveNonVisible | NL_RemoveOverridden
535+
| NL_OnlyTypes | NL_ProtocolMembers,
536+
decls);
537+
for (auto decl : decls) {
538+
TypeDecl *typeDecl = dyn_cast<TypeDecl>(decl);
539+
if (typeDecl) {
540+
if (auto *nominalDecl = dyn_cast<NominalTypeDecl>(typeDecl)) {
541+
return nominalDecl->getDeclaredType();
542+
}
543+
544+
if (isa<AssociatedTypeDecl>(typeDecl)) {
545+
// We ignore associatedtype declarations; those with a default will
546+
// turn into a `typealias` instead.
547+
continue;
548+
}
549+
550+
return typeDecl->getDeclaredInterfaceType();
551+
}
552+
}
553+
}
554+
555+
// Failing that, look at the top level
527556
Type factory = ctx.getNamedSwiftType(ctx.MainModule, "DefaultExecutorFactory");
528557

529558
// If we don't find it, fall back to _Concurrency.PlatformExecutorFactory

stdlib/public/Concurrency/CFExecutor.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ enum CoreFoundation {
4343

4444
// .. Main Executor ............................................................
4545

46+
/// A CFRunLoop-based main executor (Apple platforms only)
4647
@available(StdlibDeploymentTarget 6.2, *)
4748
final class CFMainExecutor: DispatchMainExecutor, @unchecked Sendable {
4849

@@ -58,6 +59,7 @@ final class CFMainExecutor: DispatchMainExecutor, @unchecked Sendable {
5859

5960
// .. Task Executor ............................................................
6061

62+
/// A `TaskExecutor` to match `CFMainExecutor` (Apple platforms only)
6163
@available(StdlibDeploymentTarget 6.2, *)
6264
final class CFTaskExecutor: DispatchGlobalTaskExecutor,
6365
@unchecked Sendable {

stdlib/public/Concurrency/Clock.swift

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,28 @@ public protocol Clock<Duration>: Sendable {
4343
#endif
4444
}
4545

46+
#if !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
47+
extension Clock {
48+
// The default implementation works by creating a trampoline and calling
49+
// the run() method.
50+
@available(StdlibDeploymentTarget 6.2, *)
51+
func enqueue(_ job: consuming ExecutorJob,
52+
on executor: some Executor,
53+
at instant: Instant, tolerance: Duration?) {
54+
let trampoline = job.createTrampoline(to: executor)
55+
run(trampoline, at: instant, tolerance: tolerance)
56+
}
57+
58+
// Clocks that do not implement run will fatalError() if you try to use
59+
// them with an executor that does not understand them.
60+
@available(StdlibDeploymentTarget 6.2, *)
61+
func run(_ job: consuming ExecutorJob,
62+
at instant: Instant, tolerance: Duration?) {
63+
fatalError("\(Self.self) does not implement run(_:at:tolerance:).")
64+
}
65+
}
66+
#endif // !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
67+
4668
@available(StdlibDeploymentTarget 5.7, *)
4769
extension Clock {
4870
/// Measure the elapsed time to execute a closure.
@@ -117,6 +139,7 @@ extension Clock {
117139
enum _ClockID: Int32 {
118140
case continuous = 1
119141
case suspending = 2
142+
case walltime = 3
120143
}
121144

122145
@available(StdlibDeploymentTarget 5.7, *)

stdlib/public/Concurrency/ContinuousClock.swift

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,3 +201,36 @@ extension ContinuousClock.Instant: InstantProtocol {
201201
rhs.duration(to: lhs)
202202
}
203203
}
204+
205+
#if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
206+
@available(StdlibDeploymentTarget 6.2, *)
207+
extension ContinuousClock {
208+
209+
public func run(_ job: consuming ExecutorJob,
210+
at instant: Instant,
211+
tolerance: Duration?) {
212+
guard let executor = Task<Never,Never>.currentSchedulingExecutor else {
213+
fatalError("no scheduling executor is available")
214+
}
215+
216+
executor.enqueue(job, at: instant,
217+
tolerance: tolerance,
218+
clock: self)
219+
}
220+
221+
public func enqueue(_ job: consuming ExecutorJob,
222+
on executor: some Executor,
223+
at instant: Instant,
224+
tolerance: Duration?) {
225+
if let schedulingExecutor = executor.asSchedulingExecutor {
226+
schedulingExecutor.enqueue(job, at: instant,
227+
tolerance: tolerance,
228+
clock: self)
229+
} else {
230+
let trampoline = job.createTrampoline(to: executor)
231+
run(trampoline, at: instant, tolerance: tolerance)
232+
}
233+
}
234+
235+
}
236+
#endif

stdlib/public/Concurrency/CooperativeExecutor.swift

Lines changed: 126 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -97,16 +97,77 @@ extension ExecutorJob {
9797
}
9898
}
9999

100+
#if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
101+
/// A wait queue is a specialised priority queue used to run a timer.
102+
@available(StdlibDeploymentTarget 6.2, *)
103+
struct WaitQueue {
104+
var queue: PriorityQueue<UnownedJob>
105+
var clock: _ClockID
106+
107+
init(clock: _ClockID) {
108+
queue = PriorityQueue(compare: {
109+
ExecutorJob($0).cooperativeExecutorTimestamp
110+
< ExecutorJob($1).cooperativeExecutorTimestamp
111+
})
112+
self.clock = clock
113+
}
114+
115+
var currentTime: CooperativeExecutor.Timestamp {
116+
var now: CooperativeExecutor.Timestamp = .zero
117+
unsafe _getTime(seconds: &now.seconds,
118+
nanoseconds: &now.nanoseconds,
119+
clock: clock.rawValue)
120+
return now
121+
}
122+
123+
mutating func enqueue(_ job: consuming ExecutorJob,
124+
after delay: CooperativeExecutor.Duration) {
125+
let deadline = currentTime + delay
126+
job.setupCooperativeExecutorTimestamp()
127+
job.cooperativeExecutorTimestamp = deadline
128+
queue.push(UnownedJob(job))
129+
}
130+
131+
mutating func forEachReadyJob(body: (consuming ExecutorJob) -> ()) {
132+
let now = currentTime
133+
while let job = queue.pop(
134+
when: {
135+
ExecutorJob($0).cooperativeExecutorTimestamp < now
136+
}) {
137+
var theJob = ExecutorJob(job)
138+
theJob.clearCooperativeExecutorTimestamp()
139+
body(theJob)
140+
}
141+
}
142+
143+
var timeToNextJob: CooperativeExecutor.Duration? {
144+
if let job = queue.top {
145+
let deadline = ExecutorJob(job).cooperativeExecutorTimestamp
146+
let now = currentTime
147+
if deadline > now {
148+
return deadline - now
149+
} else {
150+
return CooperativeExecutor.Duration(seconds: 0, nanoseconds: 0)
151+
}
152+
}
153+
return nil
154+
}
155+
}
156+
#endif
157+
100158
/// A co-operative executor that can be used as the main executor or as a
101159
/// task executor.
102160
@available(StdlibDeploymentTarget 6.2, *)
103-
class CooperativeExecutor: Executor, @unchecked Sendable {
161+
final class CooperativeExecutor: Executor, @unchecked Sendable {
104162
var runQueue: PriorityQueue<UnownedJob>
105-
var waitQueue: PriorityQueue<UnownedJob>
163+
#if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
164+
var suspendingWaitQueue = WaitQueue(clock: .suspending)
165+
var continuousWaitQueue = WaitQueue(clock: .continuous)
166+
#endif
106167
var shouldStop: Bool = false
107168

108169
/// Internal representation of a duration for CooperativeExecutor
109-
struct Duration {
170+
struct Duration: Comparable {
110171
var seconds: Int64
111172
var nanoseconds: Int64
112173

@@ -120,6 +181,16 @@ class CooperativeExecutor: Executor, @unchecked Sendable {
120181
self.seconds = seconds
121182
self.nanoseconds = attoseconds / 1_000_000_000
122183
}
184+
185+
static func == (lhs: Duration, rhs: Duration) -> Bool {
186+
return lhs.seconds == rhs.seconds && lhs.nanoseconds == rhs.nanoseconds
187+
}
188+
static func < (lhs: Duration, rhs: Duration) -> Bool {
189+
return lhs.seconds < rhs.seconds || (
190+
lhs.seconds == rhs.seconds
191+
&& lhs.nanoseconds < rhs.nanoseconds
192+
)
193+
}
123194
}
124195

125196
/// Internal representation of a timestamp for CooperativeExecutor
@@ -163,48 +234,53 @@ class CooperativeExecutor: Executor, @unchecked Sendable {
163234

164235
public init() {
165236
runQueue = PriorityQueue(compare: { $0.priority > $1.priority })
166-
waitQueue =
167-
PriorityQueue(compare: {
168-
ExecutorJob($0).cooperativeExecutorTimestamp
169-
< ExecutorJob($1).cooperativeExecutorTimestamp
170-
})
171237
}
172238

173239
public func enqueue(_ job: consuming ExecutorJob) {
174240
runQueue.push(UnownedJob(job))
175241
}
176242

177243
public var isMainExecutor: Bool { true }
178-
179-
public var asSchedulable: any SchedulableExecutor { self }
180244
}
181245

246+
#if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
182247
@available(StdlibDeploymentTarget 6.2, *)
183-
extension CooperativeExecutor: SchedulableExecutor {
184-
var currentTime: Timestamp {
248+
extension CooperativeExecutor: SchedulingExecutor {
249+
250+
public var asScheduling: (any SchedulingExecutor)? {
251+
return self
252+
}
253+
254+
func currentTime(clock: _ClockID) -> Timestamp {
185255
var now: Timestamp = .zero
186256
unsafe _getTime(seconds: &now.seconds,
187257
nanoseconds: &now.nanoseconds,
188-
clock: _ClockID.suspending.rawValue)
258+
clock: clock.rawValue)
189259
return now
190260
}
191261

192262
public func enqueue<C: Clock>(_ job: consuming ExecutorJob,
193263
after delay: C.Duration,
194264
tolerance: C.Duration? = nil,
195265
clock: C) {
196-
guard let swiftDuration = delay as? Swift.Duration else {
197-
fatalError("Unsupported clock")
266+
// If it's a clock we know, get the duration to wait
267+
if let _ = clock as? ContinuousClock {
268+
let continuousDuration = delay as! ContinuousClock.Duration
269+
let duration = Duration(from: continuousDuration)
270+
continuousWaitQueue.enqueue(job, after: duration)
271+
} else if let _ = clock as? SuspendingClock {
272+
let suspendingDuration = delay as! SuspendingClock.Duration
273+
let duration = Duration(from: suspendingDuration)
274+
suspendingWaitQueue.enqueue(job, after: duration)
275+
} else {
276+
clock.enqueue(job, on: self, at: clock.now.advanced(by: delay),
277+
tolerance: tolerance)
278+
return
198279
}
199-
200-
let duration = Duration(from: swiftDuration)
201-
let deadline = self.currentTime + duration
202-
203-
job.setupCooperativeExecutorTimestamp()
204-
job.cooperativeExecutorTimestamp = deadline
205-
waitQueue.push(UnownedJob(job))
206280
}
281+
207282
}
283+
#endif
208284

209285
@available(StdlibDeploymentTarget 6.2, *)
210286
extension CooperativeExecutor: RunLoopExecutor {
@@ -215,36 +291,47 @@ extension CooperativeExecutor: RunLoopExecutor {
215291
public func runUntil(_ condition: () -> Bool) throws {
216292
shouldStop = false
217293
while !shouldStop && !condition() {
218-
// Process the timer queue
219-
let now = currentTime
220-
while let job = waitQueue.pop(when: {
221-
ExecutorJob($0).cooperativeExecutorTimestamp <= now
222-
}) {
223-
var theJob = ExecutorJob(job)
224-
theJob.clearCooperativeExecutorTimestamp()
225-
runQueue.push(job)
294+
#if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
295+
// Process the timer queues
296+
suspendingWaitQueue.forEachReadyJob {
297+
runQueue.push(UnownedJob($0))
298+
}
299+
continuousWaitQueue.forEachReadyJob {
300+
runQueue.push(UnownedJob($0))
226301
}
302+
#endif
227303

228304
// Now run any queued jobs
229-
while let job = runQueue.pop() {
305+
var runQ = runQueue.take()
306+
while let job = runQ.pop() {
230307
unsafe ExecutorJob(job).runSynchronously(
231308
on: self.asUnownedSerialExecutor()
232309
)
233310
}
234311

312+
#if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
235313
// Finally, wait until the next deadline
236-
if let job = waitQueue.top {
237-
let deadline = ExecutorJob(job).cooperativeExecutorTimestamp
238-
let now = self.currentTime
239-
if deadline > now {
240-
let toWait = deadline - now
241-
_sleep(seconds: toWait.seconds,
242-
nanoseconds: toWait.nanoseconds)
314+
var toWait: Duration? = suspendingWaitQueue.timeToNextJob
315+
316+
if let continuousToWait = continuousWaitQueue.timeToNextJob {
317+
if toWait == nil || continuousToWait < toWait! {
318+
toWait = continuousToWait
243319
}
244-
} else {
320+
}
321+
322+
if let toWait {
323+
_sleep(seconds: toWait.seconds,
324+
nanoseconds: toWait.nanoseconds)
325+
} else if runQueue.isEmpty {
326+
// Stop if no more jobs are available
327+
break
328+
}
329+
#else // $Embedded || SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
330+
if runQueue.isEmpty {
245331
// Stop if no more jobs are available
246332
break
247333
}
334+
#endif
248335
}
249336
}
250337

0 commit comments

Comments
 (0)