1818logger = setup_logger (__name__ )
1919shutdown_event = threading .Event ()
2020
21+ REDACT_SENSITIVE_LOGS = config .get_config_value ("REDACT_SENSITIVE_LOGS" , "true" ).lower () == "true"
22+
23+
24+ def safe_log (msg : str ) -> str :
25+ """Standardized redacted log message."""
26+ return f"{ msg } : [REDACTED]" if REDACT_SENSITIVE_LOGS else msg
27+
2128
2229def consume_messages (callback : Callable [[list [dict ]], None ]) -> None :
2330 """Start the queue listener for the configured queue type.
2431
2532 Args:
2633 callback: A function that takes a list of messages and processes them.
27-
2834 """
2935 signal .signal (signal .SIGINT , _graceful_shutdown )
3036 signal .signal (signal .SIGTERM , _graceful_shutdown )
@@ -35,7 +41,7 @@ def consume_messages(callback: Callable[[list[dict]], None]) -> None:
3541 elif queue_type == "sqs" :
3642 _start_sqs_listener (callback )
3743 else :
38- raise ValueError (f "Unsupported QUEUE_TYPE: { queue_type } " )
44+ raise ValueError ("Unsupported QUEUE_TYPE: [REDACTED] " )
3945
4046
4147def _graceful_shutdown (signum , frame ) -> None :
@@ -50,7 +56,6 @@ def _start_rabbitmq_listener(callback: Callable[[list[dict]], None]) -> None:
5056
5157 Args:
5258 callback: Function to process received messages.
53-
5459 """
5560 connection = pika .BlockingConnection (
5661 pika .ConnectionParameters (
@@ -76,11 +81,11 @@ def on_message(ch: BlockingChannel, method, properties, body: bytes) -> None:
7681 callback ([message ])
7782 ch .basic_ack (delivery_tag = method .delivery_tag )
7883 logger .debug ("✅ RabbitMQ message processed and acknowledged." )
79- except Exception as e :
80- logger .error (f "❌ Error processing RabbitMQ message: { e } " )
84+ except Exception :
85+ logger .error ("❌ RabbitMQ message processing failed (details redacted) " )
8186 ch .basic_nack (delivery_tag = method .delivery_tag , requeue = False )
8287
83- logger .info ("🚀 Consuming RabbitMQ messages from queue: %s" , queue_name )
88+ logger .info (safe_log ( "🚀 Consuming RabbitMQ messages from queue" ) )
8489
8590 try :
8691 channel .basic_qos (prefetch_count = config .get_batch_size ())
@@ -99,12 +104,11 @@ def _start_sqs_listener(callback: Callable[[list[dict]], None]) -> None:
99104
100105 Args:
101106 callback: Function to process a batch of messages.
102-
103107 """
104108 sqs = boto3 .client ("sqs" , region_name = config .get_sqs_region ())
105109 queue_url = config .get_sqs_queue_url ()
106110
107- logger .info ("🚀 Polling SQS queue: %s" , queue_url )
111+ logger .info (safe_log ( "🚀 Polling SQS queue" ) )
108112
109113 while not shutdown_event .is_set ():
110114 try :
@@ -125,17 +129,17 @@ def _start_sqs_listener(callback: Callable[[list[dict]], None]) -> None:
125129 payload = json .loads (msg ["Body" ])
126130 payloads .append (payload )
127131 receipt_handles .append (msg ["ReceiptHandle" ])
128- except Exception as e :
129- logger .warning (f "⚠️ Failed to parse SQS message: { e } " )
132+ except Exception :
133+ logger .warning ("⚠️ Failed to parse SQS message body (redacted) " )
130134
131135 if payloads :
132136 callback (payloads )
133137 for handle in receipt_handles :
134138 sqs .delete_message (QueueUrl = queue_url , ReceiptHandle = handle )
135139 logger .debug ("✅ SQS: Processed and deleted %d message(s)" , len (payloads ))
136140
137- except (BotoCoreError , NoCredentialsError ) as e :
138- logger .error ("❌ SQS error: %s" , e )
141+ except (BotoCoreError , NoCredentialsError ):
142+ logger .error ("❌ SQS error encountered (details redacted)" )
139143 time .sleep (5 )
140144
141145 logger .info ("🛑 SQS polling stopped." )
0 commit comments