Skip to content

Commit e4f744c

Browse files
johnewart, galvana, greptile-apps[bot]
authored
ENG-1948 - Celery healthcheck HTTP endpoint (#7091)
Co-authored-by: Adrian Galvan <adrian@ethyca.com> Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
1 parent ab491ec commit e4f744c

File tree

11 files changed

+322
-65
lines changed

11 files changed

+322
-65
lines changed

changelog/7091.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Copy this file and rename it (e.g., pr-number.yaml or feature-name.yaml)
2+
# Fill in the required fields and delete this comment block
3+
4+
type: Added # One of: Added, Changed, Developer Experience, Deprecated, Docs, Fixed, Removed, Security
5+
description: Celery workers now expose an HTTP healthcheck endpoint for checking that the workers are running in environments that cannot run a command to perform that check.
6+
pr: 7091 # PR number
7+
labels: [] # Optional: ["high-risk", "db-migration"]

docker-compose.yml

Lines changed: 11 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,11 @@ services:
140140
FIDES__CONFIG_PATH: ${FIDES__CONFIG_PATH:-/fides/.fides/fides.toml}
141141
FIDES__LOGGING__COLORIZE: "True"
142142
FIDES__USER__ANALYTICS_OPT_OUT: "True"
143+
# The default HTTP health check port is 9000, override it here to ensure that
144+
# the override works as expected.
145+
FIDES__CELERY__HEALTHCHECK_PORT: "9001"
146+
expose:
147+
- 9001
143148
volumes:
144149
- type: bind
145150
source: ./
@@ -148,52 +153,16 @@ services:
148153
- /fides/src/fides.egg-info
149154

150155
worker-privacy-preferences:
151-
image: ethyca/fides:local${IMAGE_SUFFIX:-}
156+
extends:
157+
service: worker-other
152158
command: fides worker --queues=fides.privacy_preferences,fides.privacy_request_exports,fides.privacy_request_ingestion
153-
depends_on:
154-
redis:
155-
condition: service_started
156-
restart: always
157-
healthcheck:
158-
test: ["CMD", "/opt/fides/bin/python", "-m", "celery", "-A", "fides.api.tasks", "inspect", "ping"]
159-
start_period: 60s
160-
interval: 20s
161-
timeout: 20s
162-
retries: 10
163-
environment:
164-
FIDES__CONFIG_PATH: ${FIDES__CONFIG_PATH:-/fides/.fides/fides.toml}
165-
FIDES__LOGGING__COLORIZE: "True"
166-
FIDES__USER__ANALYTICS_OPT_OUT: "True"
167-
volumes:
168-
- type: bind
169-
source: ./
170-
target: /fides
171-
read_only: False
172-
- /fides/src/fides.egg-info
173159

174160
worker-dsr:
175-
image: ethyca/fides:local${IMAGE_SUFFIX:-}
176-
command: fides worker --queues=fides.dsr
177-
depends_on:
178-
redis:
179-
condition: service_started
180-
restart: always
161+
extends:
162+
service: worker-other
181163
healthcheck:
182-
test: ["CMD", "/opt/fides/bin/python", "-m", "celery", "-A", "fides.api.tasks", "inspect", "ping"]
183-
start_period: 60s
184-
interval: 20s
185-
timeout: 20s
186-
retries: 10
187-
environment:
188-
FIDES__CONFIG_PATH: ${FIDES__CONFIG_PATH:-/fides/.fides/fides.toml}
189-
FIDES__LOGGING__COLORIZE: "True"
190-
FIDES__USER__ANALYTICS_OPT_OUT: "True"
191-
volumes:
192-
- type: bind
193-
source: ./
194-
target: /fides
195-
read_only: False
196-
- /fides/src/fides.egg-info
164+
test: [ "CMD", "curl", "-f", "http://localhost:9001/"]
165+
command: fides worker --queues=fides.dsr
197166

198167
redis:
199168
image: "redis:8.0-alpine"

qa/scenarios/manual_task_with_conditional_dependencies.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,9 @@ def _create_conditional_dependencies(self) -> bool:
673673
dependency_data = create_dependency_data(self.manual_task.id)
674674
dep = ManualTaskConditionalDependency.create(db=db, data=dependency_data)
675675
self.conditional_dependencies.append(dep)
676-
self.info(f"Created conditional dependency with full condition tree: {dep.id}")
676+
self.info(
677+
f"Created conditional dependency with full condition tree: {dep.id}"
678+
)
677679

678680
self.success("Created ManualTaskConditionalDependency with condition tree")
679681
return True

qa/scenarios/sql_translator_demo.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -473,11 +473,15 @@ def _create_manual_task_config(self) -> bool:
473473
"""Create Manual Task configuration."""
474474
try:
475475
# Check if config already exists for this task
476-
existing_config = self.db.query(ManualTaskConfig).filter_by(
477-
task_id=self.manual_task.id,
478-
config_type=ActionType.access,
479-
is_current=True
480-
).first()
476+
existing_config = (
477+
self.db.query(ManualTaskConfig)
478+
.filter_by(
479+
task_id=self.manual_task.id,
480+
config_type=ActionType.access,
481+
is_current=True,
482+
)
483+
.first()
484+
)
481485

482486
if existing_config:
483487
self.info(

src/fides/api/db/ctl_session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ async def prewarmed_async_readonly_session() -> AsyncGenerator[Any, Any]:
121121
)
122122
ASYNC_READONLY_POOL_WARMED = True
123123

124-
session = readonly_async_session_factory()
124+
session = readonly_async_session_factory()
125125

126126
try:
127127
yield session

src/fides/api/tasks/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
)
1515

