Skip to content

Commit 73571be

Browse files
committed
Handle init_blocks in scaling strategy, rather than special-casing it
This is part of issue #3278 tidying up job and block management. Now init_blocks scale out happens on the first strategy poll, not at executor start - that will often delay init_blocks scaling by one strategy poll period compared to before this PR.
1 parent 114f7c5 commit 73571be

File tree

7 files changed

+22
-40
lines changed

7 files changed

+22
-40
lines changed

parsl/dataflow/dflow.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,14 +1141,7 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
11411141
self._create_remote_dirs_over_channel(executor.provider, executor.provider.channel)
11421142

11431143
self.executors[executor.label] = executor
1144-
block_ids = executor.start()
1145-
if self.monitoring and block_ids:
1146-
new_status = {}
1147-
for bid in block_ids:
1148-
new_status[bid] = JobStatus(JobState.PENDING)
1149-
msg = executor.create_monitoring_info(new_status)
1150-
logger.debug("Sending monitoring message {} to hub from DFK".format(msg))
1151-
self.monitoring.send(MessageType.BLOCK_INFO, msg)
1144+
executor.start()
11521145
block_executors = [e for e in executors if isinstance(e, BlockProviderExecutor)]
11531146
self.job_status_poller.add_executors(block_executors)
11541147

parsl/executors/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]:
5353
return False
5454

5555
@abstractmethod
56-
def start(self) -> Optional[List[str]]:
56+
def start(self) -> None:
5757
"""Start the executor.
5858
5959
Any spin-up operations (for example: starting thread pools) should be performed here.

parsl/executors/high_throughput/executor.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -400,16 +400,6 @@ def initialize_scaling(self):
400400

401401
logger.debug("Starting HighThroughputExecutor with provider:\n%s", self.provider)
402402

403-
# TODO: why is this a provider property?
404-
block_ids = []
405-
if hasattr(self.provider, 'init_blocks'):
406-
try:
407-
block_ids = self.scale_out(blocks=self.provider.init_blocks)
408-
except Exception as e:
409-
logger.error("Scaling out failed: {}".format(e))
410-
raise e
411-
return block_ids
412-
413403
def start(self):
414404
"""Create the Interchange process and connect to it.
415405
"""
@@ -439,8 +429,7 @@ def start(self):
439429

440430
logger.debug("Created management thread: {}".format(self._queue_management_thread))
441431

442-
block_ids = self.initialize_scaling()
443-
return block_ids
432+
self.initialize_scaling()
444433

445434
@wrap_with_logs
446435
def _queue_management_worker(self):

parsl/executors/taskvine/executor.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -580,13 +580,6 @@ def initialize_scaling(self):
580580
self._worker_command = self._construct_worker_command()
581581
self._patch_providers()
582582

583-
if hasattr(self.provider, 'init_blocks'):
584-
try:
585-
self.scale_out(blocks=self.provider.init_blocks)
586-
except Exception as e:
587-
logger.error("Initial block scaling out failed: {}".format(e))
588-
raise e
589-
590583
@property
591584
def outstanding(self) -> int:
592585
"""Count the number of outstanding tasks."""

parsl/executors/workqueue/executor.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -669,13 +669,6 @@ def initialize_scaling(self):
669669
self.worker_command = self._construct_worker_command()
670670
self._patch_providers()
671671

672-
if hasattr(self.provider, 'init_blocks'):
673-
try:
674-
self.scale_out(blocks=self.provider.init_blocks)
675-
except Exception as e:
676-
logger.error("Initial block scaling out failed: {}".format(e))
677-
raise e
678-
679672
@property
680673
def outstanding(self) -> int:
681674
"""Count the number of outstanding tasks. This is inefficiently

parsl/jobs/job_status_poller.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflo
2323
self._interval = executor.status_polling_interval
2424
self._last_poll_time = 0.0
2525
self._status = {} # type: Dict[str, JobStatus]
26+
self.first = True
2627

2728
# Create a ZMQ channel to send poll status to monitoring
2829
self.monitoring_enabled = False

parsl/jobs/strategy.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ def __init__(self, *, strategy: Optional[str], max_idletime: float) -> None:
129129
self.executors = {}
130130
self.max_idletime = max_idletime
131131

132-
self.strategies = {None: self._strategy_noop,
133-
'none': self._strategy_noop,
132+
self.strategies = {None: self._strategy_init_only,
133+
'none': self._strategy_init_only,
134134
'simple': self._strategy_simple,
135135
'htex_auto_scale': self._strategy_htex_auto_scale}
136136

@@ -146,10 +146,17 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
146146
for executor in executors:
147147
self.executors[executor.label] = {'idle_since': None}
148148

149-
def _strategy_noop(self, status: List[jsp.PollItem]) -> None:
150-
"""Do nothing.
149+
def _strategy_init_only(self, status_list: List[jsp.PollItem]) -> None:
150+
"""Scale up to init_blocks at the start, then nothing more.
151151
"""
152-
logger.debug("strategy_noop: doing nothing")
152+
for exec_status in status_list:
153+
if exec_status.first:
154+
executor = exec_status.executor
155+
logger.debug(f"strategy_init_only: scaling out {executor.provider.init_blocks} initial blocks for {executor.label}")
156+
exec_status.scale_out(executor.provider.init_blocks)
157+
exec_status.first = False
158+
else:
159+
logger.debug("strategy_init_only: doing nothing")
153160

154161
def _strategy_simple(self, status_list: List[jsp.PollItem]) -> None:
155162
self._general_strategy(status_list, strategy_type='simple')
@@ -183,6 +190,12 @@ def _general_strategy(self, status_list, *, strategy_type):
183190
continue
184191
logger.debug(f"Strategizing for executor {label}")
185192

193+
if exec_status.first:
194+
executor = exec_status.executor
195+
logger.debug(f"Scaling out {executor.provider.init_blocks} initial blocks for {label}")
196+
exec_status.scale_out(executor.provider.init_blocks)
197+
exec_status.first = False
198+
186199
# Tasks that are either pending completion
187200
active_tasks = executor.outstanding
188201

0 commit comments

Comments
 (0)