1515from app import config_shared
1616from app .queue_sender import publish_to_queue
1717from app .utils .metrics import (
18- db_dispatch_counter ,
19- db_dispatch_duration ,
20- db_dispatch_failures ,
21- output_counter ,
22- paper_trade_counter ,
23- paper_trade_failures ,
24- rest_dispatch_counter ,
25- rest_dispatch_duration ,
26- rest_dispatch_failures ,
27- s3_dispatch_counter ,
28- s3_dispatch_duration ,
29- s3_dispatch_failures ,
18+ record_output_metrics ,
19+ record_paper_trade_metrics ,
20+ record_sink_metrics ,
3021)
3122from app .utils .setup_logger import setup_logger
3223from app .utils .types import OutputMode , validate_list_of_dicts
@@ -42,11 +33,11 @@ def __init__(self) -> None:
4233 self .output_modes = config_shared .get_output_modes ()
4334
4435 def send (self , data : list [dict [str , Any ]]) -> None :
45- """
46- Dispatch processed analysis output to one or more configured destinations.
36+ """Dispatch processed analysis output to one or more configured destinations.
4737
4838 Args:
4939 data (list[dict[str, Any]]): List of data payloads to send.
40+
5041 """
5142 try :
5243 validate_list_of_dicts (data , required_keys = ["text" ])
@@ -72,11 +63,11 @@ def send(self, data: list[dict[str, Any]]) -> None:
7263 logger .error ("❌ Failed to send output: %s" , e )
7364
7465 def send_trade_simulation (self , data : dict [str , Any ]) -> None :
75- """
76- Send simulated trade data to the appropriate paper trade destination.
66+ """Send simulated trade data to the appropriate paper trade destination.
7767
7868 Args:
7969 data (dict[str, Any]): Simulated trade payload.
70+
8071 """
8172 try :
8273 if config_shared .get_paper_trading_database_enabled ():
@@ -85,19 +76,19 @@ def send_trade_simulation(self, data: dict[str, Any]) -> None:
8576 self ._output_paper_trade_to_queue (data )
8677 except Exception as e :
8778 logger .error ("❌ Failed to send paper trade: %s" , e )
88- self . _record_metric ( "paper_trade_failure " , 1 )
79+ record_paper_trade_metrics ( "queue " , success = False , duration_sec = 0 )
8980
9081 def _get_dispatch_method (
9182 self , mode : OutputMode
9283 ) -> Callable [[list [dict [str , Any ]]], None ] | None :
93- """
94- Resolve the output dispatch method based on the mode.
84+ """Resolve the output dispatch method based on the mode.
9585
9686 Args:
9787 mode (OutputMode): Output mode enum value.
9888
9989 Returns:
10090 Callable or None: Method to handle the output.
91+
10192 """
10293 return {
10394 OutputMode .LOG : self ._output_to_log ,
@@ -109,24 +100,46 @@ def _get_dispatch_method(
109100 }.get (mode )
110101
111102 def _output_to_log (self , data : list [dict [str , Any ]]) -> None :
112- """Log each item in the data list."""
103+ """Log each item in the data list.
104+
105+ Args:
106+ data (list[dict[str, Any]]): Data to log.
107+
108+ """
113109 for item in data :
114110 logger .info ("📝 Processed message:\n %s" , json .dumps (item , indent = 4 ))
115111
116112 def _output_to_stdout (self , data : list [dict [str , Any ]]) -> None :
117- """Print each item in the data list to standard output."""
113+ """Print each item in the data list to standard output.
114+
115+ Args:
116+ data (list[dict[str, Any]]): Data to print.
117+
118+ """
118119 for item in data :
119120 print (json .dumps (item , indent = 4 ))
120121
121122 @retry (stop = stop_after_attempt (3 ), wait = wait_exponential (multiplier = 1 , min = 1 , max = 10 ))
122123 def _output_to_queue (self , data : list [dict [str , Any ]]) -> None :
123- """Publish the data to the configured queue."""
124+ """Publish the data to the configured queue.
125+
126+ Retries on failure using exponential backoff.
127+
128+ Args:
129+ data (list[dict[str, Any]]): Data to publish.
130+
131+ """
124132 publish_to_queue (data )
125133 logger .info ("✅ Output published to queue: %d message(s)" , len (data ))
126- self . _record_metric ( "output_queue_success " , len ( data ) )
134+ record_output_metrics ( "queue " , success = True , duration_sec = 0 )
127135
128136 def _output_to_rest (self , data : list [dict [str , Any ]]) -> None :
129- """Send the data to the configured REST endpoint."""
137+ """Send the data to the configured REST endpoint.
138+
139+ Args:
140+ data (list[dict[str, Any]]): Data to post to REST API.
141+
142+ """
130143 import requests
131144
132145 url = config_shared .get_rest_output_url ()
@@ -135,20 +148,23 @@ def _output_to_rest(self, data: list[dict[str, Any]]) -> None:
135148 try :
136149 response = requests .post (url , json = data , headers = headers , timeout = 10 )
137150 duration = time .perf_counter () - start
138- rest_dispatch_duration . labels ( status = str (response .status_code )). observe ( duration )
151+ record_sink_metrics ( "rest" , str (response .status_code ), duration , failed = not response . ok )
139152
140153 if response .ok :
141154 logger .info ("🚀 Sent data to REST: HTTP %d" , response .status_code )
142- rest_dispatch_counter .labels (status = str (response .status_code )).inc ()
143155 else :
144156 logger .error ("❌ REST output failed: HTTP %d" , response .status_code )
145- rest_dispatch_failures .labels (status = str (response .status_code )).inc ()
146157 except Exception as e :
147158 logger .error ("❌ REST output error: %s" , e )
148- rest_dispatch_failures . labels ( status = " exception"). inc ( )
159+ record_sink_metrics ( "rest" , " exception", 0 , failed = True )
149160
150161 def _output_to_s3 (self , data : list [dict [str , Any ]]) -> None :
151- """Upload the data as a JSON file to an S3 bucket."""
162+ """Upload the data as a JSON file to an S3 bucket.
163+
164+ Args:
165+ data (list[dict[str, Any]]): Data to upload.
166+
167+ """
152168 import boto3
153169
154170 s3 = boto3 .client ("s3" )
@@ -158,15 +174,19 @@ def _output_to_s3(self, data: list[dict[str, Any]]) -> None:
158174 try :
159175 s3 .put_object (Bucket = bucket , Key = key , Body = json .dumps (data ).encode ("utf-8" ))
160176 duration = time .perf_counter () - start
161- s3_dispatch_duration .labels (status = "200" ).observe (duration )
162- s3_dispatch_counter .labels (status = "200" ).inc ()
177+ record_sink_metrics ("s3" , "200" , duration , failed = False )
163178 logger .info ("🚚 Uploaded output to S3: %s/%s" , bucket , key )
164179 except Exception as e :
165180 logger .error ("❌ S3 upload failed: %s" , e )
166- s3_dispatch_failures . labels ( status = " exception"). inc ( )
181+ record_sink_metrics ( "s3" , " exception", 0 , failed = True )
167182
168183 def _output_to_database (self , data : list [dict [str , Any ]]) -> None :
169- """Write the data to the configured database using raw SQL inserts."""
184+ """Write the data to the configured database using raw SQL inserts.
185+
186+ Args:
187+ data (list[dict[str, Any]]): Data records to insert.
188+
189+ """
170190 import sqlalchemy
171191
172192 engine = sqlalchemy .create_engine (config_shared .get_database_output_url ())
@@ -179,48 +199,46 @@ def _output_to_database(self, data: list[dict[str, Any]]) -> None:
179199 continue
180200 conn .execute (sqlalchemy .text (config_shared .get_database_insert_sql ()), ** item )
181201 duration = time .perf_counter () - start
182- db_dispatch_duration .labels (status = "success" ).observe (duration )
183- db_dispatch_counter .labels (status = "success" ).inc ()
202+ record_sink_metrics ("db" , "success" , duration , failed = False )
184203 logger .info ("📊 Wrote %d records to database" , len (data ))
185204 except Exception as e :
186205 logger .error ("❌ Database output failed: %s" , e )
187- db_dispatch_failures . labels ( status = " exception"). inc ( )
206+ record_sink_metrics ( "db" , " exception", 0 , failed = True )
188207
189208 @retry (stop = stop_after_attempt (3 ), wait = wait_exponential (multiplier = 1 , min = 1 , max = 10 ))
190209 def _output_paper_trade_to_queue (self , data : dict [str , Any ]) -> None :
191- """Send paper trade data to a paper trading queue."""
210+ """Send paper trade data to a paper trading queue.
211+
212+ Args:
213+ data (dict[str, Any]): Simulated trade to queue.
214+
215+ """
192216 queue_name = config_shared .get_paper_trading_queue_name ()
193217 exchange = config_shared .get_paper_trading_exchange ()
194218 publish_to_queue ([data ], queue = queue_name , exchange = exchange )
195219 logger .info ("🪙 Paper trade sent to queue:\n %s" , json .dumps (data , indent = 4 ))
196- self . _record_metric ( "paper_trade_sent " , 1 )
220+ record_paper_trade_metrics ( "queue " , success = True , duration_sec = 0 )
197221
198222 def _output_paper_trade_to_database (self , data : dict [str , Any ]) -> None :
199- """Placeholder for paper trade DB support (not implemented)."""
200- logger .warning ("⚠️ Paper trading database integration not implemented." )
201- self ._record_metric ("paper_trade_skipped" , 1 )
223+ """Placeholder for future paper trade DB integration.
224+
225+ Args:
226+ data (dict[str, Any]): Simulated trade record.
202227
203- def _record_metric (self , name : str , value : int ) -> None :
204- """Record a metric for dispatch monitoring."""
205- if name == "output_queue_success" :
206- output_counter .labels (mode = "queue" ).inc (value )
207- elif name == "paper_trade_sent" :
208- paper_trade_counter .labels (destination = "queue" ).inc (value )
209- elif name == "paper_trade_failure" :
210- paper_trade_failures .labels (destination = "queue" ).inc (value )
211- else :
212- logger .debug ("📊 Metric: %s = %d" , name , value )
228+ """
229+ logger .warning ("⚠️ Paper trading database integration not implemented." )
230+ logger .info ("📊 Skipped paper trade (DB output not implemented)." )
213231
214232
215233output_handler = OutputDispatcher ()
216234
217235
218236def send_to_output (data : list [dict [str , Any ]]) -> None :
219- """
220- Send data using the default output handler instance.
237+ """Send data using the default output handler instance.
221238
222239 Args:
223240 data (list[dict[str, Any]]): List of messages to dispatch.
241+
224242 """
225243 output_handler .send (data )
226244
0 commit comments