Skip to content

Commit 046d280

Browse files
committed
improved scheduler task sync
1 parent 75406bc commit 046d280

File tree

1 file changed

+14
-11
lines changed

1 file changed

+14
-11
lines changed

streamflow/scheduling/scheduler.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def __init__(
4040
super().__init__(context)
4141
self.binding_filter_map: MutableMapping[str, BindingFilter] = {}
4242
self.pending_jobs: MutableMapping[str, MutableSequence[JobContext]] = {}
43+
self.pending_jobs_conditional = asyncio.Condition()
4344
self.pending_job_event = asyncio.Event()
4445
self.policy_map: MutableMapping[str, Policy] = {}
4546
self.retry_interval: int | None = retry_delay if retry_delay != 0 else None
@@ -181,7 +182,12 @@ async def _get_available_locations(
181182
async def _scheduling_task(self):
182183
try:
183184
while True:
185+
# Event that awake the scheduling task:
186+
# - there is a new job to schedule
187+
# - some resources are released and there are some pending jobs
184188
await self.pending_job_event.wait()
189+
async with self.pending_jobs_conditional:
190+
self.pending_job_event.clear()
185191
for deployment_name, job_contexts in self.pending_jobs.items():
186192
logger.info("Start scheduling")
187193

@@ -230,14 +236,9 @@ async def _scheduling_task(self):
230236
job_context.targets[0],
231237
)
232238
job_context.event.set()
233-
234-
# todo: awake scheduling:
235-
# - there is a new job to schedule
236-
# - some resources are released and there are some pending jobs
237-
# self.pending_job_event.clear()
238-
logger.info("Sleep")
239-
await asyncio.sleep(0.2)
239+
# todo: fix job with multi-targets case
240240
except Exception as e:
241+
# todo: propagate error to context (?)
241242
logger.exception(f"Scheduler failed: {e}")
242243
raise
243244

@@ -260,8 +261,9 @@ async def notify_status(self, job_name: str, status: Status) -> None:
260261
if logger.isEnabledFor(logging.DEBUG):
261262
logger.debug(f"Job {job_name} changed status to {status.name}")
262263

263-
# Notify scheduling loop: there are free resources
264-
self.pending_job_event.set()
264+
# Notify scheduling task: there are free resources
265+
async with self.pending_jobs_conditional:
266+
self.pending_job_event.set()
265267

266268
async def schedule(
267269
self,
@@ -278,8 +280,9 @@ async def schedule(
278280
deployment = target.deployment
279281
self.pending_jobs.setdefault(deployment.name, []).append(job_context)
280282

281-
# Notify scheduling loop: there is a job to schedule
282-
self.pending_job_event.set()
283+
# Notify scheduling task: there is a job to schedule
284+
async with self.pending_jobs_conditional:
285+
self.pending_job_event.set()
283286

284287
# Wait the job is scheduled
285288
await job_context.event.wait()

0 commit comments

Comments
 (0)