Commit 19afcdd

chore: sync shared modules and metrics across all repositories
- Updated Dockerfile for Prometheus integration
- Finalized metrics_server module and Prometheus counters/histograms
- Synchronized shared files: queue_handler.py, queue_sender.py, metrics.py
- Improved logging, retry logic, and structured metrics tracking
- Ensured consistency across repositories to prevent configuration drift
Parent: ad674fb · Commit: 19afcdd

File tree

8 files changed (+424, -159 lines)
Dockerfile

Lines changed: 2 additions & 0 deletions
@@ -37,6 +37,8 @@ ENV PYTHONPATH="/app"
 RUN useradd -m appuser && chown -R appuser /app
 USER appuser
 
+EXPOSE 8000
+
 HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
     CMD pgrep -f "app.main" > /dev/null || exit 1
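Note: the newly exposed port 8000 corresponds to the Prometheus integration called out in the commit message. The metrics_server module itself is not shown in this view; as a minimal sketch, assuming it wraps prometheus_client's built-in HTTP exporter, it could be as small as:

# Hypothetical sketch of the metrics_server module (not part of this diff).
# Assumes prometheus_client as the backing library; names are illustrative.
from prometheus_client import start_http_server


def start_metrics_server(port: int = 8000) -> None:
    """Serve /metrics on the port published by EXPOSE 8000."""
    # start_http_server spawns a daemon thread exposing all registered metrics.
    start_http_server(port)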

src/app/output_handler.py

Lines changed: 71 additions & 53 deletions
@@ -15,18 +15,9 @@
 from app import config_shared
 from app.queue_sender import publish_to_queue
 from app.utils.metrics import (
-    db_dispatch_counter,
-    db_dispatch_duration,
-    db_dispatch_failures,
-    output_counter,
-    paper_trade_counter,
-    paper_trade_failures,
-    rest_dispatch_counter,
-    rest_dispatch_duration,
-    rest_dispatch_failures,
-    s3_dispatch_counter,
-    s3_dispatch_duration,
-    s3_dispatch_failures,
+    record_output_metrics,
+    record_paper_trade_metrics,
+    record_sink_metrics,
 )
 from app.utils.setup_logger import setup_logger
 from app.utils.types import OutputMode, validate_list_of_dicts
@@ -42,11 +33,11 @@ def __init__(self) -> None:
         self.output_modes = config_shared.get_output_modes()
 
     def send(self, data: list[dict[str, Any]]) -> None:
-        """
-        Dispatch processed analysis output to one or more configured destinations.
+        """Dispatch processed analysis output to one or more configured destinations.
 
         Args:
             data (list[dict[str, Any]]): List of data payloads to send.
+
         """
         try:
             validate_list_of_dicts(data, required_keys=["text"])
@@ -72,11 +63,11 @@ def send(self, data: list[dict[str, Any]]) -> None:
             logger.error("❌ Failed to send output: %s", e)
 
     def send_trade_simulation(self, data: dict[str, Any]) -> None:
-        """
-        Send simulated trade data to the appropriate paper trade destination.
+        """Send simulated trade data to the appropriate paper trade destination.
 
         Args:
             data (dict[str, Any]): Simulated trade payload.
+
         """
         try:
             if config_shared.get_paper_trading_database_enabled():
@@ -85,19 +76,19 @@ def send_trade_simulation(self, data: dict[str, Any]) -> None:
                 self._output_paper_trade_to_queue(data)
         except Exception as e:
             logger.error("❌ Failed to send paper trade: %s", e)
-            self._record_metric("paper_trade_failure", 1)
+            record_paper_trade_metrics("queue", success=False, duration_sec=0)
 
     def _get_dispatch_method(
         self, mode: OutputMode
     ) -> Callable[[list[dict[str, Any]]], None] | None:
-        """
-        Resolve the output dispatch method based on the mode.
+        """Resolve the output dispatch method based on the mode.
 
         Args:
             mode (OutputMode): Output mode enum value.
 
         Returns:
             Callable or None: Method to handle the output.
+
         """
         return {
             OutputMode.LOG: self._output_to_log,
@@ -109,24 +100,46 @@ def _get_dispatch_method(
         }.get(mode)
 
     def _output_to_log(self, data: list[dict[str, Any]]) -> None:
-        """Log each item in the data list."""
+        """Log each item in the data list.
+
+        Args:
+            data (list[dict[str, Any]]): Data to log.
+
+        """
         for item in data:
             logger.info("📝 Processed message:\n%s", json.dumps(item, indent=4))
 
     def _output_to_stdout(self, data: list[dict[str, Any]]) -> None:
-        """Print each item in the data list to standard output."""
+        """Print each item in the data list to standard output.
+
+        Args:
+            data (list[dict[str, Any]]): Data to print.
+
+        """
         for item in data:
             print(json.dumps(item, indent=4))
 
     @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
     def _output_to_queue(self, data: list[dict[str, Any]]) -> None:
-        """Publish the data to the configured queue."""
+        """Publish the data to the configured queue.
+
+        Retries on failure using exponential backoff.
+
+        Args:
+            data (list[dict[str, Any]]): Data to publish.
+
+        """
         publish_to_queue(data)
         logger.info("✅ Output published to queue: %d message(s)", len(data))
-        self._record_metric("output_queue_success", len(data))
+        record_output_metrics("queue", success=True, duration_sec=0)
 
     def _output_to_rest(self, data: list[dict[str, Any]]) -> None:
-        """Send the data to the configured REST endpoint."""
+        """Send the data to the configured REST endpoint.
+
+        Args:
+            data (list[dict[str, Any]]): Data to post to REST API.
+
+        """
         import requests
 
         url = config_shared.get_rest_output_url()
@@ -135,20 +148,23 @@ def _output_to_rest(self, data: list[dict[str, Any]]) -> None:
         try:
             response = requests.post(url, json=data, headers=headers, timeout=10)
             duration = time.perf_counter() - start
-            rest_dispatch_duration.labels(status=str(response.status_code)).observe(duration)
+            record_sink_metrics("rest", str(response.status_code), duration, failed=not response.ok)
 
             if response.ok:
                 logger.info("🚀 Sent data to REST: HTTP %d", response.status_code)
-                rest_dispatch_counter.labels(status=str(response.status_code)).inc()
             else:
                 logger.error("❌ REST output failed: HTTP %d", response.status_code)
-                rest_dispatch_failures.labels(status=str(response.status_code)).inc()
         except Exception as e:
             logger.error("❌ REST output error: %s", e)
-            rest_dispatch_failures.labels(status="exception").inc()
+            record_sink_metrics("rest", "exception", 0, failed=True)
 
     def _output_to_s3(self, data: list[dict[str, Any]]) -> None:
-        """Upload the data as a JSON file to an S3 bucket."""
+        """Upload the data as a JSON file to an S3 bucket.
+
+        Args:
+            data (list[dict[str, Any]]): Data to upload.
+
+        """
         import boto3
 
         s3 = boto3.client("s3")
@@ -158,15 +174,19 @@ def _output_to_s3(self, data: list[dict[str, Any]]) -> None:
         try:
             s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(data).encode("utf-8"))
             duration = time.perf_counter() - start
-            s3_dispatch_duration.labels(status="200").observe(duration)
-            s3_dispatch_counter.labels(status="200").inc()
+            record_sink_metrics("s3", "200", duration, failed=False)
             logger.info("🚚 Uploaded output to S3: %s/%s", bucket, key)
         except Exception as e:
             logger.error("❌ S3 upload failed: %s", e)
-            s3_dispatch_failures.labels(status="exception").inc()
+            record_sink_metrics("s3", "exception", 0, failed=True)
 
     def _output_to_database(self, data: list[dict[str, Any]]) -> None:
-        """Write the data to the configured database using raw SQL inserts."""
+        """Write the data to the configured database using raw SQL inserts.
+
+        Args:
+            data (list[dict[str, Any]]): Data records to insert.
+
+        """
         import sqlalchemy
 
         engine = sqlalchemy.create_engine(config_shared.get_database_output_url())
@@ -179,48 +199,46 @@ def _output_to_database(self, data: list[dict[str, Any]]) -> None:
                     continue
                 conn.execute(sqlalchemy.text(config_shared.get_database_insert_sql()), **item)
             duration = time.perf_counter() - start
-            db_dispatch_duration.labels(status="success").observe(duration)
-            db_dispatch_counter.labels(status="success").inc()
+            record_sink_metrics("db", "success", duration, failed=False)
             logger.info("📊 Wrote %d records to database", len(data))
         except Exception as e:
             logger.error("❌ Database output failed: %s", e)
-            db_dispatch_failures.labels(status="exception").inc()
+            record_sink_metrics("db", "exception", 0, failed=True)
 
     @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
     def _output_paper_trade_to_queue(self, data: dict[str, Any]) -> None:
-        """Send paper trade data to a paper trading queue."""
+        """Send paper trade data to a paper trading queue.
+
+        Args:
+            data (dict[str, Any]): Simulated trade to queue.
+
+        """
         queue_name = config_shared.get_paper_trading_queue_name()
        exchange = config_shared.get_paper_trading_exchange()
        publish_to_queue([data], queue=queue_name, exchange=exchange)
        logger.info("🪙 Paper trade sent to queue:\n%s", json.dumps(data, indent=4))
-        self._record_metric("paper_trade_sent", 1)
+        record_paper_trade_metrics("queue", success=True, duration_sec=0)
 
     def _output_paper_trade_to_database(self, data: dict[str, Any]) -> None:
-        """Placeholder for paper trade DB support (not implemented)."""
-        logger.warning("⚠️ Paper trading database integration not implemented.")
-        self._record_metric("paper_trade_skipped", 1)
+        """Placeholder for future paper trade DB integration.
+
+        Args:
+            data (dict[str, Any]): Simulated trade record.
 
-    def _record_metric(self, name: str, value: int) -> None:
-        """Record a metric for dispatch monitoring."""
-        if name == "output_queue_success":
-            output_counter.labels(mode="queue").inc(value)
-        elif name == "paper_trade_sent":
-            paper_trade_counter.labels(destination="queue").inc(value)
-        elif name == "paper_trade_failure":
-            paper_trade_failures.labels(destination="queue").inc(value)
-        else:
-            logger.debug("📊 Metric: %s = %d", name, value)
+
+        """
+        logger.warning("⚠️ Paper trading database integration not implemented.")
+        logger.info("📊 Skipped paper trade (DB output not implemented).")
 
 
 output_handler = OutputDispatcher()
 
 
 def send_to_output(data: list[dict[str, Any]]) -> None:
-    """
-    Send data using the default output handler instance.
+    """Send data using the default output handler instance.
 
     Args:
         data (list[dict[str, Any]]): List of messages to dispatch.
+
     """
     output_handler.send(data)
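The refactor above collapses twelve metric objects into three helpers imported from app.utils.metrics. That file is synchronized by this commit but not shown here; the following is a minimal sketch of what the helpers might look like, assuming prometheus_client and inferring signatures from the call sites. The metric object names are illustrative, not taken from the repository.

# Hypothetical sketch of app/utils/metrics.py -- the synchronized file is not
# part of this diff view, so the underlying metric names are assumptions.
from prometheus_client import Counter, Histogram

# Labeled metrics let one helper serve every sink ("rest", "s3", "db")
# instead of a dedicated counter/histogram trio per destination.
SINK_DISPATCHES = Counter(
    "sink_dispatches_total", "Dispatch attempts per sink", ["sink", "status"]
)
SINK_FAILURES = Counter(
    "sink_dispatch_failures_total", "Failed dispatches per sink", ["sink", "status"]
)
SINK_DURATION = Histogram(
    "sink_dispatch_duration_seconds", "Dispatch latency per sink", ["sink"]
)
OUTPUT_EVENTS = Counter(
    "output_events_total", "Output dispatches by mode", ["mode", "success"]
)
PAPER_TRADES = Counter(
    "paper_trades_total", "Paper trade dispatches", ["destination", "success"]
)


def record_sink_metrics(sink: str, status: str, duration_sec: float, failed: bool) -> None:
    """Record one dispatch attempt against a sink (rest/s3/db)."""
    SINK_DISPATCHES.labels(sink=sink, status=status).inc()
    SINK_DURATION.labels(sink=sink).observe(duration_sec)
    if failed:
        SINK_FAILURES.labels(sink=sink, status=status).inc()


def record_output_metrics(mode: str, success: bool, duration_sec: float) -> None:
    """Count an output dispatch; duration_sec mirrors the call sites (often 0)."""
    OUTPUT_EVENTS.labels(mode=mode, success=str(success)).inc()


def record_paper_trade_metrics(destination: str, success: bool, duration_sec: float) -> None:
    """Count a paper trade dispatch by destination and outcome."""
    PAPER_TRADES.labels(destination=destination, success=str(success)).inc()

Helpers of this shape keep the metric surface fixed while each sink reports under its own label values, which is what lets output_handler.py drop the per-sink counter imports.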

src/app/queue_handler.py

Lines changed: 42 additions & 8 deletions
@@ -1,4 +1,9 @@
-"""Generic queue handler for RabbitMQ or SQS with batching and retries."""
+"""Generic queue handler for RabbitMQ or SQS with batching and retries.
+
+This module supports consuming messages from either RabbitMQ or Amazon SQS.
+It provides batching, retry logic, graceful shutdown handling, and clean logging
+with optional redaction of sensitive values.
+"""
 
 import json
 import signal
@@ -22,15 +27,29 @@
 
 
 def safe_log(msg: str) -> str:
-    """Standardized redacted log message."""
+    """Standardized redacted log message helper.
+
+    Args:
+        msg (str): Message to redact if sensitive logging is enabled.
+
+    Returns:
+        str: Either the original message or a redacted form.
+
+    """
     return f"{msg}: [REDACTED]" if REDACT_SENSITIVE_LOGS else msg
 
 
 def consume_messages(callback: Callable[[list[dict]], None]) -> None:
-    """Start the queue listener for the configured queue type.
+    """Start the message consumer using the configured QUEUE_TYPE.
+
+    This method determines whether to use RabbitMQ or SQS and invokes the
+    appropriate listener. It also registers signal handlers for graceful shutdown.
 
     Args:
-        callback: A function that takes a list of messages and processes them.
+        callback (Callable[[list[dict]], None]): Processing function for a batch of messages.
+
+    Raises:
+        ValueError: If QUEUE_TYPE is not supported.
 
     """
     signal.signal(signal.SIGINT, _graceful_shutdown)
@@ -46,17 +65,23 @@ def consume_messages(callback: Callable[[list[dict]], None]) -> None:
 
 
 def _graceful_shutdown(signum, frame) -> None:
-    """Handle shutdown signals to terminate listeners cleanly."""
+    """Gracefully signal shutdown of the consumer loop.
+
+    Args:
+        signum: Signal number.
+        frame: Current stack frame.
+
+    """
     logger.info("🛑 Shutdown signal received, stopping listener...")
     shutdown_event.set()
 
 
 @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=10))
 def _start_rabbitmq_listener(callback: Callable[[list[dict]], None]) -> None:
-    """Connect to RabbitMQ and start consuming messages.
+    """Connect to RabbitMQ and start consuming messages from the configured queue.
 
     Args:
-        callback: Function to process received messages.
+        callback (Callable[[list[dict]], None]): Handler function for batches of messages.
 
     """
    connection = pika.BlockingConnection(
@@ -74,6 +99,15 @@ def _start_rabbitmq_listener(callback: Callable[[list[dict]], None]) -> None:
     channel.queue_declare(queue=queue_name, durable=True)
 
     def on_message(ch: BlockingChannel, method, properties, body: bytes) -> None:
+        """Callback invoked for each incoming RabbitMQ message.
+
+        Args:
+            ch (BlockingChannel): The channel object.
+            method: Delivery method.
+            properties: Message properties.
+            body (bytes): Raw message body.
+
+        """
         if shutdown_event.is_set():
             ch.stop_consuming()
             return
@@ -105,7 +139,7 @@ def _start_sqs_listener(callback: Callable[[list[dict]], None]) -> None:
     """Connect to AWS SQS and start polling messages.
 
     Args:
-        callback: Function to process a batch of messages.
+        callback (Callable[[list[dict]], None]): Handler function for a batch of messages.
 
     """
     sqs = boto3.client("sqs", region_name=config.get_sqs_region())
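For context, a minimal consumer entry point tying the two shared modules together might look like the sketch below. The wiring is an assumption for illustration, not part of this commit; only the module paths are taken from the diff.

# Hypothetical entry point wiring queue_handler to output_handler.
from app.output_handler import send_to_output
from app.queue_handler import consume_messages

if __name__ == "__main__":
    # Blocks until a shutdown signal sets shutdown_event; each batch pulled
    # from RabbitMQ or SQS is dispatched to every configured output mode.
    consume_messages(send_to_output)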
