Skip to content

Commit 004f066

Browse files
authored
Merge pull request #1991 from ahoppen/fix-max-tasks
Fix a bug in `TaskScheduler` that incorrectly restricted the number of executing tasks
2 parents 1209747 + 2d96dd7 commit 004f066

File tree

2 files changed

+170
-55
lines changed

2 files changed

+170
-55
lines changed

Sources/SemanticIndex/TaskScheduler.swift

Lines changed: 100 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -331,12 +331,13 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
331331
/// The queue of pending tasks that haven't been scheduled for execution yet.
332332
private var pendingTasks: [QueuedTask<TaskDescription>] = []
333333

334-
/// An ordered list of task priorities to the number of tasks that might execute concurrently at that (or a higher)
335-
/// priority.
334+
/// An ordered list of task priorities to the number of tasks that might execute concurrently at that or a lower
335+
/// priority. As the task priority decreases, the number of concurrent tasks becomes more restricted.
336336
///
337-
/// This list is sorted in descending priority order.
337+
/// This list is normalized according to `normalize(maxConcurrentTasksByPriority:)`.
338338
///
339-
/// The `maxConcurrentTasks` of the last element in this list is also used for tasks with a lower priority.
339+
/// The highest priority entry in this list restricts the total number of tasks that can be executed at any priority
340+
/// (including priorities higher than its entry).
340341
///
341342
/// For example if you have
342343
/// ```swift
@@ -347,38 +348,40 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
347348
/// ```
348349
///
349350
/// Then we allow the following number of concurrent tasks at the following priorities
350-
/// - `.high`: 4
351+
/// - `.high`: 4 (because `.medium: 4` restricts the total number of tasks to 4)
351352
/// - `.medium`: 4
352353
/// - `.low`: 2
353354
/// - `.background`: 2
355+
///
356+
/// When combining tasks with different priorities:
357+
/// - If we have 3 medium priority tasks, we can have at most 1 low priority task
358+
/// - If we have 1 medium priority task, we can still have 2 low priority tasks, but no more
354359
private var maxConcurrentTasksByPriority: [(priority: TaskPriority, maxConcurrentTasks: Int)] {
355360
didSet {
356-
// These preconditions need to match the ones in `init`
357-
maxConcurrentTasksByPriority = maxConcurrentTasksByPriority.sorted(by: { $0.priority > $1.priority })
358-
precondition(maxConcurrentTasksByPriority.allSatisfy { $0.maxConcurrentTasks >= 0 })
359-
precondition(maxConcurrentTasksByPriority.map(\.maxConcurrentTasks).isSorted(descending: true))
360-
precondition(!maxConcurrentTasksByPriority.isEmpty)
361-
362-
// Check we are over-subscribed in currently executing tasks. If we are, cancel currently executing task to be
363-
// rescheduled until we are within the new limit.
364-
var tasksToReschedule: [QueuedTask<TaskDescription>] = []
365-
for (priority, maxConcurrentTasks) in maxConcurrentTasksByPriority {
366-
var tasksInPrioritySlot = currentlyExecutingTasks.filter { $0.priority <= priority }
367-
if tasksInPrioritySlot.count <= maxConcurrentTasks {
368-
// We have enough available slots. Nothing to do.
369-
continue
370-
}
371-
tasksInPrioritySlot = tasksInPrioritySlot.sorted { $0.priority > $1.priority }
372-
while tasksInPrioritySlot.count > maxConcurrentTasks {
373-
// Cancel the task with the lowest priority (because it is least important and also takes a slot in the lower
374-
// priority execution buckets) and the among those the most recent one because it has probably made the least
375-
// progress.
376-
guard let mostRecentTaskInSlot = tasksInPrioritySlot.popLast() else {
377-
// Should never happen because `tasksInPrioritySlot.count > maxConcurrentTasks >= 0`
378-
logger.fault("Unexpectedly unable to pop last task from tasksInPrioritySlot")
379-
break
380-
}
381-
tasksToReschedule.append(mostRecentTaskInSlot)
361+
maxConcurrentTasksByPriority = Self.normalize(maxConcurrentTasksByPriority: maxConcurrentTasksByPriority)
362+
363+
if maxConcurrentTasksByPriority.count == oldValue.count,
364+
zip(maxConcurrentTasksByPriority, oldValue).allSatisfy(==)
365+
{
366+
// We didn't actually change anything, so we don't need to perform any validation or task processing.
367+
return
368+
}
369+
370+
// Check we are over-subscribed in currently executing tasks by walking through all currently executing tasks and
371+
// checking if we could schedule them within the new execution limits. Cancel any tasks that do not fit within the
372+
// new limit to be rescheduled when we are within the limit again.
373+
var currentlyExecutingTaskDetails: [(priority: TaskPriority, estimatedCPUCoreCount: Int)] = []
374+
var tasksToCancelAndReschedule: [QueuedTask<TaskDescription>] = []
375+
for task in currentlyExecutingTasks.sorted(by: { $0.priority > $1.priority }) {
376+
let taskPriority = task.priority
377+
if Self.canScheduleTask(
378+
withPriority: taskPriority,
379+
maxConcurrentTasksByPriority: maxConcurrentTasksByPriority,
380+
currentlyExecutingTaskDetails: currentlyExecutingTaskDetails
381+
) {
382+
currentlyExecutingTaskDetails.append((taskPriority, task.description.estimatedCPUCoreCount))
383+
} else {
384+
tasksToCancelAndReschedule.append(task)
382385
}
383386
}
384387

@@ -390,7 +393,7 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
390393
// would cancel the tasks and then immediately reschedule them – while that's doing unnecessary work, it's still
391394
// correct.
392395
Task.detached(priority: .high) {
393-
for tasksToReschedule in tasksToReschedule {
396+
for tasksToReschedule in tasksToCancelAndReschedule {
394397
await tasksToReschedule.cancelToBeRescheduled()
395398
}
396399
}
@@ -406,11 +409,7 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
406409
}
407410

408411
/// Creates a scheduler with the given per-priority concurrency limits.
///
/// In the added (`+`) version of this initializer the list is passed through
/// `Self.normalize(maxConcurrentTasksByPriority:)` before being stored; the removed (`-`) version sorted the list and
/// then validated it with `precondition`s instead.
///
/// - Parameter maxConcurrentTasksByPriority: Pairs of a priority and the number of tasks allowed to execute
///   concurrently for that entry.
package init(maxConcurrentTasksByPriority: [(priority: TaskPriority, maxConcurrentTasks: Int)]) {
409-
// These preconditions need to match the ones in `maxConcurrentTasksByPriority:didSet`
410-
self.maxConcurrentTasksByPriority = maxConcurrentTasksByPriority.sorted(by: { $0.priority > $1.priority })
411-
precondition(maxConcurrentTasksByPriority.allSatisfy { $0.maxConcurrentTasks >= 0 })
412-
precondition(maxConcurrentTasksByPriority.map(\.maxConcurrentTasks).isSorted(descending: true))
413-
precondition(!maxConcurrentTasksByPriority.isEmpty)
412+
self.maxConcurrentTasksByPriority = Self.normalize(maxConcurrentTasksByPriority: maxConcurrentTasksByPriority)
414413
}
415414

416415
/// Enqueue a new task to be executed.
@@ -448,16 +447,58 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
448447
return queuedTask
449448
}
450449

451-
/// Returns the maximum number of concurrent tasks that are allowed to execute at the given priority.
452-
private func maxConcurrentTasks(at priority: TaskPriority) -> Int {
453-
for (atPriority, maxConcurrentTasks) in maxConcurrentTasksByPriority {
454-
if atPriority <= priority {
455-
return maxConcurrentTasks
450+
/// Brings a list of per-priority concurrency limits into the canonical form used by the scheduler.
///
/// The returned list
/// - is sorted by descending priority,
/// - contains at least one entry: an empty input is replaced by a single `.medium` entry that allows
///   `ProcessInfo.processInfo.processorCount` tasks, and
/// - never allows more tasks at a lower-priority entry than at any higher-priority entry: offending entries are
///   lowered to the smallest limit seen so far.
///
/// Invalid input is reported via `logger.fault` and repaired instead of trapping.
///
/// NOTE(review): unlike the removed `precondition`s, nothing here rejects a negative `maxConcurrentTasks` – confirm
/// that this is intended.
///
/// - Parameter maxConcurrentTasksByPriority: The limits as supplied by the caller, in any order.
/// - Returns: The repaired list described above.
private static func normalize(
451+
maxConcurrentTasksByPriority: [(priority: TaskPriority, maxConcurrentTasks: Int)]
452+
) -> [(priority: TaskPriority, maxConcurrentTasks: Int)] {
453+
var maxConcurrentTasksByPriority = maxConcurrentTasksByPriority
454+
455+
// Ensure elements are sorted decreasingly by priority.
456+
maxConcurrentTasksByPriority = maxConcurrentTasksByPriority.sorted(by: { $0.priority > $1.priority })
457+
458+
// Ensure array is not empty.
459+
if maxConcurrentTasksByPriority.isEmpty {
460+
logger.fault("Received empty maxConcurrentTasksByPriority. Allowing as many tasks as there are processor cores.")
461+
maxConcurrentTasksByPriority = [(.medium, ProcessInfo.processInfo.processorCount)]
462+
}
463+
464+
// Ensure `maxConcurrentTasks` is not increasing with lower priority tasks.
465+
// `first!` cannot fail here: an empty list was replaced by a single-entry list above.
var lastMaxConcurrentTasks = maxConcurrentTasksByPriority.first!.maxConcurrentTasks
466+
// With exactly one entry, `1..<1` is an empty range and the loop body never runs.
for i in 1..<maxConcurrentTasksByPriority.count {
467+
if maxConcurrentTasksByPriority[i].maxConcurrentTasks > lastMaxConcurrentTasks {
468+
logger.fault("More tasks allowed for lower priority than for higher priority")
469+
// Clamp to the smallest limit seen among the higher-priority entries.
maxConcurrentTasksByPriority[i].maxConcurrentTasks = lastMaxConcurrentTasks
470+
} else {
471+
lastMaxConcurrentTasks = maxConcurrentTasksByPriority[i].maxConcurrentTasks
456472
}
457473
}
458-
// `last!` is fine because the initializer of `maxConcurrentTasksByPriority` has a precondition that
459-
// `maxConcurrentTasksByPriority` is not empty.
460-
return maxConcurrentTasksByPriority.last!.maxConcurrentTasks
474+
475+
return maxConcurrentTasksByPriority
476+
}
477+
478+
/// Returns `true` if we can schedule a task with the given priority, assuming that the currently executing tasks have
479+
/// the given priorities.
///
/// - Parameters:
///   - newTaskPriority: The priority of the task that should be scheduled.
///   - maxConcurrentTasksByPriority: The execution limits. Must not be empty because its first entry is
///     force-unwrapped. The limit of the first entry is applied to the total of all executing tasks.
///   - currentlyExecutingTaskDetails: The priority and estimated CPU core count of every task that is assumed to be
///     executing.
/// - Returns: `false` if the summed `estimatedCPUCoreCount` of all executing tasks has reached the limit of the first
///   entry, or if, for any entry whose priority is at least `newTaskPriority`, the summed `estimatedCPUCoreCount` of
///   the executing tasks at or below that entry's priority has reached that entry's limit. `true` otherwise.
480+
package static func canScheduleTask(
481+
withPriority newTaskPriority: TaskPriority,
482+
maxConcurrentTasksByPriority: [(priority: TaskPriority, maxConcurrentTasks: Int)],
483+
currentlyExecutingTaskDetails: [(priority: TaskPriority, estimatedCPUCoreCount: Int)]
484+
) -> Bool {
485+
// Overall cap: the first entry limits all executing tasks, independent of their priority. This also covers new
// tasks whose priority is higher than every entry, for which the loop below applies no limit.
if currentlyExecutingTaskDetails.sum(of: \.estimatedCPUCoreCount)
486+
>= maxConcurrentTasksByPriority.first!.maxConcurrentTasks
487+
{
488+
return false
489+
}
490+
for (priority, maxConcurrentTasks) in maxConcurrentTasksByPriority {
491+
guard priority >= newTaskPriority else {
492+
// This limit does not affect the new task
493+
continue
494+
}
495+
// Only executing tasks at or below this entry's priority count against this entry's limit.
if currentlyExecutingTaskDetails.filter({ $0.priority <= priority }).sum(of: \.estimatedCPUCoreCount)
496+
>= maxConcurrentTasks
497+
{
498+
return false
499+
}
500+
}
501+
return true
461502
}
462503

463504
/// Poke the execution of more tasks in the queue.
@@ -466,12 +507,19 @@ package actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
466507
private func poke() {
467508
pendingTasks.sort(by: { $0.priority > $1.priority })
468509
for task in pendingTasks {
469-
if currentlyExecutingTasks.map(\.description.estimatedCPUCoreCount).sum() >= maxConcurrentTasks(at: task.priority)
470-
{
510+
guard
511+
Self.canScheduleTask(
512+
withPriority: task.priority,
513+
maxConcurrentTasksByPriority: maxConcurrentTasksByPriority,
514+
currentlyExecutingTaskDetails: currentlyExecutingTasks.map({
515+
($0.priority, $0.description.estimatedCPUCoreCount)
516+
})
517+
)
518+
else {
471519
// We don't have any execution slots left. Thus, this poker has nothing to do and is done.
472520
// When the next task finishes, it calls `poke` again.
473-
// If the low priority task's priority gets elevated that task's priority will get elevated and it will be
474-
// picked up on the next `poke` call.
521+
// If a low priority task's priority gets elevated, that elevation will call
522+
// `poke`.
475523
return
476524
}
477525
let dependencies = task.description.dependencies(to: currentlyExecutingTasks.map(\.description))
@@ -600,11 +648,11 @@ fileprivate extension Collection where Element: Comparable {
600648
}
601649
}
602650

603-
fileprivate extension Collection<Int> {
604-
func sum() -> Int {
651+
fileprivate extension Collection {
652+
/// Returns the sum of `transform(element)` over all elements of the collection, or `0` if the collection is empty.
///
/// - Parameter transform: Maps an element to the `Int` that should be added to the total.
func sum(of transform: (Self.Element) -> Int) -> Int {
605653
var result = 0
606654
for element in self {
607-
result += element
655+
result += transform(element)
608656
}
609657
return result
610658
}

Tests/SemanticIndexTests/TaskSchedulerTests.swift

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,65 @@ final class TaskSchedulerTests: XCTestCase {
276276
await taskScheduler.setMaxConcurrentTasksByPriority([(.low, 1)])
277277
try await fulfillmentOfOrThrow([taskExecutedAgain])
278278
}
279+
280+
/// Checks that a `.high` and a `.low` priority task execute at the same time when the limits are
/// `[(.high, 2), (.low, 1)]`.
///
/// Each task signals that it started and then waits for the other task's start signal before it signals that it
/// finished, so both "finished" semaphores can only be signalled if the two tasks overlap in execution.
func testUseAllExecutionSlotsWithHighAndLowPriorityTasks() async throws {
281+
let taskScheduler = TaskScheduler<ClosureTaskDescription>(maxConcurrentTasksByPriority: [(.high, 2), (.low, 1)])
282+
283+
let highPriorityTaskStartedExecuting = WrappedSemaphore(name: "High priority task started executing")
284+
let lowPriorityTaskStartedExecuting = WrappedSemaphore(name: "Low priority task started executing")
285+
286+
let highPriorityTaskFinished = WrappedSemaphore(name: "High priority task finished")
287+
let lowPriorityTaskFinished = WrappedSemaphore(name: "Low priority task finished")
288+
289+
await taskScheduler.schedule(priority: .high, id: .highPriority(1)) {
290+
highPriorityTaskStartedExecuting.signal()
291+
// Blocks until the low priority task has started as well.
lowPriorityTaskStartedExecuting.waitOrXCTFail()
292+
highPriorityTaskFinished.signal()
293+
}
294+
295+
await taskScheduler.schedule(priority: .low, id: .lowPriority(3)) {
296+
lowPriorityTaskStartedExecuting.signal()
297+
// Blocks until the high priority task has started as well.
highPriorityTaskStartedExecuting.waitOrXCTFail()
298+
lowPriorityTaskFinished.signal()
299+
}
300+
301+
// Both tasks must have run to completion for the test to pass.
highPriorityTaskFinished.waitOrXCTFail()
302+
lowPriorityTaskFinished.waitOrXCTFail()
303+
}
304+
305+
/// Exercises `TaskScheduler.canScheduleTask` directly with hand-written limits and executing-task lists.
func testScheduleTask() {
306+
// Two executing tasks with one core each already reach the overall limit of 2, so nothing else may start.
XCTAssertFalse(
307+
TaskScheduler<ClosureTaskDescription>.canScheduleTask(
308+
withPriority: .low,
309+
maxConcurrentTasksByPriority: [(.high, 2), (.low, 1)],
310+
currentlyExecutingTaskDetails: [(.high, 1), (.high, 1)]
311+
)
312+
)
313+
314+
// The overall limit of 3 has one core left and no task at or below `.low` is executing, so the `.low` limit of 1
// is not reached either.
XCTAssert(
315+
TaskScheduler<ClosureTaskDescription>.canScheduleTask(
316+
withPriority: .low,
317+
maxConcurrentTasksByPriority: [(.high, 3), (.low, 1)],
318+
currentlyExecutingTaskDetails: [(.high, 1), (.high, 1)]
319+
)
320+
)
321+
322+
// Two of 3 cores are in use overall and only one of the 2 cores allowed at or below `.low` is in use.
XCTAssert(
323+
TaskScheduler<ClosureTaskDescription>.canScheduleTask(
324+
withPriority: .low,
325+
maxConcurrentTasksByPriority: [(.high, 3), (.low, 2)],
326+
currentlyExecutingTaskDetails: [(.high, 1), (.low, 1)]
327+
)
328+
)
329+
330+
// A limit of 0 for `.low` means a `.low` task can never be scheduled, even when nothing is executing.
XCTAssertFalse(
331+
TaskScheduler<ClosureTaskDescription>.canScheduleTask(
332+
withPriority: .low,
333+
maxConcurrentTasksByPriority: [(.high, 2), (.low, 0)],
334+
currentlyExecutingTaskDetails: []
335+
)
336+
)
337+
}
279338
}
280339

281340
// MARK: - Test helpers
@@ -425,15 +484,23 @@ fileprivate extension TaskScheduler<ClosureTaskDescription> {
425484
priority: TaskPriority? = nil,
426485
id: TaskID?,
427486
estimatedCPUCoreCount: Int = 1,
428-
body: @Sendable @escaping () async -> Void,
487+
body: @Sendable @escaping () async throws -> Void,
429488
dependencies: @Sendable @escaping ([ClosureTaskDescription]) -> [TaskDependencyAction<ClosureTaskDescription>] = {
430489
_ in []
431-
}
490+
},
491+
file: StaticString = #filePath,
492+
line: UInt = #line
432493
) async -> Task<Void, Never> {
433494
let taskDescription = ClosureTaskDescription(
434495
id: id,
435496
estimatedCPUCoreCount: estimatedCPUCoreCount,
436-
body,
497+
{
498+
do {
499+
try await body()
500+
} catch {
501+
XCTFail("Received unexpected error: \(error)", file: file, line: line)
502+
}
503+
},
437504
dependencies: dependencies
438505
)
439506
// Make sure that we call `schedule` outside of the `Task` because the execution order of `Task`s is not guaranteed

0 commit comments

Comments
 (0)