Skip to content

Commit 7626141

Browse files
committed
Replace use of queue.qsize() in monitoring database manager
Prior to this PR, each use of .qsize() was of the form q.qsize() != 0 which is equivalent to (not q.empty()) Queue.empty() is better supported across platforms than .qsize() This PR makes that substitution, meaning the broken (#3856) MacSafeQueue subclass will not be needed in this situation any more and the database manager will be able to work with a regular multiprocessing Queue even on Mac. This PR does not actually change the Queue type - it only reduces the functionality required by whichever queue is supplied. This is part of a series of PRs to remove that platform-specific functionality entirely - see PR #3932.
1 parent 0b9cd5b commit 7626141

File tree

1 file changed

+8
-8
lines changed

1 file changed

+8
-8
lines changed

parsl/monitoring/db_manager.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -346,9 +346,9 @@ def start(self,
346346
exception_happened = False
347347

348348
while (not self._kill_event.is_set() or
349-
self.pending_priority_queue.qsize() != 0 or self.pending_resource_queue.qsize() != 0 or
350-
self.pending_node_queue.qsize() != 0 or self.pending_block_queue.qsize() != 0 or
351-
resource_queue.qsize() != 0):
349+
not self.pending_priority_queue.empty() or not self.pending_resource_queue.empty() or
350+
not self.pending_node_queue.empty() or not self.pending_block_queue.empty() or
351+
not resource_queue.empty()):
352352

353353
"""
354354
WORKFLOW_INFO and TASK_INFO messages (i.e. priority messages)
@@ -357,9 +357,9 @@ def start(self,
357357
try:
358358
logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}""".format(
359359
self._kill_event.is_set(),
360-
self.pending_priority_queue.qsize() != 0, self.pending_resource_queue.qsize() != 0,
361-
self.pending_node_queue.qsize() != 0, self.pending_block_queue.qsize() != 0,
362-
resource_queue.qsize() != 0))
360+
not self.pending_priority_queue.empty(), not self.pending_resource_queue.empty(),
361+
not self.pending_node_queue.empty(), not self.pending_block_queue.empty(),
362+
not resource_queue.empty()))
363363

364364
# This is the list of resource messages which can be reprocessed as if they
365365
# had just arrived because the corresponding first task message has been
@@ -558,9 +558,9 @@ def start(self,
558558
def _migrate_logs_to_internal(self, logs_queue: mpq.Queue, kill_event: threading.Event) -> None:
559559
logger.info("Starting _migrate_logs_to_internal")
560560

561-
while not kill_event.is_set() or logs_queue.qsize() != 0:
561+
while not kill_event.is_set() or not logs_queue.empty():
562562
logger.debug("Checking STOP conditions: kill event: %s, queue has entries: %s",
563-
kill_event.is_set(), logs_queue.qsize() != 0)
563+
kill_event.is_set(), not logs_queue.empty())
564564

565565
try:
566566
x = logs_queue.get(timeout=0.1)

0 commit comments

Comments
 (0)