66
77import json
88import time
9- from typing import Any
9+ from typing import Any , Optional
1010
1111import boto3
1212import pika
1616
1717from app import config_shared
1818from app .utils .metrics import queue_publish_counter , queue_publish_latency
19- from app .utils .setup_logger import setup_logger
19+ from app .utils .safe_logger import safe_info , safe_error
2020
21- logger = setup_logger (__name__ )
22-
23- REDACT_SENSITIVE_LOGS = (
21+ REDACT_SENSITIVE_LOGS : bool = (
2422 config_shared .get_config_value_cached ("REDACT_SENSITIVE_LOGS" , "true" ).lower () == "true"
2523)
2624
2725
2826class SQSMessageSendError (Exception ):
2927 """Raised when SQS returns a non-200 HTTP status."""
28+ pass
3029
3130
3231def safe_log_message (data : dict [str , Any ]) -> str :
@@ -36,60 +35,56 @@ def safe_log_message(data: dict[str, Any]) -> str:
3635 data (dict[str, Any]): The message payload.
3736
3837 Returns:
39- str: JSON string or redacted placeholder.
40-
38+ str: Redacted placeholder or JSON-formatted string.
4139 """
4240 return "[REDACTED]" if REDACT_SENSITIVE_LOGS else json .dumps (data , ensure_ascii = False )
4341
4442
4543def publish_to_queue (
4644 payload : list [dict [str , Any ]],
47- queue : str | None = None ,
48- exchange : str | None = None ,
45+ queue : Optional [ str ] = None ,
46+ exchange : Optional [ str ] = None ,
4947) -> None :
5048 """Publish a batch of processed messages to the configured queue.
5149
5250 Args:
5351 payload (list[dict[str, Any]]): List of messages to send.
54- queue (str | None): Optional override for queue name or routing key.
55- exchange (str | None): Optional override for RabbitMQ exchange.
56-
52+ queue (Optional[str]): Optional override for queue name or routing key.
53+ exchange (Optional[str]): Optional override for RabbitMQ exchange.
5754 """
5855 if not isinstance (payload , list ):
59- logger . error ( "❌ Invalid payload type: expected list, got %s" , type (payload ).__name__ )
56+ safe_error ( " Invalid payload type" , { " expected" : " list" , " got" : str ( type (payload ).__name__ )} )
6057 return
6158
62- queue_type = config_shared .get_queue_type ().lower ()
59+ queue_type : str = config_shared .get_queue_type ().lower ()
6360
6461 for message in payload :
6562 if queue_type == "rabbitmq" :
6663 _send_to_rabbitmq (message , queue , exchange )
6764 elif queue_type == "sqs" :
6865 _send_to_sqs (message , queue )
6966 else :
70- redacted_type = "[REDACTED]" if REDACT_SENSITIVE_LOGS else queue_type
71- logger .error ("❌ Invalid QUEUE_TYPE: %s" , redacted_type )
67+ safe_error ("Invalid QUEUE_TYPE" , {"queue_type" : "[REDACTED]" if REDACT_SENSITIVE_LOGS else queue_type })
7268
7369
7470@retry (stop = stop_after_attempt (3 ), wait = wait_exponential (min = 2 , max = 10 ))
7571def _send_to_rabbitmq (
7672 data : dict [str , Any ],
77- routing_key : str | None = None ,
78- exchange : str | None = None ,
73+ routing_key : Optional [ str ] = None ,
74+ exchange : Optional [ str ] = None ,
7975) -> None :
8076 """Send a single message to RabbitMQ.
8177
8278 Args:
8379 data (dict[str, Any]): The message payload.
84- routing_key (str | None ): Optional routing key override.
85- exchange (str | None ): Optional exchange override.
80+ routing_key (Optional[ str] ): Optional routing key override.
81+ exchange (Optional[ str] ): Optional exchange override.
8682
8783 Raises:
8884 AMQPConnectionError: On RabbitMQ connection failure.
8985 Exception: On publish failure.
90-
9186 """
92- start = time .perf_counter ()
87+ start : float = time .perf_counter ()
9388 try :
9489 credentials = pika .PlainCredentials (
9590 config_shared .get_rabbitmq_user (),
@@ -105,84 +100,96 @@ def _send_to_rabbitmq(
105100
106101 with pika .BlockingConnection (parameters ) as connection :
107102 channel = connection .channel ()
108- resolved_exchange = exchange or config_shared .get_rabbitmq_exchange ()
109- resolved_routing_key = routing_key or config_shared .get_rabbitmq_routing_key ()
103+ resolved_exchange : str = exchange or config_shared .get_rabbitmq_exchange ()
104+ resolved_routing_key : str = routing_key or config_shared .get_rabbitmq_routing_key ()
110105 channel .basic_publish (
111106 exchange = resolved_exchange ,
112107 routing_key = resolved_routing_key ,
113108 body = json .dumps (data , ensure_ascii = False ),
114109 )
115110
116- duration = time .perf_counter () - start
111+ duration : float = time .perf_counter () - start
117112 queue_publish_counter .labels (queue_type = "rabbitmq" , status = "success" ).inc ()
118113 queue_publish_latency .labels (queue_type = "rabbitmq" , status = "success" ).observe (duration )
119- logger .info ("✅ Published message to RabbitMQ: %s" , safe_log_message (data )) # nosec
114+ safe_info ("Published message to RabbitMQ" , {
115+ "exchange" : resolved_exchange ,
116+ "routing_key" : resolved_routing_key ,
117+ "duration" : duration ,
118+ "message" : data ,
119+ })
120120
121121 except AMQPConnectionError as e :
122122 duration = time .perf_counter () - start
123123 queue_publish_counter .labels (queue_type = "rabbitmq" , status = "failure" ).inc ()
124124 queue_publish_latency .labels (queue_type = "rabbitmq" , status = "failure" ).observe (duration )
125- logger . exception ( "❌ RabbitMQ publish connection error: %s " , e )
125+ safe_error ( " RabbitMQ publish connection error" , { "error" : str ( e ), "duration" : duration } )
126126 raise
127127 except Exception as e :
128128 duration = time .perf_counter () - start
129129 queue_publish_counter .labels (queue_type = "rabbitmq" , status = "exception" ).inc ()
130130 queue_publish_latency .labels (queue_type = "rabbitmq" , status = "exception" ).observe (duration )
131- logger . exception ( "❌ Failed to publish message to RabbitMQ: %s " , e )
131+ safe_error ( "Unhandled error during RabbitMQ publish " , { "error" : str ( e ), "duration" : duration } )
132132 raise
133133
134134
135135@retry (stop = stop_after_attempt (3 ), wait = wait_exponential (min = 2 , max = 10 ))
136136def _send_to_sqs (
137137 data : dict [str , Any ],
138- queue_name : str | None = None ,
138+ queue_name : Optional [ str ] = None ,
139139) -> None :
140140 """Send a single message to AWS SQS.
141141
142142 Args:
143143 data (dict[str, Any]): The message payload.
144- queue_name (str | None ): Optional override for SQS queue URL.
144+ queue_name (Optional[ str] ): Optional override for SQS queue URL.
145145
146146 Raises:
147147 BotoCoreError: On SQS client error.
148148 NoCredentialsError: If AWS credentials are not available.
149149 SQSMessageSendError: If HTTP response code is not 200.
150150 Exception: On publish failure.
151-
152151 """
153- sqs_url = queue_name or config_shared .get_sqs_queue_url ()
154- region = config_shared .get_sqs_region ()
152+ sqs_url : str = queue_name or config_shared .get_sqs_queue_url ()
153+ region : str = config_shared .get_sqs_region ()
155154
156- start = time .perf_counter ()
155+ start : float = time .perf_counter ()
157156 try :
158157 sqs_client = boto3 .client ("sqs" , region_name = region )
159158 response = sqs_client .send_message (
160159 QueueUrl = sqs_url ,
161160 MessageBody = json .dumps (data , ensure_ascii = False ),
162161 )
163162
164- status_code = response ["ResponseMetadata" ]["HTTPStatusCode" ]
165- duration = time .perf_counter () - start
163+ status_code : int = response ["ResponseMetadata" ]["HTTPStatusCode" ]
164+ duration : float = time .perf_counter () - start
166165
167166 if status_code != 200 :
168167 queue_publish_counter .labels (queue_type = "sqs" , status = "failure" ).inc ()
169168 queue_publish_latency .labels (queue_type = "sqs" , status = "failure" ).observe (duration )
170- logger .error ("❌ Failed to publish message to SQS: HTTP %d" , status_code )
169+ safe_error ("Failed to publish message to SQS" , {
170+ "http_status" : status_code ,
171+ "duration" : duration ,
172+ "queue_url" : sqs_url ,
173+ })
171174 raise SQSMessageSendError (f"SQS returned HTTP status { status_code } " )
172175
173176 queue_publish_counter .labels (queue_type = "sqs" , status = "success" ).inc ()
174177 queue_publish_latency .labels (queue_type = "sqs" , status = "success" ).observe (duration )
175- logger .info ("✅ Published message to SQS: %s" , safe_log_message (data )) # nosec
178+ safe_info ("Published message to SQS" , {
179+ "queue_url" : sqs_url ,
180+ "duration" : duration ,
181+ "message" : data ,
182+ })
176183
177184 except (BotoCoreError , NoCredentialsError ) as e :
178185 duration = time .perf_counter () - start
179186 queue_publish_counter .labels (queue_type = "sqs" , status = "failure" ).inc ()
180187 queue_publish_latency .labels (queue_type = "sqs" , status = "failure" ).observe (duration )
181- logger . exception ( "❌ Failed to initialize SQS client: %s " , e )
188+ safe_error ( " SQS client error " , { "error" : str ( e ), "duration" : duration } )
182189 raise
183190 except Exception as e :
184191 duration = time .perf_counter () - start
185192 queue_publish_counter .labels (queue_type = "sqs" , status = "exception" ).inc ()
186193 queue_publish_latency .labels (queue_type = "sqs" , status = "exception" ).observe (duration )
187- logger . exception ( "❌ Failed to publish message to SQS: %s " , e )
194+ safe_error ( "Unhandled error during SQS publish " , { "error" : str ( e ), "duration" : duration } )
188195 raise
0 commit comments