|
18 | 18 | from parsl.serialize import pack_res_spec_apply_message, deserialize |
19 | 19 | from parsl.serialize.errors import SerializationError, DeserializationError |
20 | 20 | from parsl.app.errors import RemoteExceptionWrapper |
21 | | -from parsl.jobs.states import JobStatus, JobState |
| 21 | +from parsl.jobs.states import JobStatus, JobState, TERMINAL_STATES |
22 | 22 | from parsl.executors.high_throughput import zmq_pipes |
23 | 23 | from parsl.executors.high_throughput import interchange |
24 | 24 | from parsl.executors.errors import ( |
@@ -730,8 +730,22 @@ class BlockInfo: |
730 | 730 | tasks: int # sum of tasks in this block |
731 | 731 | idle: float # shortest idle time of any manager in this block |
732 | 732 |
|
| 733 | + # block_info will be populated from two sources: |
| 734 | + # the Job Status Poller mutable block list, and the list of blocks |
| 735 | + # which have connected to the interchange. |
| 736 | + |
| 737 | + def new_block_info(): |
| 738 | + return BlockInfo(tasks=0, idle=float('inf')) |
| 739 | + |
| 740 | + block_info: Dict[str, BlockInfo] = defaultdict(new_block_info) |
| 741 | + |
| 742 | + for block_id, job_status in self._poller_mutable_status.items(): |
| 743 | + if job_status.state not in TERMINAL_STATES: |
| 744 | + # TODO: is there a nicer way to make block_info come into existence? |
| 745 | + # can i write just the expression block_info[block_id] on its own? |
| 746 | + block_info[block_id] = new_block_info() |
| 747 | + |
733 | 748 | managers = self.connected_managers() |
734 | | - block_info: Dict[str, BlockInfo] = defaultdict(lambda: BlockInfo(tasks=0, idle=float('inf'))) |
735 | 749 | for manager in managers: |
736 | 750 | if not manager['active']: |
737 | 751 | continue |
|
0 commit comments