@@ -97,16 +97,77 @@ extension ExecutorJob {
97
97
}
98
98
}
99
99
100
+ #if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
101
+ /// A wait queue is a specialised priority queue used to run a timer.
102
+ @available ( StdlibDeploymentTarget 6 . 2 , * )
103
+ struct WaitQueue {
104
+ var queue : PriorityQueue < UnownedJob >
105
+ var clock : _ClockID
106
+
107
+ init ( clock: _ClockID ) {
108
+ queue = PriorityQueue ( compare: {
109
+ ExecutorJob ( $0) . cooperativeExecutorTimestamp
110
+ < ExecutorJob( $1) . cooperativeExecutorTimestamp
111
+ } )
112
+ self . clock = clock
113
+ }
114
+
115
+ var currentTime : CooperativeExecutor . Timestamp {
116
+ var now : CooperativeExecutor . Timestamp = . zero
117
+ unsafe _getTime ( seconds: & now. seconds,
118
+ nanoseconds: & now. nanoseconds,
119
+ clock: clock. rawValue)
120
+ return now
121
+ }
122
+
123
+ mutating func enqueue( _ job: consuming ExecutorJob ,
124
+ after delay: CooperativeExecutor . Duration ) {
125
+ let deadline = currentTime + delay
126
+ job. setupCooperativeExecutorTimestamp ( )
127
+ job. cooperativeExecutorTimestamp = deadline
128
+ queue. push ( UnownedJob ( job) )
129
+ }
130
+
131
+ mutating func forEachReadyJob( body: ( consuming ExecutorJob ) -> ( ) ) {
132
+ let now = currentTime
133
+ while let job = queue. pop (
134
+ when: {
135
+ ExecutorJob ( $0) . cooperativeExecutorTimestamp < now
136
+ } ) {
137
+ var theJob = ExecutorJob ( job)
138
+ theJob. clearCooperativeExecutorTimestamp ( )
139
+ body ( theJob)
140
+ }
141
+ }
142
+
143
+ var timeToNextJob : CooperativeExecutor . Duration ? {
144
+ if let job = queue. top {
145
+ let deadline = ExecutorJob ( job) . cooperativeExecutorTimestamp
146
+ let now = currentTime
147
+ if deadline > now {
148
+ return deadline - now
149
+ } else {
150
+ return CooperativeExecutor . Duration ( seconds: 0 , nanoseconds: 0 )
151
+ }
152
+ }
153
+ return nil
154
+ }
155
+ }
156
+ #endif
157
+
100
158
/// A co-operative executor that can be used as the main executor or as a
101
159
/// task executor.
102
160
@available ( StdlibDeploymentTarget 6 . 2 , * )
103
- class CooperativeExecutor : Executor , @unchecked Sendable {
161
+ final class CooperativeExecutor : Executor , @unchecked Sendable {
104
162
var runQueue : PriorityQueue < UnownedJob >
105
- var waitQueue : PriorityQueue < UnownedJob >
163
+ #if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
164
+ var suspendingWaitQueue = WaitQueue ( clock: . suspending)
165
+ var continuousWaitQueue = WaitQueue ( clock: . continuous)
166
+ #endif
106
167
var shouldStop : Bool = false
107
168
108
169
/// Internal representation of a duration for CooperativeExecutor
109
- struct Duration {
170
+ struct Duration : Comparable {
110
171
var seconds : Int64
111
172
var nanoseconds : Int64
112
173
@@ -120,6 +181,16 @@ class CooperativeExecutor: Executor, @unchecked Sendable {
120
181
self . seconds = seconds
121
182
self . nanoseconds = attoseconds / 1_000_000_000
122
183
}
184
+
185
+ static func == ( lhs: Duration , rhs: Duration ) -> Bool {
186
+ return lhs. seconds == rhs. seconds && lhs. nanoseconds == rhs. nanoseconds
187
+ }
188
+ static func < ( lhs: Duration , rhs: Duration ) -> Bool {
189
+ return lhs. seconds < rhs. seconds || (
190
+ lhs. seconds == rhs. seconds
191
+ && lhs. nanoseconds < rhs. nanoseconds
192
+ )
193
+ }
123
194
}
124
195
125
196
/// Internal representation of a timestamp for CooperativeExecutor
@@ -163,48 +234,53 @@ class CooperativeExecutor: Executor, @unchecked Sendable {
163
234
164
235
public init ( ) {
165
236
runQueue = PriorityQueue ( compare: { $0. priority > $1. priority } )
166
- waitQueue =
167
- PriorityQueue ( compare: {
168
- ExecutorJob ( $0) . cooperativeExecutorTimestamp
169
- < ExecutorJob( $1) . cooperativeExecutorTimestamp
170
- } )
171
237
}
172
238
173
239
public func enqueue( _ job: consuming ExecutorJob ) {
174
240
runQueue. push ( UnownedJob ( job) )
175
241
}
176
242
177
243
public var isMainExecutor : Bool { true }
178
-
179
- public var asSchedulable : any SchedulableExecutor { self }
180
244
}
181
245
246
+ #if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
182
247
@available ( StdlibDeploymentTarget 6 . 2 , * )
183
- extension CooperativeExecutor : SchedulableExecutor {
184
- var currentTime : Timestamp {
248
+ extension CooperativeExecutor : SchedulingExecutor {
249
+
250
+ public var asScheduling : ( any SchedulingExecutor ) ? {
251
+ return self
252
+ }
253
+
254
+ func currentTime( clock: _ClockID ) -> Timestamp {
185
255
var now : Timestamp = . zero
186
256
unsafe _getTime( seconds: & now. seconds,
187
257
nanoseconds: & now. nanoseconds,
188
- clock: _ClockID . suspending . rawValue)
258
+ clock: clock . rawValue)
189
259
return now
190
260
}
191
261
192
262
public func enqueue< C: Clock > ( _ job: consuming ExecutorJob ,
193
263
after delay: C . Duration ,
194
264
tolerance: C . Duration ? = nil ,
195
265
clock: C ) {
196
- guard let swiftDuration = delay as? Swift . Duration else {
197
- fatalError ( " Unsupported clock " )
266
+ // If it's a clock we know, get the duration to wait
267
+ if let _ = clock as? ContinuousClock {
268
+ let continuousDuration = delay as! ContinuousClock . Duration
269
+ let duration = Duration ( from: continuousDuration)
270
+ continuousWaitQueue. enqueue ( job, after: duration)
271
+ } else if let _ = clock as? SuspendingClock {
272
+ let suspendingDuration = delay as! SuspendingClock . Duration
273
+ let duration = Duration ( from: suspendingDuration)
274
+ suspendingWaitQueue. enqueue ( job, after: duration)
275
+ } else {
276
+ clock. enqueue ( job, on: self , at: clock. now. advanced ( by: delay) ,
277
+ tolerance: tolerance)
278
+ return
198
279
}
199
-
200
- let duration = Duration ( from: swiftDuration)
201
- let deadline = self . currentTime + duration
202
-
203
- job. setupCooperativeExecutorTimestamp ( )
204
- job. cooperativeExecutorTimestamp = deadline
205
- waitQueue. push ( UnownedJob ( job) )
206
280
}
281
+
207
282
}
283
+ #endif
208
284
209
285
@available ( StdlibDeploymentTarget 6 . 2 , * )
210
286
extension CooperativeExecutor : RunLoopExecutor {
@@ -215,36 +291,47 @@ extension CooperativeExecutor: RunLoopExecutor {
215
291
public func runUntil( _ condition: ( ) -> Bool ) throws {
216
292
shouldStop = false
217
293
while !shouldStop && !condition( ) {
218
- // Process the timer queue
219
- let now = currentTime
220
- while let job = waitQueue. pop ( when: {
221
- ExecutorJob ( $0) . cooperativeExecutorTimestamp <= now
222
- } ) {
223
- var theJob = ExecutorJob ( job)
224
- theJob. clearCooperativeExecutorTimestamp ( )
225
- runQueue. push ( job)
294
+ #if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
295
+ // Process the timer queues
296
+ suspendingWaitQueue. forEachReadyJob {
297
+ runQueue. push ( UnownedJob ( $0) )
298
+ }
299
+ continuousWaitQueue. forEachReadyJob {
300
+ runQueue. push ( UnownedJob ( $0) )
226
301
}
302
+ #endif
227
303
228
304
// Now run any queued jobs
229
- while let job = runQueue. pop ( ) {
305
+ var runQ = runQueue. take ( )
306
+ while let job = runQ. pop ( ) {
230
307
unsafe ExecutorJob( job) . runSynchronously (
231
308
on: self . asUnownedSerialExecutor ( )
232
309
)
233
310
}
234
311
312
+ #if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
235
313
// Finally, wait until the next deadline
236
- if let job = waitQueue. top {
237
- let deadline = ExecutorJob ( job) . cooperativeExecutorTimestamp
238
- let now = self . currentTime
239
- if deadline > now {
240
- let toWait = deadline - now
241
- _sleep ( seconds: toWait. seconds,
242
- nanoseconds: toWait. nanoseconds)
314
+ var toWait: Duration? = suspendingWaitQueue. timeToNextJob
315
+
316
+ if let continuousToWait = continuousWaitQueue . timeToNextJob {
317
+ if toWait == nil || continuousToWait < toWait! {
318
+ toWait = continuousToWait
243
319
}
244
- } else {
320
+ }
321
+
322
+ if let toWait {
323
+ _sleep ( seconds: toWait. seconds,
324
+ nanoseconds: toWait. nanoseconds)
325
+ } else if runQueue . isEmpty {
326
+ // Stop if no more jobs are available
327
+ break
328
+ }
329
+ #else // $Embedded || SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
330
+ if runQueue. isEmpty {
245
331
// Stop if no more jobs are available
246
332
break
247
333
}
334
+ #endif
248
335
}
249
336
}
250
337
0 commit comments