1616
from fides.api.db.session import get_db_engine, get_db_session
17+
from fides.api.tasks import celery_healthcheck
1718
from fides.api.util.logger import setup as setup_logging
1819
from fides.config import CONFIG, FidesConfig
1920

@@ -102,6 +103,7 @@ def _create_celery(config: FidesConfig = CONFIG) -> Celery:
102103
)
103104

104105
app = Celery(__name__)
106+
celery_healthcheck.register(app) # type: ignore
105107

106108
celery_config: Dict[str, Any] = {
107109
# Defaults for the celery config
@@ -112,6 +114,8 @@ def _create_celery(config: FidesConfig = CONFIG) -> Celery:
112114
# Ops requires this to route emails to separate queues
113115
"task_create_missing_queues": True,
114116
"task_default_queue": "fides",
117+
"healthcheck_port": config.celery.healthcheck_port,
118+
"healthcheck_ping_timeout": config.celery.healthcheck_ping_timeout,
115119
}
116120

117121
celery_config.update(config.celery)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# fmt: off
2+
# type: ignore
3+
# pylint: skip-file
4+
# isort:off
5+
6+
7+
from .server import HealthCheckServer
8+
9+
10+
def register(celery_app):
    """Add the ``HealthCheckServer`` bootstep to the worker steps of *celery_app*."""
    worker_steps = celery_app.steps["worker"]
    worker_steps.add(HealthCheckServer)
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
import json
2+
import socket
3+
import threading
4+
from http.server import HTTPServer, SimpleHTTPRequestHandler
5+
from typing import Any, Optional
6+
7+
from celery import bootsteps
8+
from celery.worker import WorkController
9+
from loguru import logger
10+
11+
HEALTHCHECK_DEFAULT_PORT = 9000
12+
HEALTHCHECK_DEFAULT_PING_TIMEOUT = 2.0
13+
HEALTHCHECK_DEFAULT_HTTP_SERVER_SHUTDOWN_TIMEOUT = 2.0
14+
15+
16+
class HealthcheckHandler(SimpleHTTPRequestHandler):
    """HTTP request handler that answers GET requests with a Celery ping result.

    Each GET pings the worker that owns this handler and replies with a JSON
    document:

    * ``200`` ``{"status": "ok", "data": <ping reply>}`` when the worker replied.
    * ``503`` ``{"status": "error", "data": <reason>}`` when the ping raised or
      no reply arrived within ``healthcheck_ping_timeout`` seconds.
    * ``500`` (no body) when building or sending the response failed.

    NOTE(review): only ``do_GET`` is overridden, so ``HEAD`` requests are still
    served by ``SimpleHTTPRequestHandler`` (file metadata from the working
    directory) -- confirm whether that is intended.
    """

    def __init__(
        self, parent: WorkController, healthcheck_ping_timeout: float, *args: Any
    ):
        """Store the worker and ping timeout, then let the base class handle the request.

        Args:
            parent: The Celery worker this healthcheck reports on.
            healthcheck_ping_timeout: Seconds to wait for the ping reply.
            *args: Positional arguments forwarded unchanged to the base handler.
        """
        # Must be assigned before calling the base constructor: the base class
        # handles the request from inside ``__init__``, so ``do_GET`` runs
        # before this constructor returns.
        self.parent = parent
        self.healthcheck_ping_timeout = healthcheck_ping_timeout
        super().__init__(*args)

    def log_message(self, format: str, *args: Any) -> None:
        """
        Override to suppress default HTTP server logging to stderr.
        The default implementation writes to stderr which can cause
        contention and deadlocks in test environments, especially with
        pytest's output capturing and parallel test execution.
        We use loguru for structured logging instead at the debug level.
        """
        logger.debug(f"Healthcheck: {self.address_string()} - {format % args}")

    def _ping_worker(self) -> Any:
        """Ping this handler's worker and return the raw reply (``None`` if no reply)."""
        worker = self.parent
        inspector = worker.app.control.inspect(
            destination=[worker.hostname], timeout=self.healthcheck_ping_timeout
        )
        return inspector.ping()

    def _send_json(self, status_code: int, payload: dict) -> None:
        """Send *payload* as a JSON body with the given HTTP status code."""
        # Serialize before touching the socket so a serialization failure
        # cannot leave a half-written response behind.
        body = bytes(json.dumps(payload), "utf-8")
        self.send_response(status_code)
        self.send_header("Content-type", "application/json")
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self) -> None:
        """Handle GET requests"""
        try:
            try:
                result = self._ping_worker()
            except Exception as e:
                logger.warning(f"Healthcheck ping exception: {e}")
                status_code = 503
                response = {"status": "error", "data": str(e)}
            else:
                if result:
                    logger.debug(
                        "Healthcheck ping result: "
                        f"{json.dumps({'status': 'ok', 'data': result})}"
                    )
                    status_code = 200
                    response = {"status": "ok", "data": result}
                else:
                    # ``ping()`` yields nothing when the worker did not answer
                    # in time; that must not be reported as healthy.
                    reason = (
                        "No ping reply from worker within "
                        f"{self.healthcheck_ping_timeout} seconds"
                    )
                    logger.warning(f"Healthcheck ping exception: {reason}")
                    status_code = 503
                    response = {"status": "error", "data": reason}

            self._send_json(status_code, response)
        except Exception:
            logger.exception("HealthcheckHandler exception")
            try:
                # ``send_response`` only buffers the status line;
                # ``end_headers`` is what actually writes it to the client.
                self.send_response(500)
                self.end_headers()
            except OSError as send_error:
                # The client is most likely gone; nothing more can be done.
                logger.debug(f"Healthcheck: could not send 500 response: {send_error}")
