Skip to content

Commit 165fdc5

Browse files
authored
Replace one interchange task counter with task_id (#4013)
Prior to this PR, the interchange had two task counters. The one removed by this PR was used to: i) disambiguate tasks of equal priority - in effect using the counter as yet another task identifier, distinct from the Parsl or HTEX task IDs. ii) log how many tasks have been received by the interchange This PR removes that counter: i) tasks of equal priority are disambiguated by their HTEX-level task_id. It is already a protocol requirement that different tasks have different task IDs. ii) The total tasks received log message is replaced by per-task logging at the same place which makes it clear *which* tasks have been received, instead of recording the count received. In debugging task flow, this has been more useful to me. # Changed Behaviour Tasks of equal priority might be differently disambiguated now, but probably not -- in general tasks will arrive in task_id order. Equal priority tasks, by definition, have no expected relative execution order, though. ## Type of change - Update to human readable text: Documentation/error messages/comments - Code maintenance/cleanup
1 parent cdc4445 commit 165fdc5

File tree

2 files changed

+5
-8
lines changed

2 files changed

+5
-8
lines changed

parsl/executors/high_throughput/interchange.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,6 @@ def __init__(self,
138138

139139
self.pending_task_queue: SortedList[Any] = SortedList(key=lambda tup: (tup[0], tup[1]))
140140

141-
# count of tasks that have been received from the submit side
142-
self.task_counter = 0
143-
144141
# count of tasks that have been sent out to worker pools
145142
self.count = 0
146143

@@ -332,15 +329,15 @@ def process_task_incoming(self) -> None:
332329
msg = self.task_incoming.recv_pyobj()
333330

334331
# Process priority, higher number = lower priority
332+
task_id = msg['task_id']
335333
resource_spec = msg['context'].get('resource_spec', {})
336334
priority = resource_spec.get('priority', float('inf'))
337-
queue_entry = (-priority, -self.task_counter, msg)
335+
queue_entry = (-priority, -task_id, msg)
338336

339-
logger.debug("putting message onto pending_task_queue")
337+
logger.debug("Putting task %s onto pending_task_queue", task_id)
340338

341339
self.pending_task_queue.add(queue_entry)
342-
self.task_counter += 1
343-
logger.debug(f"Fetched {self.task_counter} tasks so far")
340+
logger.debug("Put task %s onto pending_task_queue", task_id)
344341

345342
def process_manager_socket_message(
346343
self,

parsl/tests/test_htex/test_priority_queue.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def interchange_logs_task_count():
6161
with open(htex.worker_logdir + "/interchange.log", "r") as f:
6262
lines = f.readlines()
6363
for line in lines:
64-
if f"Fetched {n} tasks so far" in line:
64+
if f"Put task {n} onto pending_task_queue" in line:
6565
return True
6666
return False
6767

0 commit comments

Comments
 (0)