11import os
2- import pika
32import subprocess
43import json
54from minio import Minio
87import threading
98import queue
109import redis
10+ import traceback
1111import time
1212
1313# Read environment variables.
14- RABBITMQ_URL = os .getenv ("RABBITMQ_URL" , "amqp://user:password@localhost:5672/" )
15- QUEUE_NAME = os .getenv ("RABBITMQ_QUEUE" , "task_queue" )
14+ QUEUE_NAME = os .getenv ("REDIS_QUEUE_NAME" , "task_queue" )
1615MINIO_URL = os .getenv ("MINIO_URL" , "localhost:9000" )
1716MINIO_ACCESS_KEY = os .getenv ("MINIO_ACCESS_KEY" , "minioadmin" )
1817MINIO_SECRET_KEY = os .getenv ("MINIO_SECRET_KEY" , "minioadmin" )
1918COCKROACHDB_URL = os .getenv (
2019 "COCKROACHDB_URL" , "postgresql://root@localhost:26257/defaultdb?sslmode=disable"
2120)
2221REDIS_URL = os .getenv ("REDIS_URL" , "redis://localhost:6379/0" )
23-
24-
22+ MESSAGE_RETRY_DELAY = 10
2523STREAM_END = object ()
2624LOG_DATA_FIELD = b"log_data"
2725
26+ redis_connection = None
27+ stop_flag = threading .Event ()
2828
2929def parse_json_string (json_string ):
3030 """Parses a JSON string and returns a Python object."""
@@ -230,20 +230,20 @@ def redis_stream_adder(redis_url, log_queue, run_id):
230230
231231
232232# Callback function that is called when a message is received.
233- def process_message (ch , method , properties , body ):
234- """Callback function when a message is received from RabbitMQ """
235- msg = body .decode ()
233+ def process_message (body ):
234+ """Callback function when a message is received from Redis List """
235+ msg = body .strip ()
236236 print (f"Received Message: { msg } " )
237237 data = parse_json_string (msg )
238238 if data is None :
239- ch . basic_ack ( delivery_tag = method . delivery_tag )
239+ print ( "Skipping. Invalid JSON message..." )
240240 return
241241
242242 runId = data .get ("runId" )
243243 fileName = data .get ("fileName" )
244244 extension = data .get ("extension" )
245245 if not all ([runId , fileName , extension ]):
246- ch . basic_ack ( delivery_tag = method . delivery_tag )
246+ print ( "Skipping. Missing required fields in message..." )
247247 return
248248
249249 redis_adder_thread = None
@@ -293,7 +293,10 @@ def process_message(ch, method, properties, body):
293293 except psycopg2 .Error as e :
294294 print (f"Error updating run status/fetching type in CockroachDB: { e } " )
295295 run_status = "failed"
296- raise
296+ print (f"Requeuing message for run { runId } ..." )
297+ redis_connection .lpush (QUEUE_NAME , msg ) # Requeue message
298+ time .sleep (MESSAGE_RETRY_DELAY )
299+ return
297300 finally :
298301 if conn :
299302 if "cur" in locals () and cur :
@@ -310,10 +313,6 @@ def process_message(ch, method, properties, body):
310313 print (f"Running command: { ' ' .join (command )} in { file_parent_dir } " )
311314 print (f"Timeout: { timeout_sec } seconds" )
312315
313- # Acknowledge message BEFORE starting the potentially long subprocess.
314- print ("Acknowledging RabbitMQ message before starting subprocess." )
315- ch .basic_ack (delivery_tag = method .delivery_tag )
316-
317316 process = subprocess .Popen (
318317 command ,
319318 stdout = subprocess .PIPE ,
@@ -471,30 +470,50 @@ def process_message(ch, method, properties, body):
471470 print ("-" * 20 )
472471 # TODO: Clean up local files.
473472
473+ # Redis Client Connection
474+ def connect_redis ():
475+ global redis_connection
476+ try :
477+ redis_connection = redis .from_url (REDIS_URL , decode_responses = True )
478+ redis_connection .ping ()
479+ print (f"Connected to Redis: { REDIS_URL } " )
480+ except redis .ConnectionError as conn_err :
481+ print (f"Failed to connect to Redis: { conn_err } " )
482+ exit (1 )
483+
484+ # Redis Lists Worker Loop
485+ def worker_loop ():
486+ print ("Worker started. Waiting for messages..." )
487+ while not stop_flag .is_set ():
488+ try :
489+ item = redis_connection .brpop (QUEUE_NAME , timeout = 5 )
490+ if not item :
491+ continue
492+
493+ _ , message = item
494+ process_message (message )
495+
496+ except redis .ConnectionError as e :
497+ print (f"Redis connection lost: { e } " )
498+ connect_redis ()
499+ time .sleep (5 )
500+ continue
501+ except KeyboardInterrupt :
502+ print ("\n Stopping worker..." )
503+ stop_flag .set ()
504+ break
505+ except Exception as e :
506+ print (f"Unexpected error in worker loop: { e } " )
507+ traceback .print_exc ()
508+ time .sleep (5 )
474509
475- # Main RabbitMQ Connection and Consumption
476- connection = None
477510try :
478- parameters = pika .URLParameters (RABBITMQ_URL )
479- connection = pika .BlockingConnection (parameters )
480- channel = connection .channel ()
481- channel .queue_declare (queue = QUEUE_NAME , durable = True )
482- channel .basic_qos (prefetch_count = 1 )
483- print ("Waiting for messages..." )
484- channel .basic_consume (queue = QUEUE_NAME , on_message_callback = process_message )
485- channel .start_consuming ()
486- except pika .exceptions .AMQPConnectionError as conn_err :
487- print (f"Failed to connect to RabbitMQ: { conn_err } " )
488- exit (1 )
511+ connect_redis ()
512+ worker_loop ()
489513except KeyboardInterrupt :
490- print ("\n Stopping worker..." )
491- if "channel" in locals () and channel .is_open :
492- channel .stop_consuming ()
493- if connection and connection .is_open :
494- connection .close ()
495- print ("Worker stopped." )
496- except Exception as e :
497- print (f"An unexpected error occurred in the main loop: { e } " )
498- if connection and connection .is_open :
499- connection .close ()
500- exit (1 )
514+ print ("\n Cleaning up and exiting..." )
515+ finally :
516+ stop_flag .set ()
517+ if redis_connection :
518+ redis_connection .close ()
519+ print ("Worker connection closed." )
0 commit comments