@@ -331,12 +331,13 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
331
331
/// The queue of pending tasks that haven't been scheduled for execution yet.
332
332
private var pendingTasks : [ QueuedTask < TaskDescription > ] = [ ]
333
333
334
- /// An ordered list of task priorities to the number of tasks that might execute concurrently at that ( or a higher)
335
- /// priority.
334
+ /// An ordered list of task priorities to the number of tasks that might execute concurrently at that or a lower
335
+ /// priority. As the task priority is decreased, the number of current tasks becomes more restricted.
336
336
///
337
- /// This list is sorted in descending priority order .
337
+ /// This list is normalized according to `normalize(maxConcurrentTasksByPriority:)` .
338
338
///
339
- /// The `maxConcurrentTasks` of the last element in this list is also used for tasks with a lower priority.
339
+ /// The highest priority entry in this list restricts the total number of tasks that can be executed at any priority
340
+ /// (including priorities higher than its entry).
340
341
///
341
342
/// For example if you have
342
343
/// ```swift
@@ -347,38 +348,40 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
347
348
/// ```
348
349
///
349
350
/// Then we allow the following number of concurrent tasks at the following priorities
350
- /// - `.high`: 4
351
+ /// - `.high`: 4 (because `.medium: 4` restricts the total number of tasks to 4)
351
352
/// - `.medium`: 4
352
353
/// - `.low`: 2
353
354
/// - `.background`: 2
355
+ ///
356
+ /// When combining tasks with different priorities:
357
+ /// - If we have 3 medium priority tasks, we can have at most 1 low priority task
358
+ /// - If we have 1 medium priority task, we can still have 2 low priority tasks, but no more
354
359
private var maxConcurrentTasksByPriority : [ ( priority: TaskPriority , maxConcurrentTasks: Int ) ] {
355
360
didSet {
356
- // These preconditions need to match the ones in `init`
357
- maxConcurrentTasksByPriority = maxConcurrentTasksByPriority. sorted ( by: { $0. priority > $1. priority } )
358
- precondition ( maxConcurrentTasksByPriority. allSatisfy { $0. maxConcurrentTasks >= 0 } )
359
- precondition ( maxConcurrentTasksByPriority. map ( \. maxConcurrentTasks) . isSorted ( descending: true ) )
360
- precondition ( !maxConcurrentTasksByPriority. isEmpty)
361
-
362
- // Check we are over-subscribed in currently executing tasks. If we are, cancel currently executing task to be
363
- // rescheduled until we are within the new limit.
364
- var tasksToReschedule : [ QueuedTask < TaskDescription > ] = [ ]
365
- for (priority, maxConcurrentTasks) in maxConcurrentTasksByPriority {
366
- var tasksInPrioritySlot = currentlyExecutingTasks. filter { $0. priority <= priority }
367
- if tasksInPrioritySlot. count <= maxConcurrentTasks {
368
- // We have enough available slots. Nothing to do.
369
- continue
370
- }
371
- tasksInPrioritySlot = tasksInPrioritySlot. sorted { $0. priority > $1. priority }
372
- while tasksInPrioritySlot. count > maxConcurrentTasks {
373
- // Cancel the task with the lowest priority (because it is least important and also takes a slot in the lower
374
- // priority execution buckets) and the among those the most recent one because it has probably made the least
375
- // progress.
376
- guard let mostRecentTaskInSlot = tasksInPrioritySlot. popLast ( ) else {
377
- // Should never happen because `tasksInPrioritySlot.count > maxConcurrentTasks >= 0`
378
- logger. fault ( " Unexpectedly unable to pop last task from tasksInPrioritySlot " )
379
- break
380
- }
381
- tasksToReschedule. append ( mostRecentTaskInSlot)
361
+ maxConcurrentTasksByPriority = Self . normalize ( maxConcurrentTasksByPriority: maxConcurrentTasksByPriority)
362
+
363
+ if maxConcurrentTasksByPriority. count == oldValue. count,
364
+ zip ( maxConcurrentTasksByPriority, oldValue) . allSatisfy ( == )
365
+ {
366
+ // We didn't actually change anything, so we don't need to perform any validation or task processing.
367
+ return
368
+ }
369
+
370
+ // Check we are over-subscribed in currently executing tasks by walking through all currently executing tasks and
371
+ // checking if we could schedule them within the new execution limits. Cancel any tasks that do not fit within the
372
+ // new limit to be rescheduled when we are within the limit again.
373
+ var currentlyExecutingTaskDetails : [ ( priority: TaskPriority , estimatedCPUCoreCount: Int ) ] = [ ]
374
+ var tasksToCancelAndReschedule : [ QueuedTask < TaskDescription > ] = [ ]
375
+ for task in currentlyExecutingTasks. sorted ( by: { $0. priority > $1. priority } ) {
376
+ let taskPriority = task. priority
377
+ if Self . canScheduleTask (
378
+ withPriority: taskPriority,
379
+ maxConcurrentTasksByPriority: maxConcurrentTasksByPriority,
380
+ currentlyExecutingTaskDetails: currentlyExecutingTaskDetails
381
+ ) {
382
+ currentlyExecutingTaskDetails. append ( ( taskPriority, task. description. estimatedCPUCoreCount) )
383
+ } else {
384
+ tasksToCancelAndReschedule. append ( task)
382
385
}
383
386
}
384
387
@@ -390,7 +393,7 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
390
393
// would cancel the tasks and then immediately reschedule it – while that's doing unnecessary work, it's still
391
394
// correct.
392
395
Task . detached ( priority: . high) {
393
- for tasksToReschedule in tasksToReschedule {
396
+ for tasksToReschedule in tasksToCancelAndReschedule {
394
397
await tasksToReschedule. cancelToBeRescheduled ( )
395
398
}
396
399
}
@@ -406,11 +409,7 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
406
409
}
407
410
408
411
package init ( maxConcurrentTasksByPriority: [ ( priority: TaskPriority , maxConcurrentTasks: Int ) ] ) {
409
- // These preconditions need to match the ones in `maxConcurrentTasksByPriority:didSet`
410
- self . maxConcurrentTasksByPriority = maxConcurrentTasksByPriority. sorted ( by: { $0. priority > $1. priority } )
411
- precondition ( maxConcurrentTasksByPriority. allSatisfy { $0. maxConcurrentTasks >= 0 } )
412
- precondition ( maxConcurrentTasksByPriority. map ( \. maxConcurrentTasks) . isSorted ( descending: true ) )
413
- precondition ( !maxConcurrentTasksByPriority. isEmpty)
412
+ self . maxConcurrentTasksByPriority = Self . normalize ( maxConcurrentTasksByPriority: maxConcurrentTasksByPriority)
414
413
}
415
414
416
415
/// Enqueue a new task to be executed.
@@ -448,16 +447,58 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
448
447
return queuedTask
449
448
}
450
449
451
- /// Returns the maximum number of concurrent tasks that are allowed to execute at the given priority.
452
- private func maxConcurrentTasks( at priority: TaskPriority ) -> Int {
453
- for (atPriority, maxConcurrentTasks) in maxConcurrentTasksByPriority {
454
- if atPriority <= priority {
455
- return maxConcurrentTasks
450
+ private static func normalize(
451
+ maxConcurrentTasksByPriority: [ ( priority: TaskPriority , maxConcurrentTasks: Int ) ]
452
+ ) -> [ ( priority: TaskPriority , maxConcurrentTasks: Int ) ] {
453
+ var maxConcurrentTasksByPriority = maxConcurrentTasksByPriority
454
+
455
+ // Ensure elements are sorted decreasingly by priority.
456
+ maxConcurrentTasksByPriority = maxConcurrentTasksByPriority. sorted ( by: { $0. priority > $1. priority } )
457
+
458
+ // Ensure array is not empty.
459
+ if maxConcurrentTasksByPriority. isEmpty {
460
+ logger. fault ( " Received empty maxConcurrentTasksByPriority. Allowing as many tasks as there are processor cores. " )
461
+ maxConcurrentTasksByPriority = [ ( . medium, ProcessInfo . processInfo. processorCount) ]
462
+ }
463
+
464
+ // Ensure `maxConcurrentTasks` is not increasing with lower priority tasks.
465
+ var lastMaxConcurrentTasks = maxConcurrentTasksByPriority. first!. maxConcurrentTasks
466
+ for i in 1 ..< maxConcurrentTasksByPriority. count {
467
+ if maxConcurrentTasksByPriority [ i] . maxConcurrentTasks > lastMaxConcurrentTasks {
468
+ logger. fault ( " More tasks allowed for lower priority than for higher priority " )
469
+ maxConcurrentTasksByPriority [ i] . maxConcurrentTasks = lastMaxConcurrentTasks
470
+ } else {
471
+ lastMaxConcurrentTasks = maxConcurrentTasksByPriority [ i] . maxConcurrentTasks
456
472
}
457
473
}
458
- // `last!` is fine because the initializer of `maxConcurrentTasksByPriority` has a precondition that
459
- // `maxConcurrentTasksByPriority` is not empty.
460
- return maxConcurrentTasksByPriority. last!. maxConcurrentTasks
474
+
475
+ return maxConcurrentTasksByPriority
476
+ }
477
+
478
+ /// Returns `true` if we can schedule a task with the given priority, assuming that the currently executing tasks have
479
+ /// the given priorities.
480
+ package static func canScheduleTask(
481
+ withPriority newTaskPriority: TaskPriority ,
482
+ maxConcurrentTasksByPriority: [ ( priority: TaskPriority , maxConcurrentTasks: Int ) ] ,
483
+ currentlyExecutingTaskDetails: [ ( priority: TaskPriority , estimatedCPUCoreCount: Int ) ]
484
+ ) -> Bool {
485
+ if currentlyExecutingTaskDetails. sum ( of: \. estimatedCPUCoreCount)
486
+ >= maxConcurrentTasksByPriority. first!. maxConcurrentTasks
487
+ {
488
+ return false
489
+ }
490
+ for (priority, maxConcurrentTasks) in maxConcurrentTasksByPriority {
491
+ guard priority >= newTaskPriority else {
492
+ // This limit does not affect the new task
493
+ continue
494
+ }
495
+ if currentlyExecutingTaskDetails. filter ( { $0. priority <= priority } ) . sum ( of: \. estimatedCPUCoreCount)
496
+ >= maxConcurrentTasks
497
+ {
498
+ return false
499
+ }
500
+ }
501
+ return true
461
502
}
462
503
463
504
/// Poke the execution of more tasks in the queue.
@@ -466,12 +507,19 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
466
507
private func poke( ) {
467
508
pendingTasks. sort ( by: { $0. priority > $1. priority } )
468
509
for task in pendingTasks {
469
- if currentlyExecutingTasks. map ( \. description. estimatedCPUCoreCount) . sum ( ) >= maxConcurrentTasks ( at: task. priority)
470
- {
510
+ guard
511
+ Self . canScheduleTask (
512
+ withPriority: task. priority,
513
+ maxConcurrentTasksByPriority: maxConcurrentTasksByPriority,
514
+ currentlyExecutingTaskDetails: currentlyExecutingTasks. map ( {
515
+ ( $0. priority, $0. description. estimatedCPUCoreCount)
516
+ } )
517
+ )
518
+ else {
471
519
// We don't have any execution slots left. Thus, this poker has nothing to do and is done.
472
520
// When the next task finishes, it calls `poke` again.
473
- // If the low priority task's priority gets elevated that task's priority will get elevated and it will be
474
- // picked up on the next `poke` call .
521
+ // If a low priority task's priority gets elevated that task's priority will get elevated, which will call
522
+ // `poke`.
475
523
return
476
524
}
477
525
let dependencies = task. description. dependencies ( to: currentlyExecutingTasks. map ( \. description) )
@@ -600,11 +648,11 @@ fileprivate extension Collection where Element: Comparable {
600
648
}
601
649
}
602
650
603
- fileprivate extension Collection < Int > {
604
- func sum( ) -> Int {
651
+ fileprivate extension Collection {
652
+ func sum( of transform : ( Self . Element ) -> Int ) -> Int {
605
653
var result = 0
606
654
for element in self {
607
- result += element
655
+ result += transform ( element)
608
656
}
609
657
return result
610
658
}
0 commit comments