Skip to content

Commit f12e76d

Browse files
authored
feat: Add configurable health check endpoint to online ingestor (#207)
1 parent d3a6db5 commit f12e76d

15 files changed

+848
-6
lines changed

.github/workflows/integration.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,6 @@ jobs:
3535
- run: scicat_validate_ingestor_config tests/integration/config.test.yml
3636
# Run the ingestor tests
3737
- run: python tests/_scicat_ingestor.py --config tests/integration/config.test.yml --data-dir test-data
38+
# Run the health check tests
39+
- run: python tests/_scicat_healthcheck.py --config tests/integration/config.test.yml
3840
- run: docker compose -f tests/docker-compose.yml down

documentation/docs/user-guide/configuration.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ This section is specific for the offline ingestor and contains all the options t
133133
Available options:
134134
- absolute: use absolute paths
135135
- ...
136+
- data_directory: _string_
137+
This option provides the full path to the base data directory where all the data files are located. It is used for testing the mount points.
138+
Example: "/ess/data/"
136139

137140
## Kafka (kafka)
138141

@@ -214,13 +217,24 @@ This section is used by both programs and it specify how and where to send the l
214217
It is suggested to pick a meaningful string that can be used when selecting entries in graylog interface and queries
215218
Example: "scicat_ingestor"
216219

220+
## Health Check (health_check)
221+
222+
The online ingestor exposes a small HTTP endpoint that external monitors can query to verify Kafka, storage, and SciCat connectivity. This section controls where that endpoint listens.
223+
224+
- host: _string_
225+
IP address or hostname the embedded HTTP server should bind to. Use `0.0.0.0` (default) to listen on every interface, or `127.0.0.1` to restrict access to the local machine.
226+
- port: _integer_
227+
TCP port for the health endpoint. The default is `8080`, but you can change it to match your infrastructure or to avoid collisions with other services.
228+
217229
## SciCat (scicat)
218230

219231
This section is used by the offline ingestor and it contains the info about url where the relevant scicat instance is reachable and also the token that needs to be used for authentication purposes.
220232

221233
- host: _valid url as string_
222234
URL of the SciCat instance of reference where we want to create the dataset records.
223235
Example: "https://scicat.ess.eu/api/v3",
236+
- health_endpoint: _string_
237+
Relative path (e.g. `health` or `api/v3/health`) or full URL that the health server should query to verify SciCat availability. Defaults to `health` which is appended to the configured host.
224238
- token: _string_
225239
Valid JWT token used to connect to SciCat with the proper permissions to query and create datasets.
226240
- timeout: _integer_ or _null_

resources/config.sample.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ ingestion:
3131
message_to_file: true
3232
message_file_extension: message.json
3333
file_path_type: relative
34+
data_directory: ''
3435
data_file_open_max_tries: 3
3536
data_file_open_retry_delay: []
3637
kafka:
@@ -65,8 +66,12 @@ scicat:
6566
timeout: 0
6667
stream: true
6768
verify: false
69+
health_endpoint: health
6870
api_endpoints:
6971
datasets: datasets
7072
proposals: proposals
7173
origdatablocks: origdatablocks
7274
instruments: instruments
75+
health_check:
76+
host: 0.0.0.0
77+
port: 8080

src/scicat_configuration.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,12 @@ class LoggingOptions:
220220
graylog_facility: str = "scicat.ingestor"
221221

222222

223+
@dataclass(kw_only=True)
224+
class HealthCheckOptions:
225+
host: str = "0.0.0.0" # noqa: S104
226+
port: int = 8080
227+
228+
223229
@dataclass(kw_only=True)
224230
class KafkaOptions:
225231
"""
@@ -264,6 +270,7 @@ class FileHandlingOptions:
264270
message_to_file: bool = True
265271
message_file_extension: str = "message.json"
266272
file_path_type: str = "relative" # allowed values: absolute and relative
273+
data_directory: str = ""
267274
data_file_open_max_tries: int = 3
268275
"""How many times to try opening a data file before giving up."""
269276
data_file_open_retry_delay: list[int] = field(default_factory=lambda: [])
@@ -338,6 +345,7 @@ class SciCatOptions:
338345
timeout: int = 0
339346
stream: bool = True
340347
verify: bool = False
348+
health_endpoint: str = "health"
341349
api_endpoints: ScicatEndpoints = field(default_factory=ScicatEndpoints)
342350

343351
@property
@@ -363,6 +371,15 @@ def host_address(self) -> str:
363371
"""Return the host address ready to be used."""
364372
return self.host.removesuffix('/') + "/"
365373

374+
@property
375+
def health_url(self) -> str:
376+
"""Return the health-check URL, allowing either relative or absolute values."""
377+
378+
endpoint = self.health_endpoint
379+
if endpoint.startswith(("http://", "https://")):
380+
return endpoint
381+
return urljoin(self.host_address, endpoint.lstrip("/"))
382+
366383

367384
@dataclass(kw_only=True)
368385
class OnlineIngestorConfig:
@@ -378,6 +395,7 @@ class OnlineIngestorConfig:
378395
kafka: KafkaOptions = field(default_factory=KafkaOptions)
379396
logging: LoggingOptions = field(default_factory=LoggingOptions)
380397
scicat: SciCatOptions = field(default_factory=SciCatOptions)
398+
health_check: HealthCheckOptions = field(default_factory=HealthCheckOptions)
381399

382400
def to_dict(self) -> dict:
383401
"""Return the configuration as a dictionary."""

src/scicat_health_check.py

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
# SPDX-License-Identifier: BSD-3-Clause
2+
# Copyright (c) 2024 Scicatproject contributors (https://github.com/ScicatProject)
3+
"""
4+
This module contains the health check server for the online ingestor.
5+
It exposes an HTTP endpoint that checks the status of Kafka, Storage and SciCat.
6+
"""
7+
8+
import json
9+
import logging
10+
import pathlib
11+
import threading
12+
from functools import partial
13+
from http.server import BaseHTTPRequestHandler, HTTPServer
14+
from time import sleep
15+
from typing import Any
16+
17+
import requests
18+
from confluent_kafka import Consumer
19+
20+
from scicat_configuration import OnlineIngestorConfig
21+
22+
23+
class HealthCheckHandler(BaseHTTPRequestHandler):
24+
"""
25+
HTTP Handler for the health check endpoint.
26+
It checks the status of Kafka, Storage and SciCat.
27+
"""
28+
29+
def __init__(
30+
self,
31+
config: OnlineIngestorConfig,
32+
consumer: Consumer,
33+
logger: logging.Logger,
34+
*args: Any,
35+
**kwargs: Any,
36+
):
37+
self.config: OnlineIngestorConfig = config
38+
self.consumer: Consumer = consumer
39+
self.logger: logging.Logger = logger
40+
super().__init__(*args, **kwargs)
41+
42+
def do_GET(self) -> None:
43+
"""Handle GET requests."""
44+
if self.path == "/health":
45+
kafka_status = self._check_kafka()
46+
storage_status = self._check_storage()
47+
scicat_status = self._check_scicat()
48+
49+
health_status = {
50+
"kafka": kafka_status,
51+
"storage": storage_status,
52+
"scicat": scicat_status,
53+
}
54+
55+
if all(health_status.values()):
56+
self.send_response(200)
57+
else:
58+
self.send_response(503)
59+
60+
self.send_header("Content-type", "application/json")
61+
self.end_headers()
62+
self.wfile.write(json.dumps(health_status).encode("utf-8"))
63+
else:
64+
self.send_response(404)
65+
self.end_headers()
66+
67+
def _check_kafka(self) -> bool:
68+
"""Check if Kafka is reachable."""
69+
try:
70+
self.consumer.list_topics(timeout=5)
71+
return True
72+
except Exception as e:
73+
self.logger.error("Health check: Kafka connection failed: %s", e)
74+
return False
75+
76+
def _check_storage(self) -> bool:
77+
"""Check if the storage directory is accessible."""
78+
try:
79+
file_handling = self.config.ingestion.file_handling
80+
directory = file_handling.data_directory
81+
if not directory:
82+
self.logger.warning("Health check: No data_directory configured.")
83+
return False
84+
path = pathlib.Path(directory)
85+
if not path.exists():
86+
self.logger.error("Health check: Storage path does not exist: %s", path)
87+
return False
88+
89+
# Attempt to list the directory to make sure the mount is accessible.
90+
next(path.iterdir(), None)
91+
return True
92+
except Exception as e:
93+
self.logger.error("Health check: Storage access failed: %s", e)
94+
return False
95+
96+
def _check_scicat(self) -> bool:
97+
"""Check if SciCat is reachable."""
98+
try:
99+
scicat_config = self.config.scicat
100+
url = scicat_config.health_url
101+
response = requests.get(url, timeout=5)
102+
return response.status_code == 200
103+
except Exception as e:
104+
self.logger.error("Health check: SciCat connection failed: %s", e)
105+
return False
106+
107+
def log_message(self, format: str, *args: Any) -> None:
108+
pass # Disable default logging of BaseHTTPRequestHandler
109+
110+
111+
def _serve_health_server(
112+
server: HTTPServer,
113+
logger: logging.Logger,
114+
restart_delay: float = 5.0,
115+
) -> None:
116+
"""Run the HTTP server forever, restarting if it crashes."""
117+
118+
while True:
119+
try:
120+
server.serve_forever()
121+
except Exception as exc:
122+
logger.error(
123+
"Health check server stopped unexpectedly: %s. Restarting in %s seconds.",
124+
exc,
125+
restart_delay,
126+
)
127+
sleep(restart_delay)
128+
129+
130+
def start_health_server(
131+
config: OnlineIngestorConfig, consumer: Consumer, logger: logging.Logger
132+
) -> None:
133+
"""Start the health check server in a daemon thread."""
134+
handler = partial(HealthCheckHandler, config, consumer, logger)
135+
host = config.health_check.host
136+
port = config.health_check.port
137+
server = HTTPServer((host, port), handler)
138+
thread = threading.Thread(target=_serve_health_server, args=(server, logger))
139+
thread.daemon = True
140+
thread.start()
141+
logger.info("Health check server started on %s:%s", host, port)

src/scicat_offline_ingestor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ def build_offline_config(logger: logging.Logger | None = None) -> OfflineIngesto
5454
# It is because ``OfflineIngestorConfig`` shares the template config file
5555
# with ``OnlineIngestorConfig``.
5656
del merged_configuration["kafka"]
57+
del merged_configuration["health_check"]
5758

5859
config = build_dataclass(
5960
tp=OfflineIngestorConfig, data=merged_configuration, logger=logger, strict=False

src/scicat_online_ingestor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
build_dataclass,
2424
merge_config_and_input_args,
2525
)
26+
from scicat_health_check import start_health_server
2627
from scicat_kafka import (
2728
WritingFinished,
2829
build_consumer,
@@ -125,6 +126,9 @@ def main() -> None:
125126
if (consumer := build_consumer(config.kafka, logger)) is None:
126127
raise RuntimeError("Failed to build the Kafka consumer")
127128

129+
# Start health server without affecting the online ingestor loop
130+
start_health_server(config, consumer, logger)
131+
128132
# this is the dictionary that contains the list of offline ingestor running
129133
offline_ingestors: dict = {}
130134

test-data/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,10 @@ Run the integration tests with:
6363

6464
```bash
6565
python tests/_scicat_ingestor.py
66+
```
67+
68+
Run the stress tests with:
69+
70+
```bash
71+
python tests/_scicat_stress_test.py
6672
```

tests/_scicat_healthcheck.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
#!/usr/bin/env python3
2+
"""Standalone integration test that verifies the health endpoint only."""
3+
4+
import argparse
5+
import logging
6+
import signal
7+
import subprocess
8+
import sys
9+
import time
10+
from pathlib import Path
11+
12+
import requests
13+
14+
DEFAULT_CONFIG_FILE = Path("tests/integration/config.test.yml")
15+
HEALTH_CHECK_URL = "http://localhost:8080/health"
16+
HEALTH_CHECK_TIMEOUT = 60 # seconds
17+
HEALTH_CHECK_INTERVAL = 1 # seconds
18+
19+
20+
def parse_args() -> argparse.Namespace:
21+
parser = argparse.ArgumentParser(
22+
description="Verify the online ingestor health endpoint"
23+
)
24+
parser.add_argument(
25+
"--config",
26+
default=str(DEFAULT_CONFIG_FILE),
27+
help="Path to ingestor config file (default: %(default)s)",
28+
)
29+
return parser.parse_args()
30+
31+
32+
def wait_for_health_endpoint(
33+
url: str = HEALTH_CHECK_URL,
34+
timeout_seconds: int = HEALTH_CHECK_TIMEOUT,
35+
poll_interval: int = HEALTH_CHECK_INTERVAL,
36+
) -> bool:
37+
"""Poll the health endpoint until it returns HTTP 200 or the timeout elapses."""
38+
39+
deadline = time.time() + timeout_seconds
40+
while time.time() < deadline:
41+
try:
42+
response = requests.get(url, timeout=5)
43+
if response.status_code == 200:
44+
logging.info("Health endpoint reachable at %s", url)
45+
return True
46+
logging.warning(
47+
"Health endpoint unhealthy (status %s): %s",
48+
response.status_code,
49+
response.text,
50+
)
51+
except requests.RequestException as err:
52+
logging.warning("Health endpoint not reachable yet: %s", err)
53+
54+
time.sleep(poll_interval)
55+
56+
logging.error(
57+
"Health endpoint %s not ready after %s seconds.", url, timeout_seconds
58+
)
59+
return False
60+
61+
62+
def main() -> None:
63+
logging.basicConfig(
64+
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
65+
)
66+
args = parse_args()
67+
config_path = Path(args.config).expanduser().resolve()
68+
69+
if not config_path.exists():
70+
logging.error("Config file not found: %s", config_path)
71+
sys.exit(1)
72+
73+
logging.info("Starting Online Ingestor for health check only...")
74+
cmd = [
75+
sys.executable,
76+
"-m",
77+
"scicat_online_ingestor",
78+
"-c",
79+
str(config_path),
80+
"--logging.verbose",
81+
]
82+
process = subprocess.Popen(cmd, shell=False) # noqa: S603
83+
84+
try:
85+
time.sleep(5)
86+
if not wait_for_health_endpoint():
87+
logging.error("Health endpoint verification FAILED")
88+
sys.exit(1)
89+
90+
logging.info("Health endpoint verification PASSED")
91+
92+
finally:
93+
logging.info("Stopping ingestor...")
94+
process.send_signal(signal.SIGINT)
95+
process.wait()
96+
97+
98+
if __name__ == "__main__":
99+
main()

0 commit comments

Comments
 (0)