Skip to content

Commit fe749fa

Browse files
committed
feat(utils): finalize shared utility modules and tests
- Added and validated all utility modules: retry_request, request_with_timeout, track_polling_metrics, track_request_metrics, validate_data, validate_environment_variables, vault_client, setup_logger, and types
- Corrected and standardized all docstrings and type annotations
- Added full test coverage for metrics, retry, and validation utilities
- Synced and verified config_shared.py across repositories
- Fixed Pyright error in test_track_polling_metrics by using cast for Literal checks
- Confirmed no missing functions or unresolved references remain
1 parent 9cac7c7 commit fe749fa

25 files changed

+1621
-424
lines changed

src/app/config_shared.py

Lines changed: 886 additions & 75 deletions
Large diffs are not rendered by default.

src/app/output_handler.py

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
"""
77

88
import json
9-
from typing import Any, cast
9+
from typing import Any
1010

1111
from tenacity import retry, stop_after_attempt, wait_exponential
1212

@@ -19,35 +19,41 @@
1919

2020

2121
def send_to_output(data: list[dict[str, Any]]) -> None:
22-
"""Route processed output to the configured destination.
22+
"""
23+
Route processed output to one or more configured destinations.
2324
24-
Validates and dispatches messages to the configured output mode:
25-
'log', 'stdout', 'queue', 'rest', 's3', or 'database'.
25+
Validates and dispatches messages to each enabled output mode.
2626
2727
Args:
2828
data (list[dict[str, Any]]): A list of enriched messages to route.
29-
3029
"""
3130
try:
3231
validate_list_of_dicts(data, required_keys=["text"])
3332

34-
mode: OutputMode = cast(OutputMode, config_shared.get_output_mode())
35-
36-
if mode == OutputMode.LOG:
37-
_output_to_log(data)
38-
elif mode == OutputMode.STDOUT:
39-
_output_to_stdout(data)
40-
elif mode == OutputMode.QUEUE:
41-
_output_to_queue(data)
42-
elif mode == OutputMode.REST:
43-
_output_to_rest(data)
44-
elif mode == OutputMode.S3:
45-
_output_to_s3(data)
46-
elif mode == OutputMode.DATABASE:
47-
_output_to_database(data)
48-
else:
49-
logger.warning("⚠️ Unknown output mode: %s — defaulting to log.", mode)
50-
_output_to_log(data)
33+
# Use OUTPUT_MODES if defined, fallback to single OUTPUT_MODE
34+
modes = config_shared.get_output_modes()
35+
for mode_str in modes:
36+
try:
37+
mode = OutputMode(mode_str)
38+
except ValueError:
39+
logger.warning("⚠️ Unknown output mode: %s — skipping.", mode_str)
40+
continue
41+
42+
if mode == OutputMode.LOG:
43+
_output_to_log(data)
44+
elif mode == OutputMode.STDOUT:
45+
_output_to_stdout(data)
46+
elif mode == OutputMode.QUEUE:
47+
_output_to_queue(data)
48+
elif mode == OutputMode.REST:
49+
_output_to_rest(data)
50+
elif mode == OutputMode.S3:
51+
_output_to_s3(data)
52+
elif mode == OutputMode.DATABASE:
53+
_output_to_database(data)
54+
else:
55+
logger.warning("⚠️ Unhandled output mode: %s", mode)
56+
5157
except Exception as e:
5258
logger.error("❌ Failed to send output: %s", e)
5359

src/app/queue_sender.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
logger = setup_logger(__name__)
2222

23-
REDACT_SENSITIVE_LOGS = config_shared.get_config_value("REDACT_SENSITIVE_LOGS", "true").lower() == "true"
23+
REDACT_SENSITIVE_LOGS = (
24+
config_shared.get_config_value("REDACT_SENSITIVE_LOGS", "true").lower() == "true"
25+
)
2426

2527