63+
64+
65+
class HealthCheckServer(bootsteps.StartStopStep):
    """Celery worker bootstep that serves the HTTP healthcheck from a daemon thread.

    ``start`` binds an ``HTTPServer`` on ``healthcheck_port`` and runs it in a
    background thread; ``stop`` shuts the server down, waits up to
    ``shutdown_timeout`` seconds for the thread, and releases the socket.
    Settings are read from ``parent.app.conf`` with module-level defaults.
    """

    # ignore kwargs type
    def __init__(self, parent: WorkController, **kwargs):  # type: ignore [arg-type, no-untyped-def]
        """Read the healthcheck settings from the worker's app configuration.

        Args:
            parent: The Celery worker this step is attached to.
            **kwargs: Forwarded unchanged to ``bootsteps.StartStopStep``.
        """
        self.thread: Optional[threading.Thread] = None
        self.http_server: Optional[HTTPServer] = None

        self.parent = parent

        # config
        conf = parent.app.conf
        self.healthcheck_port = int(
            getattr(conf, "healthcheck_port", HEALTHCHECK_DEFAULT_PORT)
        )
        self.healthcheck_ping_timeout = float(
            getattr(
                conf,
                "healthcheck_ping_timeout",
                HEALTHCHECK_DEFAULT_PING_TIMEOUT,
            )
        )
        self.shutdown_timeout = float(
            getattr(
                conf,
                "shutdown_timeout",
                HEALTHCHECK_DEFAULT_HTTP_SERVER_SHUTDOWN_TIMEOUT,
            )
        )

        super().__init__(parent, **kwargs)

    # The mypy hints for an HTTP handler are strange, so ignoring them here
    def http_handler(self, *args) -> None:  # type: ignore [arg-type, no-untyped-def]
        """Handler factory given to ``HTTPServer``; builds one handler per request."""
        HealthcheckHandler(self.parent, self.healthcheck_ping_timeout, *args)

    def start(self, parent: WorkController) -> None:
        """Bind the healthcheck HTTP server and start serving in a daemon thread."""
        # Ignore mypy hints here as the constructed object immediately handles the request
        # (if you look in the source code for SimpleHTTPRequestHandler, specifically the finalize request method)
        self.http_server = HTTPServer(
            ("0.0.0.0", self.healthcheck_port),
            self.http_handler,  # type: ignore [arg-type]
        )

        # NOTE: the socket is already bound at this point, so this call cannot
        # influence the bind above. ``HTTPServer`` enables address reuse itself
        # before binding (``allow_reuse_address``); the option is re-applied
        # here only to keep the socket's state explicit.
        self.http_server.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # NOTE(review): ``BaseServer.timeout`` is only honored by
        # ``handle_request()``; ``serve_forever()`` (used below) ignores it, so
        # this does not bound how long a single request may block.
        self.http_server.timeout = 5.0

        self.thread = threading.Thread(
            target=self.http_server.serve_forever, daemon=True
        )
        self.thread.start()
        logger.info(f"Health check server started on port {self.healthcheck_port}")

    def stop(self, parent: WorkController) -> None:
        """Shut the HTTP server down, wait for its thread, and release the socket."""
        server = self.http_server
        if server is None:
            logger.warning(
                "Requested stop of HTTP healthcheck server, but no server was started"
            )
            return

        logger.info(
            f"Stopping health check server with a timeout of {self.shutdown_timeout} seconds"
        )

        serving_thread = self.thread
        if serving_thread is None:
            # ``shutdown()`` waits for ``serve_forever()`` to exit and blocks
            # forever if that loop never ran, so only release the socket.
            logger.warning("No thread in HTTP healthcheck server to shutdown...")
            server.server_close()
            return

        try:
            # Call shutdown - this should be safe from any thread
            # It will cause serve_forever() to return after handling any current request
            server.shutdown()
        except Exception as e:
            logger.warning(f"Error during HTTP server shutdown: {e}")

        # Wait for the thread to finish with a timeout
        serving_thread.join(self.shutdown_timeout)
        if serving_thread.is_alive():
            # Leave the socket open: closing it underneath a loop that is
            # still running would only trigger errors in that thread.
            logger.warning(
                f"Healthcheck thread still alive after {self.shutdown_timeout}s timeout. "
                "It will continue running as a daemon thread."
            )
            return

        # ``shutdown()`` only stops the serving loop; the listening socket must
        # be closed explicitly or the port stays bound.
        server.server_close()
        logger.info(
            f"Health check server stopped cleanly on port {self.healthcheck_port}"
        )

src/fides/config/celery_settings.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ class CelerySettings(FidesSettings):
2727
description="If true, tasks are executed locally instead of being sent to the queue. "
2828
"If False, tasks are sent to the queue.",
2929
)
30+
healthcheck_port: int = Field(
31+
default=9000, description="The port to use for the health check endpoint"
32+
)
33+
healthcheck_ping_timeout: float = Field(
34+
default=2.0, description="The timeout in seconds for the health check ping"
35+
)
3036
model_config = SettingsConfigDict(env_prefix=ENV_PREFIX)
3137

3238

0 commit comments

Comments
 (0)