Skip to content

Commit 39f0d8a

Browse files
committed
executors get current time directly from time, not from job status poller
this is a behavioural change - but what's the behavioural change? if there are multiple executors, then cache refreshes might now happen on a slightly different cadence: the 2nd executor now time will be later (it will be the end time of the 1st executor polling process) which means it might now fire a bit earlier
1 parent f135277 commit 39f0d8a

File tree

2 files changed

+6
-6
lines changed

2 files changed

+6
-6
lines changed

parsl/executors/status_handling.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22
import logging
33
import threading
4+
import time
45
from itertools import compress
56
from abc import abstractmethod, abstractproperty
67
from concurrent.futures import Future
@@ -242,7 +243,8 @@ def workers_per_node(self) -> Union[int, float]:
242243
def _should_poll(self, now: float) -> bool:
243244
return now >= self._last_poll_time + self.status_polling_interval
244245

245-
def _refresh_poll_mutable_status_if_time(self, now):
246+
def _refresh_poll_mutable_status_if_time(self):
247+
now = time.time()
246248
if self._should_poll(now):
247249
self._poller_mutable_status = self.status()
248250
self._last_poll_time = now

parsl/jobs/job_status_poller.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import logging
22
import parsl
3-
import time
43
import zmq
54
from typing import Dict, List, Sequence, Optional, Union
65

@@ -34,10 +33,10 @@ def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflo
3433
self.hub_channel.connect("tcp://{}:{}".format(hub_address, hub_port))
3534
logger.info("Monitoring enabled on job status poller")
3635

37-
def poll(self, now: float) -> None:
36+
def poll(self) -> None:
3837
previous_status = self.executor._poller_mutable_status
3938

40-
self.executor._refresh_poll_mutable_status_if_time(now)
39+
self.executor._refresh_poll_mutable_status_if_time()
4140

4241
if previous_status != self.executor._poller_mutable_status:
4342
# short circuit the case where the two objects are identical so
@@ -124,9 +123,8 @@ def _run_error_handlers(self, status: List[PolledExecutorFacade]) -> None:
124123
es.executor.handle_errors(es.status)
125124

126125
def _update_state(self) -> None:
127-
now = time.time()
128126
for item in self._poll_items:
129-
item.poll(now)
127+
item.poll()
130128

131129
def add_executors(self, executors: Sequence[BlockProviderExecutor]) -> None:
132130
for executor in executors:

0 commit comments

Comments
 (0)