Skip to content

Commit f135919

Browse files
authored
Remove monitoring queue tag switch monitoring db pre-router (#3587)
The main goal of this PR is to make _migrate_logs_to_internal much more clearly a message forwarder, rather than a message interpreter. This follows on from PR #2168 which introduces _dispatch_to_internal to dispatch messages based on their tag rather than on the queue the message was received on, and is part of an ongoing series to simplify the queue and routing structure inside the monitoring router and database code. Further PRs in preparation (in draft PR #3315) contain further simplifications building on this PR. After this PR: * the database manager will respond to a STOP message on any incoming queue, vs previously only on the priority queue. This is a consequence of treating the queues all the same now. * the database manager will not perform such strong validation of message structure based on message tag at this point. That's part of expecting the code to forward messages, not inspect them, with later inspecting code being the place to care about structure. This only affects behaviour when invalid messages are sent. Related PRs and context: #3567 changes the monitoring router to be more of a router and to not inspect and modify certain in-transit messages. There is a long slow project to regularise queues: PR #2117 makes resource info messages look like other messages so they can be dispatched alongside other message types. The priority queue was initially (as I understand it) introduced to attempt to address a race condition of message order arrival vs SQL database key constraints. The priority queue is an attempt to force certain messages to be processed before others (not in the priority queue). However a subsequent commit in 2019, 0a4b685, introduces a more robust approach because this priority queue approach does not work and so is not needed.
1 parent 357547f commit f135919

File tree

1 file changed

+13
-30
lines changed

1 file changed

+13
-30
lines changed

parsl/monitoring/db_manager.py

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -316,31 +316,31 @@ def start(self,
316316
self._kill_event = threading.Event()
317317
self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
318318
args=(
319-
priority_queue, 'priority', self._kill_event,),
319+
priority_queue, self._kill_event,),
320320
name="Monitoring-migrate-priority",
321321
daemon=True,
322322
)
323323
self._priority_queue_pull_thread.start()
324324

325325
self._node_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
326326
args=(
327-
node_queue, 'node', self._kill_event,),
327+
node_queue, self._kill_event,),
328328
name="Monitoring-migrate-node",
329329
daemon=True,
330330
)
331331
self._node_queue_pull_thread.start()
332332

333333
self._block_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
334334
args=(
335-
block_queue, 'block', self._kill_event,),
335+
block_queue, self._kill_event,),
336336
name="Monitoring-migrate-block",
337337
daemon=True,
338338
)
339339
self._block_queue_pull_thread.start()
340340

341341
self._resource_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
342342
args=(
343-
resource_queue, 'resource', self._kill_event,),
343+
resource_queue, self._kill_event,),
344344
name="Monitoring-migrate-resource",
345345
daemon=True,
346346
)
@@ -577,43 +577,26 @@ def start(self,
577577
raise RuntimeError("An exception happened sometime during database processing and should have been logged in database_manager.log")
578578

579579
@wrap_with_logs(target="database_manager")
580-
def _migrate_logs_to_internal(self, logs_queue: queue.Queue, queue_tag: str, kill_event: threading.Event) -> None:
581-
logger.info("Starting processing for queue {}".format(queue_tag))
580+
def _migrate_logs_to_internal(self, logs_queue: queue.Queue, kill_event: threading.Event) -> None:
581+
logger.info("Starting _migrate_logs_to_internal")
582582

583583
while not kill_event.is_set() or logs_queue.qsize() != 0:
584-
logger.debug("""Checking STOP conditions for {} threads: {}, {}"""
585-
.format(queue_tag, kill_event.is_set(), logs_queue.qsize() != 0))
584+
logger.debug("Checking STOP conditions: kill event: %s, queue has entries: %s",
585+
kill_event.is_set(), logs_queue.qsize() != 0)
586586
try:
587587
x, addr = logs_queue.get(timeout=0.1)
588588
except queue.Empty:
589589
continue
590590
else:
591-
if queue_tag == 'priority' and x == 'STOP':
591+
if x == 'STOP':
592592
self.close()
593-
elif queue_tag == 'priority': # implicitly not 'STOP'
594-
assert isinstance(x, tuple)
595-
assert len(x) == 2
596-
assert x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO], \
597-
"_migrate_logs_to_internal can only migrate WORKFLOW_,TASK_INFO message from priority queue, got x[0] == {}".format(x[0])
598-
self._dispatch_to_internal(x)
599-
elif queue_tag == 'resource':
600-
assert isinstance(x, tuple), "_migrate_logs_to_internal was expecting a tuple, got {}".format(x)
601-
assert x[0] == MessageType.RESOURCE_INFO, (
602-
"_migrate_logs_to_internal can only migrate RESOURCE_INFO message from resource queue, "
603-
"got tag {}, message {}".format(x[0], x)
604-
)
605-
self._dispatch_to_internal(x)
606-
elif queue_tag == 'node':
607-
assert len(x) == 2, "expected message tuple to have exactly two elements"
608-
assert x[0] == MessageType.NODE_INFO, "_migrate_logs_to_internal can only migrate NODE_INFO messages from node queue"
609-
610-
self._dispatch_to_internal(x)
611-
elif queue_tag == "block":
612-
self._dispatch_to_internal(x)
613593
else:
614-
logger.error(f"Discarding because unknown queue tag '{queue_tag}', message: {x}")
594+
self._dispatch_to_internal(x)
615595

616596
def _dispatch_to_internal(self, x: Tuple) -> None:
597+
assert isinstance(x, tuple)
598+
assert len(x) == 2, "expected message tuple to have exactly two elements"
599+
617600
if x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO]:
618601
self.pending_priority_queue.put(cast(Any, x))
619602
elif x[0] == MessageType.RESOURCE_INFO:

0 commit comments

Comments
 (0)