Skip to content

Commit 9dc562e

Browse files
committed
chore: sync shared modules and metrics across all repositories
- Updated all repos to support prometheus metrics
1 parent 19afcdd commit 9dc562e

File tree

4 files changed

+201
-53
lines changed

4 files changed

+201
-53
lines changed

src/app/config_shared.py

Lines changed: 135 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8,47 +8,9 @@
88

99
from app.utils.config_utils import get_config_bool, get_config_value
1010
from app.utils.types import OutputMode
11-
from app.utils.vault_client import VaultClient, get_secret_or_env
11+
from app.utils.vault_client import get_secret_or_env
1212

13-
_vault = VaultClient()
14-
15-
# @lru_cache
16-
# def get_config_value(key: str, default: Optional[str] = None) -> str:
17-
# """
18-
# Retrieve a configuration value from Vault, environment variable, or fallback.
19-
20-
# Args:
21-
# key (str): The configuration key to retrieve.
22-
# default (Optional[str]): Fallback value if the key is not found.
23-
24-
# Returns:
25-
# str: The resolved configuration value.
26-
27-
# Raises:
28-
# ValueError: If no value is found and no default is provided.
29-
# """
30-
# val = _vault.get(key, os.getenv(key))
31-
# if val is None:
32-
# if default is not None:
33-
# return str(default)
34-
# raise ValueError(f"❌ Missing required config for key: {key}")
35-
# return str(val)
36-
37-
38-
# @lru_cache
39-
# def get_config_bool(key: str, default: bool = False) -> bool:
40-
# """
41-
# Retrieve a boolean configuration value with support for multiple true-like values.
42-
43-
# Args:
44-
# key (str): The configuration key to retrieve.
45-
# default (bool): Default value if the key is not found.
46-
47-
# Returns:
48-
# bool: The resolved boolean configuration value.
49-
# """
50-
# val = get_config_value(key, str(default)).strip().lower()
51-
# return val in {"1", "true", "yes", "on"}
13+
# _vault = VaultClient()
5214

5315

