diff --git a/Sources/SWBTaskConstruction/ProductPlanning/BuildPlan.swift b/Sources/SWBTaskConstruction/ProductPlanning/BuildPlan.swift index 81649982..78318ca8 100644 --- a/Sources/SWBTaskConstruction/ProductPlanning/BuildPlan.swift +++ b/Sources/SWBTaskConstruction/ProductPlanning/BuildPlan.swift @@ -12,6 +12,8 @@ package import SWBUtil package import SWBCore +import os +import Foundation /// Information describing a complete build plan request. package struct BuildPlanRequest: Sendable { @@ -88,8 +90,10 @@ package final class BuildPlan: StaleFileRemovalContext { // Create a planner to produce the actual product plans. let planner = ProductPlanner(planRequest: planRequest, taskPlanningDelegate: delegate) - // Create the queues to produce and aggregate the tasks. - let aggregationQueue = SWBQueue(label: "SWBTaskConstruction.BuildPlan.aggregationQueue", qos: planRequest.buildRequest.qos, autoreleaseFrequency: .workItem) + // Compute adaptive parallelism based on available cores + let processorCount = ProcessInfo.processInfo.activeProcessorCount + let highParallelism = max(processorCount * 2, 4) // Allow oversubscription for I/O-bound tasks + let mediumParallelism = max(processorCount, 2) // CPU-bound tasks // Compute a collated list of result contexts and task producers, so we can do a single parallel dispatch. // @@ -177,16 +181,13 @@ package final class BuildPlan: StaleFileRemovalContext { await ext.generateAdditionalTasks(&tasks, producer) } - aggregationQueue.async { [tasks] in - productPlanResultContext.addPlannedTasks(tasks) - } + // Direct call - thread-safe via ProductPlanResultContext's internal lock + productPlanResultContext.addPlannedTasks(tasks) } // Wait for task production. await group.waitForAll() } - - await aggregationQueue.sync{ } if delegate.cancelled { // Reset any deferred producers, which may participate in cycles. for context in productPlanResultContexts { @@ -200,23 +201,19 @@ package final class BuildPlan: StaleFileRemovalContext { // Compute all of the deferred tasks (in parallel). delegate.updateProgress(statusMessage: messageShortening == .full ? "Planning deferred tasks" : "Constructing deferred tasks", showInLog: false) - await TaskGroup.concurrentPerform(iterations: productPlanResultContexts.count, maximumParallelism: 10) { i in + await TaskGroup.concurrentPerform(iterations: productPlanResultContexts.count, maximumParallelism: mediumParallelism) { i in let productPlanResultContext = productPlanResultContexts[i] let plan = productPlanResultContext.productPlan plan.taskProducerContext.outputsOfMainTaskProducers = productPlanResultContext.outputNodes let deferredProducers = plan.taskProducerContext.takeDeferredProducers() if delegate.cancelled { return } - await TaskGroup.concurrentPerform(iterations: deferredProducers.count, maximumParallelism: 10) { i in + await TaskGroup.concurrentPerform(iterations: deferredProducers.count, maximumParallelism: mediumParallelism) { i in let tasks = await deferredProducers[i]() - aggregationQueue.async { - productPlanResultContext.addPlannedTasks(tasks) - } + // Direct call - thread-safe via ProductPlanResultContext's internal lock + productPlanResultContext.addPlannedTasks(tasks) } } - - // Wait for product plan aggregation. - await aggregationQueue.sync {} if delegate.cancelled { return nil } @@ -230,16 +227,13 @@ package final class BuildPlan: StaleFileRemovalContext { if delegate.cancelled { return [] } // Get the list of effective planned tasks for the product plan. - return await aggregationQueue.sync { resultContext.plannedTasks } + return resultContext.plannedTasks } } // Serially add the tasks for this product plan to the array for the whole build request. return await group.reduce(into: [], { $0.append(contentsOf: $1) }) } - - // Wait for task validation. - await aggregationQueue.sync{ } if delegate.cancelled { return nil } @@ -298,14 +292,20 @@ package final class BuildPlan: StaleFileRemovalContext { -/// This context stores the results of task generation for a product plan. It is used by a build plan to collect results of task generation, and once task generation is complete to compute the final set of planned tasks to be used for a product plan by evaluating task validity criteria.. +/// This context stores the results of task generation for a product plan. It is used by a build plan to collect results of task generation, and once task generation is complete to compute the final set of planned tasks to be used for a product plan by evaluating task validity criteria. /// -/// This class is not thread-safe; the build plan is expected to build up the context in a manner that accounts for that. +/// **Thread-Safety**: This class is thread-safe for concurrent task additions. +/// - `addPlannedTask()` and `addPlannedTasks()` are protected by an internal lock, allowing multiple producers to add tasks concurrently +/// - Properties like `plannedTasks`, `outputNodes`, `inputPaths`, etc. must only be accessed after all task additions are complete +/// - The BuildPlan ensures proper ordering: all writes complete before any reads of these properties private final class ProductPlanResultContext: TaskValidationContext, CustomStringConvertible { fileprivate let productPlan: ProductPlan private let targetName: String + /// Lock to protect concurrent access to mutable state + private let lock = Lock() + /// All planned tasks for the product plan. private var allPlannedTasks: Set> @@ -351,16 +351,18 @@ private final class ProductPlanResultContext: TaskValidationContext, CustomStrin } func addPlannedTask(_ plannedTask: any PlannedTask) { - allPlannedTasks.insert(Ref(plannedTask)) - - // Add the task's inputs and outputs to the result context. However, we only do this if the task doesn't have validity criteria. - // Otherwise, a task which we later determine is not valid might cause another to be considered valid when it otherwise would not be. - if plannedTask.validityCriteria == nil { - for input in plannedTask.inputs { - addInputPath(input.path) - } - for output in plannedTask.outputs { - addOutputPath(output.path) + lock.withLock { + allPlannedTasks.insert(Ref(plannedTask)) + + // Add the task's inputs and outputs to the result context. However, we only do this if the task doesn't have validity criteria. + // Otherwise, a task which we later determine is not valid might cause another to be considered valid when it otherwise would not be. + if plannedTask.validityCriteria == nil { + for input in plannedTask.inputs { + addInputPath(input.path) + } + for output in plannedTask.outputs { + addOutputPath(output.path) + } } } } diff --git a/Sources/SWBTaskConstruction/ProductPlanning/ProductPlanner.swift b/Sources/SWBTaskConstruction/ProductPlanning/ProductPlanner.swift index 8c1b889e..cf2a24f1 100644 --- a/Sources/SWBTaskConstruction/ProductPlanning/ProductPlanner.swift +++ b/Sources/SWBTaskConstruction/ProductPlanning/ProductPlanner.swift @@ -13,6 +13,7 @@ import SWBCore import SWBUtil import SWBMacro +import Foundation @PluginExtensionSystemActor internal func taskProducerExtensions(_ workspaceContext: WorkspaceContext) -> [any TaskProducerExtension] { let extensions = workspaceContext.core.pluginManager.extensions(of: TaskProducerExtensionPoint.self) @@ -48,7 +49,8 @@ package struct ProductPlanner let targetTaskInfos = globalProductPlan.targetGateNodes // Create the plans themselves in parallel. - var productPlans = await globalProductPlan.allTargets.asyncMap { configuredTarget in + let maxParallelism = max(1, ProcessInfo.processInfo.activeProcessorCount) + var productPlans = await globalProductPlan.allTargets.concurrentMap(maximumParallelism: maxParallelism) { configuredTarget in // Create the product plan for the this target, and serially add it to the list of product plans. return await ProductPlanBuilder(configuredTarget: configuredTarget, workspaceContext: self.planRequest.workspaceContext, delegate: self.delegate).createProductPlan(targetTaskInfos[configuredTarget]!, globalProductPlan) }