From bb98f5fd4541da5f2bc0ccce76153bda9af8f2df Mon Sep 17 00:00:00 2001 From: Dongling Date: Thu, 11 Jul 2024 12:00:11 +0800 Subject: [PATCH] coordinate producer and consumer more gracefully --- python/utils/worker.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/python/utils/worker.py b/python/utils/worker.py index 3d11b3cae..5d3d29813 100644 --- a/python/utils/worker.py +++ b/python/utils/worker.py @@ -248,6 +248,7 @@ def producer( ) if q.qsize() == 0: break + sleep(1) # don't log too much logging.info( "[Parser] The stream is ended", extra={ @@ -362,8 +363,12 @@ async def consumer_impl( total_size = 0 for task_index in range(num_concurrent_processing_tasks): if task_index == 0: - # If we're the first task, we should wait until we get data. - chain_id, size_in_bytes, transactions = q.get() + # If we're the first task, we should wait longer to get data. + try: + # here the producer may have exited; just calling q.get() would leave consumer_impl stuck here + chain_id, size_in_bytes, transactions = q.get(timeout=5) + except queue.Empty: + continue else: # If we're not the first task, we should poll to see if we get any data. try: @@ -389,7 +394,9 @@ async def consumer_impl( # os._exit(1) last_fetched_version = transactions[-1].version transaction_batches.append(transactions) - + if len(transaction_batches) == 0: + # continue to check producer_thread.is_alive() + continue processor_threads = [] for transactions in transaction_batches: thread = IndexerProcessorServer.WorkerThread(