5416
@lru_cache
@@ -1371,3 +1333,136 @@ def get_healthcheck_port() -> int:
13711333
13721334
"""
13731335
return int(get_config_value("HEALTHCHECK_PORT", "8081"))
1336+
1337+
1338+
@lru_cache
def get_metrics_enabled() -> bool:
    """Determine whether the Prometheus metrics server should be started.

    Reads METRICS_ENABLED through ``get_config_value`` (default ``"true"``).
    The value is trimmed and lower-cased before comparison, so surrounding
    whitespace or mixed case does not accidentally disable metrics.
    Accepts truthy values: "1", "true", "yes". Any other value yields False.
    The result is memoized by ``lru_cache``.

    Returns:
        bool: True if metrics are enabled, otherwise False.

    """
    # strip() matters: values copied from .env files or secret stores often
    # carry a trailing newline or stray spaces.
    val = get_config_value("METRICS_ENABLED", "true").strip().lower()
    return val in ("1", "true", "yes")
1351+
1352+
1353+
@lru_cache
def get_metrics_port() -> int:
    """Retrieve the TCP port on which the Prometheus metrics server should listen.

    Reads METRICS_PORT through ``get_config_value`` (default ``"8000"``) and
    converts it to an integer. Successful results are memoized by
    ``lru_cache``.

    Returns:
        int: Port number (default: 8000).

    Raises:
        ValueError: If METRICS_PORT is not a valid integer, or is outside the
            TCP port range 0-65535.

    """
    port_str = get_config_value("METRICS_PORT", "8000")
    try:
        port = int(port_str)
    except ValueError as err:
        # Chain the parsing error so the root cause stays in the traceback.
        raise ValueError(
            f"❌ Invalid METRICS_PORT value: '{port_str}' must be an integer."
        ) from err

    # Fail at configuration time with a clear message instead of a late
    # failure when the server tries to bind.
    if not 0 <= port <= 65535:
        raise ValueError(
            f"❌ Invalid METRICS_PORT value: '{port_str}' must be between 0 and 65535."
        )
    return port
1369+
1370+
1371+
@lru_cache
def get_metrics_bind_address() -> str:
    """Return the network address the metrics server should bind to.

    Looks up METRICS_BIND_ADDRESS through ``get_config_value``; the lookup
    result is memoized by ``lru_cache``.

    Returns:
        str: Bind address (default: "0.0.0.0").

    """
    bind_address = get_config_value("METRICS_BIND_ADDRESS", "0.0.0.0")
    return bind_address
1380+
1381+
1382+
@lru_cache
def get_config_bool(key: str, default: bool = False) -> bool:
    """Retrieve a boolean configuration value.

    The raw value is fetched through ``get_config_value`` with
    ``str(default)`` as the fallback, then trimmed and lower-cased.
    The values "1", "true", "yes" and "on" are interpreted as True; every
    other value (including unrecognized text) is interpreted as False.
    Results are memoized per ``(key, default)`` pair by ``lru_cache``.

    NOTE(review): this module also imports ``get_config_bool`` from
    ``app.utils.config_utils``; this definition shadows that import.
    Confirm which implementation is meant to be authoritative.

    Args:
        key (str): The configuration key to retrieve.
        default (bool): The value used when the key is not found.

    Returns:
        bool: Boolean value for the requested configuration key.

    """
    # strip() restores the whitespace tolerance of the earlier implementation,
    # so values such as "true\n" are still recognized.
    val: str = get_config_value(key, str(default)).strip().lower()
    return val in ("1", "true", "yes", "on")
1399+
1400+
1401+
@lru_cache
def get_output_modes() -> List[str]:
    """Return the list of enabled output modes.

    The OUTPUT_MODES value (default: empty string) is split on commas; each
    piece is trimmed and lower-cased, and empty pieces are dropped. Example
    modes are "s3", "rest" and "database".

    Because the function is wrapped in ``lru_cache``, every caller receives
    the same list object; callers should treat it as read-only.

    Returns:
        List[str]: Enabled output modes, in the order they were configured.

    """
    raw_modes: str = get_config_value("OUTPUT_MODES", "")
    enabled: List[str] = []
    for piece in raw_modes.split(","):
        name = piece.strip()
        if name:
            enabled.append(name.lower())
    return enabled
1414+
1415+
1416+
@lru_cache
def get_rest_output_url() -> str:
    """Return the REST endpoint URL used for output dispatch.

    Looks up REST_OUTPUT_URL through ``get_config_value`` without a default,
    so the behavior for a missing key is whatever ``get_config_value``
    defines. The result is memoized by ``lru_cache``.

    Returns:
        str: The configured REST output URL.

    """
    rest_url = get_config_value("REST_OUTPUT_URL")
    return rest_url
1425+
1426+
1427+
@lru_cache
def get_s3_output_bucket() -> str:
    """Return the name of the S3 bucket used for output dispatch.

    Looks up S3_OUTPUT_BUCKET through ``get_config_value`` without a default,
    so the behavior for a missing key is whatever ``get_config_value``
    defines. The result is memoized by ``lru_cache``.

    Returns:
        str: S3 bucket name.

    """
    bucket_name = get_config_value("S3_OUTPUT_BUCKET")
    return bucket_name
1436+
1437+
1438+
@lru_cache
def get_s3_output_prefix() -> str:
    """Return the object key prefix applied to S3 output files.

    Looks up S3_OUTPUT_PREFIX through ``get_config_value`` with an empty
    string as the default. The result is memoized by ``lru_cache``.

    Returns:
        str: S3 key prefix (can be empty).

    """
    key_prefix = get_config_value("S3_OUTPUT_PREFIX", "")
    return key_prefix
1447+
1448+
1449+
@lru_cache
def get_database_output_url() -> str:
    """Return the connection string for the output database.

    Looks up DATABASE_OUTPUT_URL through ``get_config_value`` without a
    default, so the behavior for a missing key is whatever
    ``get_config_value`` defines. The result is memoized by ``lru_cache``.

    Returns:
        str: Database URL (e.g., postgresql://user:pass@host/db).

    """
    database_url = get_config_value("DATABASE_OUTPUT_URL")
    return database_url
1458+
1459+
1460+
@lru_cache
def get_database_insert_sql() -> str:
    """Return the raw SQL INSERT statement template for database output.

    Looks up DATABASE_INSERT_SQL through ``get_config_value`` without a
    default, so the behavior for a missing key is whatever
    ``get_config_value`` defines. The result is memoized by ``lru_cache``.

    Returns:
        str: SQL insert statement.

    """
    insert_statement = get_config_value("DATABASE_INSERT_SQL")
    return insert_statement

src/app/main.py

Lines changed: 63 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,89 @@
11
"""Main entry point for the service.
22
3-
This script initializes logging, loads the queue consumer, and begins
4-
consuming data using the configured processing callback.
3+
Initializes logging, sets up metrics, validates configuration, and
4+
starts consuming messages using the configured output handler.
55
"""
66

77
import os
88
import sys
9-
10-
# Add 'src/' to Python's module search path
11-
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
9+
import traceback
1210

1311
from app import config_shared
14-
from app.output_handler import send_to_output
12+
from app.output_handler import output_handler
1513
from app.queue_handler import consume_messages
14+
from app.utils.metrics_server import start_metrics_server
1615
from app.utils.setup_logger import setup_logger
1716

18-
# Initialize the module-level logger with optional structured logging
17+
# Add 'src/' to Python's module search path
18+
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
19+
20+
# Initialize structured or plain logger
1921
logger = setup_logger(
2022
__name__,
2123
structured=config_shared.get_config_bool("STRUCTURED_LOGGING", False),
2224
)
2325

26+
REDACT_LOGS = config_shared.get_config_bool("REDACT_SENSITIVE_LOGS", True)
27+
28+
29+
def redact(value: str) -> str:
    """Mask a value before it is written to the logs.

    The decision is driven by the module-level ``REDACT_LOGS`` flag, which is
    read once from REDACT_SENSITIVE_LOGS when this module is loaded.

    Args:
        value (str): The value that is about to be logged.

    Returns:
        str: The literal "[REDACTED]" when redaction is enabled, otherwise
        ``value`` unchanged.

    """
    if REDACT_LOGS:
        return "[REDACTED]"
    return value
40+
41+
42+
def validate_output_config() -> None:
    """Load and log the settings required by each enabled output mode.

    For every mode returned by ``config_shared.get_output_modes()`` the
    matching getters are called, so a problem with a setting surfaces at
    startup rather than on the first message (presumably the getters raise
    for missing required keys; that depends on ``get_config_value``).
    Values are passed through ``redact`` before being logged.
    """
    enabled_modes = config_shared.get_output_modes()
    logger.info("📤 Output modes enabled: %s", enabled_modes)

    rest_enabled = "rest" in enabled_modes
    if rest_enabled:
        rest_url = config_shared.get_rest_output_url()
        logger.info("🌐 REST output URL: %s", redact(rest_url))

    s3_enabled = "s3" in enabled_modes
    if s3_enabled:
        s3_bucket = config_shared.get_s3_output_bucket()
        s3_prefix = config_shared.get_s3_output_prefix()
        logger.info(
            "🪣 S3 bucket: %s, prefix: %s",
            redact(s3_bucket),
            redact(s3_prefix),
        )

    database_enabled = "database" in enabled_modes
    if database_enabled:
        # Fetch both settings before logging either one, so a failure on the
        # second setting happens before any database log line is emitted.
        database_url = config_shared.get_database_output_url()
        insert_statement = config_shared.get_database_insert_sql()
        logger.info("🗄️ DB URL: %s", redact(database_url))
        logger.debug("📝 Insert SQL: %s", redact(insert_statement))
64+
2465

2566
def main() -> None:
    """Run the startup sequence and begin consuming messages.

    Steps, in order: log the startup banner, call ``start_metrics_server``,
    call ``validate_output_config``, log the configured queue type, then hand
    ``output_handler.send`` to ``consume_messages`` as the message callback.
    """
    logger.info("🚀 Starting processing service...")

    start_metrics_server()
    validate_output_config()

    queue_type = config_shared.get_queue_type()
    logger.info("✅ Ready. Listening for messages on queue type: %s", queue_type)

    message_callback = output_handler.send
    consume_messages(message_callback)
2981

3082

3183
# Script entry point: run the service and exit with status 1 on any
# unhandled exception.
if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        if REDACT_LOGS:
            # logger.exception() and traceback.print_exc() both render the raw
            # exception text, which would bypass redaction. Log the redacted
            # message, then only the stack frames (file, line, function),
            # which do not contain the exception message.
            logger.error("❌ Unhandled exception: %s", redact(str(e)))
            logger.error(
                "Traceback (most recent call last):\n%s",
                "".join(traceback.format_tb(e.__traceback__)),
            )
        else:
            # logger.exception() already appends the full traceback, so a
            # separate traceback.print_exc() would only duplicate it.
            logger.exception("❌ Unhandled exception: %s", redact(str(e)))
        sys.exit(1)

src/app/utils/metrics_server.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@
1010
"""
1111

1212
import os
13+
1314
from prometheus_client import start_http_server
1415

1516

1617
def start_metrics_server() -> None:
17-
"""
18-
Conditionally start the Prometheus metrics HTTP server.
18+
"""Conditionally start the Prometheus metrics HTTP server.
1919
2020
This starts a simple HTTP server to expose metrics from the global
2121
Prometheus client registry on the given port. If the environment
@@ -31,6 +31,7 @@ def start_metrics_server() -> None:
3131
3232
Example:
3333
start_metrics_server() # Will start if METRICS_ENABLED is true
34+
3435
"""
3536
enabled = os.getenv("METRICS_ENABLED", "true").lower()
3637
if enabled not in ("1", "true", "yes"):

src/app/utils/setup_logger.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import logging
22
import sys
33
from logging import Logger
4-
from typing import Optional
54

65

76
def setup_logger(

0 commit comments

Comments
 (0)