@@ -215,6 +215,13 @@ class WorkQueueExecutor(BlockProviderExecutor, putils.RepresentationMixin):
215215 This requires a version of Work Queue / cctools after commit
216216 874df524516441da531b694afc9d591e8b134b73 (release 7.5.0 is too early).
217217 Default is False.
218+
219+ scaling_cores_per_worker: int
220+ When using Parsl scaling, this specifies the number of cores that a
221+ worker is expected to have available for computation. Default 1. This
222+ parameter can be ignored when using a fixed number of blocks, or when
223+ using one task per worker (by omitting a ``cores`` resource
224+ specifiation for each task).
218225 """
219226
220227 radio_mode = "filesystem"
@@ -244,12 +251,14 @@ def __init__(self,
244251 full_debug : bool = True ,
245252 worker_executable : str = 'work_queue_worker' ,
246253 function_dir : Optional [str ] = None ,
247- coprocess : bool = False ):
254+ coprocess : bool = False ,
255+ scaling_cores_per_worker : int = 1 ):
248256 BlockProviderExecutor .__init__ (self , provider = provider ,
249257 block_error_handler = True )
250258 if not _work_queue_enabled :
251259 raise OptionalModuleMissing (['work_queue' ], "WorkQueueExecutor requires the work_queue module." )
252260
261+ self .scaling_cores_per_worker = scaling_cores_per_worker
253262 self .label = label
254263 self .task_queue = multiprocessing .Queue () # type: multiprocessing.Queue
255264 self .collector_queue = multiprocessing .Queue () # type: multiprocessing.Queue
@@ -469,6 +478,8 @@ def submit(self, func, resource_specification, *args, **kwargs):
469478 # Create a Future object and have it be mapped from the task ID in the tasks dictionary
470479 fu = Future ()
471480 fu .parsl_executor_task_id = executor_task_id
481+ assert isinstance (resource_specification , dict )
482+ fu .resource_specification = resource_specification
472483 logger .debug ("Getting tasks_lock to set WQ-level task entry" )
473484 with self .tasks_lock :
474485 logger .debug ("Got tasks_lock to set WQ-level task entry" )
@@ -654,20 +665,29 @@ def initialize_scaling(self):
654665
655666 @property
656667 def outstanding (self ) -> int :
657- """Count the number of outstanding tasks . This is inefficiently
668+ """Count the number of outstanding slots required . This is inefficiently
658669 implemented and probably could be replaced with a counter.
659670 """
671+ logger .debug ("Calculating outstanding task slot load" )
660672 outstanding = 0
673+ tasks = 0 # only for log message...
661674 with self .tasks_lock :
662675 for fut in self .tasks .values ():
663676 if not fut .done ():
664- outstanding += 1
665- logger .debug (f"Counted { outstanding } outstanding tasks" )
677+ # if a task does not specify a core count, Work Queue will allocate an entire
678+ # worker node to that task. That's approximated here by saying that it uses
679+ # scaling_cores_per_worker.
680+ resource_spec = getattr (fut , 'resource_specification' , {})
681+ cores = resource_spec .get ('cores' , self .scaling_cores_per_worker )
682+
683+ outstanding += cores
684+ tasks += 1
685+ logger .debug (f"Counted { tasks } outstanding tasks with { outstanding } outstanding slots" )
666686 return outstanding
667687
668688 @property
669689 def workers_per_node (self ) -> Union [int , float ]:
670- return 1
690+ return self . scaling_cores_per_worker
671691
672692 def scale_in (self , count : int ) -> List [str ]:
673693 """Scale in method.
0 commit comments