2628
def safe_log_message(data: dict[str, Any]) -> str:

src/app/utils/__init__.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
"""The module initializes the utilities package for the application.
1+
"""Initialize the `utils` package for shared application utilities.
22
3-
Utilities included:
4-
- retry_request: Function for retrying operations with exponential backoff.
5-
- validate_data: Validates the structure and content of data.
6-
- track_polling_metrics: Tracks polling metrics for success and failure.
7-
- track_request_metrics: Tracks metrics for individual API requests.
8-
- request_with_timeout: Makes HTTP requests with a timeout.
9-
- validate_environment_variables: Validates required environment variables.
10-
- setup_logger: Configures logging for the application.
3+
Included Utilities:
4+
- setup_logger: Configures logging with structured output.
5+
- retry_request: Retries a function with optional delay on failure.
6+
- request_with_timeout: Makes HTTP GET requests with timeout and validation.
7+
- validate_data: Validates schema and batch structure of data.
8+
- validate_environment_variables: Ensures required environment variables are set.
9+
- track_polling_metrics: Logs success/failure of polling operations.
10+
- track_request_metrics: Logs request-level metrics (rate limits, success, etc.).
1111
"""
1212

1313
from .request_with_timeout import request_with_timeout
@@ -19,14 +19,14 @@
1919
from .validate_environment_variables import validate_environment_variables
2020

2121
__all__ = [
22+
"setup_logger",
2223
"retry_request",
24+
"request_with_timeout",
2325
"validate_data",
26+
"validate_environment_variables",
2427
"track_polling_metrics",
2528
"track_request_metrics",
26-
"request_with_timeout",
27-
"validate_environment_variables",
28-
"setup_logger",
2929
]
3030

31-
# Initialize package-level logger
31+
# Initialize package-level logger for utilities
3232
logger = setup_logger(name="utils")

src/app/utils/healthcheck.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
"""Healthcheck utility module for readiness and liveness probes.
2+
3+
Provides application status flags and an optional HTTP server for use with
4+
container orchestrators like Kubernetes or Docker.
5+
"""
6+
7+
import logging
8+
import threading
9+
from http.server import BaseHTTPRequestHandler, HTTPServer
10+
11+
from app import config_shared
12+
13+
logger: logging.Logger = logging.getLogger(__name__)
14+
15+
# Service status flags
16+
_readiness_flag: bool = False
17+
_health_flag: bool = True
18+
19+
20+
def is_ready() -> bool:
21+
"""
22+
Check if the service is ready to handle requests.
23+
24+
Returns:
25+
bool: True if the service has completed startup and is ready.
26+
"""
27+
return _readiness_flag
28+
29+
30+
def is_healthy() -> bool:
31+
"""
32+
Check if the service is currently healthy.
33+
34+
Returns:
35+
bool: True if the service is healthy and not in a failure state.
36+
"""
37+
return _health_flag
38+
39+
40+
def set_ready() -> None:
41+
"""
42+
Mark the service as ready to handle traffic.
43+
"""
44+
global _readiness_flag
45+
_readiness_flag = True
46+
logger.info("✅ Service marked as ready")
47+
48+
49+
def set_unhealthy() -> None:
50+
"""
51+
Mark the service as unhealthy (e.g., during shutdown or failure).
52+
"""
53+
global _health_flag
54+
_health_flag = False
55+
logger.warning("❌ Service marked as unhealthy")
56+
57+
58+
class HealthHandler(BaseHTTPRequestHandler):
59+
"""
60+
HTTP request handler for /health and /ready endpoints.
61+
"""
62+
63+
def do_GET(self) -> None:
64+
"""
65+
Handle GET requests for readiness and liveness checks.
66+
"""
67+
if self.path == "/health":
68+
status: int = 200 if is_healthy() else 500
69+
self.send_response(status)
70+
self.end_headers()
71+
self.wfile.write(b"healthy" if status == 200 else b"unhealthy")
72+
73+
elif self.path == "/ready":
74+
status: int = 200 if is_ready() else 503
75+
self.send_response(status)
76+
self.end_headers()
77+
self.wfile.write(b"ready" if status == 200 else b"not ready")
78+
79+
else:
80+
self.send_response(404)
81+
self.end_headers()
82+
self.wfile.write(b"not found")
83+
84+
def log_message(self, format: str, *args: object) -> None:
85+
"""
86+
Suppress default access log output from BaseHTTPRequestHandler.
87+
"""
88+
pass
89+
90+
91+
def start_health_server(port: int = 8081) -> None:
92+
"""
93+
Start an HTTP server exposing /health and /ready endpoints.
94+
95+
This server runs in a background thread and is intended for use with
96+
readiness and liveness probes in orchestration environments.
97+
98+
Args:
99+
port (int): Port to bind the health server to. Defaults to 8081.
100+
"""
101+
if not config_shared.get_healthcheck_enabled():
102+
logger.info("⚠️ Healthcheck server is disabled by configuration.")
103+
return
104+
105+
def serve() -> None:
106+
with HTTPServer(("0.0.0.0", port), HealthHandler) as httpd:
107+
logger.info("📡 Healthcheck server running on port %d", port)
108+
httpd.serve_forever()
109+
110+
thread: threading.Thread = threading.Thread(target=serve, daemon=True)
111+
thread.start()

src/app/utils/metrics.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
"""Shared Prometheus metric definitions for all pollers and processors.
2+
3+
Exports counters and histograms for:
4+
- Polling operations
5+
- HTTP requests
6+
- Output handling
7+
- Message processing
8+
"""
9+
10+
from prometheus_client import Counter, Histogram
11+
12+
# Output Metrics
13+
output_counter = Counter(
14+
"output_messages_total",
15+
"Total number of messages successfully sent to each output mode.",
16+
["mode"],
17+
)
18+
19+
output_failures = Counter(
20+
"output_failures_total", "Total number of output failures by mode.", ["mode"]
21+
)
22+
23+
output_duration = Histogram(
24+
"output_duration_seconds", "Time taken to send output messages by mode.", ["mode"]
25+
)
26+
27+
# Polling Metrics
28+
poll_counter = Counter("poll_cycles_total", "Total number of polling cycles by poller.", ["poller"])
29+
30+
poll_errors = Counter("poll_errors_total", "Total number of poller errors by poller.", ["poller"])
31+
32+
poll_duration = Histogram(
33+
"poll_duration_seconds", "Duration of polling cycles by poller.", ["poller"]
34+
)
35+
36+
# HTTP Request Metrics
37+
http_request_counter = Counter(
38+
"http_requests_total",
39+
"Total number of HTTP requests by service and method.",
40+
["service", "method", "status"],
41+
)
42+
43+
http_request_duration = Histogram(
44+
"http_request_duration_seconds",
45+
"Duration of HTTP requests by service and method.",
46+
["service", "method"],
47+
)
48+
49+
# Message Processing Metrics
50+
process_success = Counter(
51+
"message_processing_success_total", "Number of messages processed successfully.", ["processor"]
52+
)
53+
54+
process_failure = Counter(
55+
"message_processing_failure_total",
56+
"Number of failed message processing attempts.",
57+
["processor"],
58+
)
59+
60+
process_duration = Histogram(
61+
"message_processing_duration_seconds", "Time taken to process messages.", ["processor"]
62+
)
63+
64+
# Validation Failures
65+
validation_failures = Counter(
66+
"message_validation_failures_total",
67+
"Number of failed message validation attempts.",
68+
["processor"],
69+
)
70+
71+
validation_duration = Histogram(
72+
"message_validation_duration_seconds",
73+
"Duration of validation checks per message.",
74+
["processor"],
75+
)

0 commit comments

Comments
 (0)