Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
af5161d
Add HTTP health check to celery workers; default is port 9001 but con…
johnewart Dec 3, 2025
3f84e92
Add celery-healthcheck to the codebase because upgrading the required…
johnewart Dec 4, 2025
8da5830
Sorry mypy, pylint and black
johnewart Dec 4, 2025
765b06f
fix / disable isort
johnewart Dec 4, 2025
bb8edb4
fix black error
johnewart Dec 4, 2025
1706854
Remove uvicorn in favor of plain-old Python HTTP server, add some tes…
johnewart Dec 9, 2025
6ad8c77
Merge branch 'main' into johnewart/ENG-1948
johnewart Dec 9, 2025
6af3d6d
Comment out celery_worker_parameters fixture
johnewart Dec 9, 2025
4153a68
Update tests/conftest.py
johnewart Dec 11, 2025
195868e
Update src/fides/api/tasks/celery_healthcheck/server.py
johnewart Dec 11, 2025
8da1f48
Update tests/task/test_healthcheck_server.py
johnewart Dec 11, 2025
b6a4eee
Update src/fides/api/tasks/celery_healthcheck/server.py
johnewart Dec 11, 2025
073e0b8
Clean up some variable names, add typing / linting fixes
johnewart Dec 11, 2025
b2648c6
Add note for port override
johnewart Dec 11, 2025
14ee36c
Merge branch 'main' into johnewart/ENG-1948
galvana Jan 7, 2026
e61afeb
Merge branch 'main' into johnewart/ENG-1948
johnewart Jan 9, 2026
c7216c5
Move session outside of lock, add a bit of logging on pool creation
johnewart Jan 14, 2026
f578ee3
Merge branch 'main' into johnewart/ENG-1948
johnewart Feb 4, 2026
b195f04
Minor fix for healthcheck on Celery
johnewart Feb 4, 2026
e973c40
Add changelog yaml
johnewart Feb 4, 2026
421eaa7
Merge branch 'main' into johnewart/ENG-1948
johnewart Feb 4, 2026
080146f
Fix formatting after merging main
johnewart Feb 4, 2026
c7e9531
Merge branch 'main' into johnewart/ENG-1948
galvana Feb 5, 2026
8249088
Merge branch 'main' into johnewart/ENG-1948
johnewart Feb 10, 2026
cd21e53
Formatting fixes
johnewart Feb 10, 2026
145d9d2
Merge branch 'main' into johnewart/ENG-1948
johnewart Feb 11, 2026
452d351
Add some tweaks to the http server
johnewart Feb 11, 2026
a97855b
Handle xdist better
johnewart Feb 11, 2026
690be83
Fix unit tests
johnewart Feb 11, 2026
420e084
Fix formatting
johnewart Feb 11, 2026
0ca82eb
Merge branch 'main' into johnewart/ENG-1948
johnewart Feb 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 9 additions & 42 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ services:
FIDES__CONFIG_PATH: ${FIDES__CONFIG_PATH:-/fides/.fides/fides.toml}
FIDES__LOGGING__COLORIZE: "True"
FIDES__USER__ANALYTICS_OPT_OUT: "True"
FIDES__CELERY__HEALTHCHECK_PORT: "9001"
expose:
- 9001
volumes:
- type: bind
source: ./
Expand All @@ -143,52 +146,16 @@ services:
- /fides/src/fides.egg-info

worker-privacy-preferences:
image: ethyca/fides:local
extends:
service: worker-other
command: fides worker --queues=fides.privacy_preferences,fides.privacy_request_exports,fides.privacy_request_ingestion
depends_on:
redis:
condition: service_started
restart: always
healthcheck:
test: ["CMD", "celery", "-A", "fides.api.tasks", "inspect", "ping"]
start_period: 60s
interval: 20s
timeout: 5s
retries: 10
environment:
FIDES__CONFIG_PATH: ${FIDES__CONFIG_PATH:-/fides/.fides/fides.toml}
FIDES__LOGGING__COLORIZE: "True"
FIDES__USER__ANALYTICS_OPT_OUT: "True"
volumes:
- type: bind
source: ./
target: /fides
read_only: False
- /fides/src/fides.egg-info

worker-dsr:
image: ethyca/fides:local
command: fides worker --queues=fides.dsr
depends_on:
redis:
condition: service_started
restart: always
extends:
service: worker-other
healthcheck:
test: ["CMD", "celery", "-A", "fides.api.tasks", "inspect", "ping"]
start_period: 60s
interval: 20s
timeout: 5s
retries: 10
environment:
FIDES__CONFIG_PATH: ${FIDES__CONFIG_PATH:-/fides/.fides/fides.toml}
FIDES__LOGGING__COLORIZE: "True"
FIDES__USER__ANALYTICS_OPT_OUT: "True"
volumes:
- type: bind
source: ./
target: /fides
read_only: False
- /fides/src/fides.egg-info
test: [ "CMD", "curl", "-f", "http://localhost:9001/"]
command: fides worker --queues=fides.dsr

redis:
image: "redis:8.0-alpine"
Expand Down
4 changes: 4 additions & 0 deletions src/fides/api/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
)

from fides.api.db.session import get_db_engine, get_db_session
from fides.api.tasks import celery_healthcheck
from fides.api.util.logger import setup as setup_logging
from fides.config import CONFIG, FidesConfig

Expand Down Expand Up @@ -102,6 +103,7 @@ def _create_celery(config: FidesConfig = CONFIG) -> Celery:
)

app = Celery(__name__)
celery_healthcheck.register(app) # type: ignore

celery_config: Dict[str, Any] = {
# Defaults for the celery config
Expand All @@ -112,6 +114,8 @@ def _create_celery(config: FidesConfig = CONFIG) -> Celery:
# Ops requires this to route emails to separate queues
"task_create_missing_queues": True,
"task_default_queue": "fides",
"healthcheck_port": config.celery.healthcheck_port,
"healthcheck_ping_timeout": config.celery.healthcheck_ping_timeout,
}

celery_config.update(config.celery)
Expand Down
11 changes: 11 additions & 0 deletions src/fides/api/tasks/celery_healthcheck/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# fmt: off
# type: ignore
# pylint: skip-file
# isort:off


from .server import HealthCheckServer


def register(celery_app):
    """Add the ``HealthCheckServer`` step to ``celery_app``'s worker steps.

    The step class is inserted into ``celery_app.steps["worker"]``; nothing
    is returned.
    """
    worker_steps = celery_app.steps["worker"]
    worker_steps.add(HealthCheckServer)
94 changes: 94 additions & 0 deletions src/fides/api/tasks/celery_healthcheck/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import json
import threading
from http.server import HTTPServer, SimpleHTTPRequestHandler

from celery import bootsteps
from celery.worker import WorkController
from loguru import logger

HEALTHCHECK_DEFAULT_PORT = 9000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use a consistent value; I think 9001 would be OK. It's 9000 in some places (celery_settings.py, server.py, tests) but 9001 in docker-compose.yml.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is 9001 in the compose file explicitly to ensure that the config would override it and it would work as expected. But we can just keep it 9000 everywhere / default.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment in the docker-compose.yml file as to why it's 9001 there (since it seems valuable to ensure that the override works correctly and it's easy to do there).

HEALTHCHECK_DEFAULT_PING_TIMEOUT = 2.0
DEFAULT_SHUTDOWN_TIMEOUT = 2.0


class HealthcheckHandler(SimpleHTTPRequestHandler):
    """HTTP request handler that answers GET with the result of a Celery ping.

    On GET the handler pings the worker it was created for (addressed by
    ``parent.hostname``) through ``parent.app.control.inspect`` and reports:

    * 200 ``{"status": "ok", "data": <ping reply>}`` when a reply came back,
    * 503 ``{"status": "error", "data": <reason>}`` when the ping raised or
      produced no reply,
    * 500 with an empty body when building or sending the response failed.

    NOTE(review): only ``do_GET`` is overridden, so other methods inherited
    from ``SimpleHTTPRequestHandler`` (e.g. HEAD) keep their stock behaviour;
    confirm that is acceptable or switch to ``BaseHTTPRequestHandler``.
    """

    def __init__(
        self, parent: WorkController, healthcheck_ping_timeout: float, *args
    ):
        """Store the worker and ping timeout, then let the base class run.

        Args:
            parent: Worker whose ``app`` and ``hostname`` are used for the ping.
            healthcheck_ping_timeout: Timeout in seconds passed to ``inspect``.
            *args: Positional arguments forwarded unchanged to the base handler.
        """
        # Attributes are assigned before delegating so they are already
        # available if the base constructor dispatches to do_GET.
        self.parent = parent
        self.healthcheck_ping_timeout = healthcheck_ping_timeout
        super().__init__(*args)

    def _send_json(self, status_code: int, body: str) -> None:
        """Send ``body`` (already-serialised JSON) with the given status code."""
        payload = body.encode("utf-8")
        self.send_response(status_code)
        self.send_header("Content-type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def do_GET(self):
        """Handle GET requests by pinging the worker and reporting the result."""
        try:
            try:
                parent = self.parent
                insp = parent.app.control.inspect(
                    destination=[parent.hostname],
                    timeout=self.healthcheck_ping_timeout,
                )
                result = insp.ping()

                if result:
                    data = json.dumps({"status": "ok", "data": result})
                    logger.info(f"Healthcheck ping result: {data}")
                    status_code = 200
                else:
                    # An empty/None reply means the worker did not answer
                    # within the timeout; that must not be reported as healthy.
                    logger.warning("Healthcheck ping returned no reply")
                    data = json.dumps(
                        {"status": "error", "data": "no ping reply from worker"}
                    )
                    status_code = 503
            except Exception as e:
                logger.warning(f"Healthcheck ping exception: {e}")
                data = json.dumps({"status": "error", "data": str(e)})
                status_code = 503

            self._send_json(status_code, data)
        except Exception as ex:
            logger.exception("HealthcheckHandler exception", exc_info=ex)
            try:
                self.send_response(500)
                # Without end_headers() the status line stays in the header
                # buffer and the client never receives it.
                self.end_headers()
            except OSError as send_error:
                # The client is most likely gone; nothing more can be sent.
                logger.debug(f"Could not send 500 response: {send_error}")


class HealthCheckServer(bootsteps.StartStopStep):
    """Worker step that serves an HTTP health check from a background thread.

    Configuration is read from ``parent.app.conf`` with module-level defaults:
    ``healthcheck_port``, ``healthcheck_ping_timeout`` and ``shutdown_timeout``.
    """

    def __init__(self, parent: WorkController, **kwargs):
        """Read the configuration; the server itself is created in ``start``.

        Args:
            parent: Worker this step belongs to.
            **kwargs: Accepted for compatibility with the step interface; unused.
        """
        self.thread = None
        # Initialised here so stop() is safe even if start() never ran or
        # failed before the server was created.
        self.http_server = None
        self.parent = parent
        # config
        conf = parent.app.conf
        self.healthcheck_port = int(
            getattr(conf, "healthcheck_port", HEALTHCHECK_DEFAULT_PORT)
        )
        self.healthcheck_ping_timeout = float(
            getattr(
                conf,
                "healthcheck_ping_timeout",
                HEALTHCHECK_DEFAULT_PING_TIMEOUT,
            )
        )
        self.shutdown_timeout = float(
            getattr(conf, "shutdown_timeout", DEFAULT_SHUTDOWN_TIMEOUT)
        )

    def http_handler(self, *args):
        """Build a ``HealthcheckHandler`` bound to this step's worker.

        Passed to ``HTTPServer`` as the request handler factory; ``*args`` are
        whatever the server supplies and are forwarded unchanged.
        """
        HealthcheckHandler(self.parent, self.healthcheck_ping_timeout, *args)

    def start(self, parent: WorkController):
        """Bind the HTTP server and serve requests on a daemon thread."""
        self.http_server = HTTPServer(
            ("0.0.0.0", self.healthcheck_port), self.http_handler
        )

        self.thread = threading.Thread(
            target=self.http_server.serve_forever, daemon=True
        )
        self.thread.start()

    def stop(self, parent: WorkController):
        """Stop serving, wait for the thread and release the listening socket."""
        server = self.http_server
        if server is None:
            # Nothing was started, so there is nothing to shut down.
            return

        logger.info(
            f"Stopping health check server with a timeout of {self.shutdown_timeout} seconds"
        )
        try:
            # shutdown() blocks until serve_forever() returns, so only call it
            # when the serving thread was actually started.
            if self.thread is not None:
                server.shutdown()
                self.thread.join(self.shutdown_timeout)
        finally:
            # shutdown() only ends the serve loop; server_close() is what
            # actually closes the socket and frees the port.
            server.server_close()
            self.http_server = None
            self.thread = None
        logger.info(f"Health check server stopped on port {self.healthcheck_port}")
6 changes: 6 additions & 0 deletions src/fides/config/celery_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ class CelerySettings(FidesSettings):
description="If true, tasks are executed locally instead of being sent to the queue. "
"If False, tasks are sent to the queue.",
)
healthcheck_port: int = Field(
default=9000, description="The port to use for the health check endpoint"
)
healthcheck_ping_timeout: float = Field(
default=2.0, description="The timeout in seconds for the health check ping"
)
model_config = SettingsConfigDict(env_prefix=ENV_PREFIX)


Expand Down
51 changes: 48 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
from fides.api.schemas.messaging.messaging import MessagingServiceType
from fides.api.schemas.privacy_request import PrivacyRequestStatus
from fides.api.task.graph_runners import access_runner, consent_runner, erasure_runner
from fides.api.tasks import celery_app
from fides.api.tasks import celery_app, celery_healthcheck
from fides.api.tasks.scheduled.scheduler import async_scheduler, scheduler
from fides.api.util.cache import get_cache
from fides.api.util.collection_util import Row
Expand Down Expand Up @@ -765,6 +765,49 @@ def celery_enable_logging():
return True


# Register health check for workers
@pytest.fixture(scope="session")
def celery_session_app(celery_session_app):
    """Return the session Celery app after registering the health check on it."""
    app = celery_session_app
    celery_healthcheck.register(app)
    return app

# This is here because the test suite occasionally fails to teardown the
# Celery worker if it takes too long to terminate the worker thread. This
# will prevent that and, instead, log a warning
@pytest.fixture(scope="session")
def celery_session_worker(
    request,
    celery_session_app,
    celery_includes,
    celery_class_tasks,
    celery_worker_pool,
    celery_worker_parameters,
):
    """Yield a session-scoped Celery test worker that tolerates slow teardown.

    Task modules from ``celery_includes`` are imported and the classes in
    ``celery_class_tasks`` are registered on the app before the worker starts.
    A ``RuntimeError`` raised while yielding or while stopping the worker is
    logged as a warning instead of propagating.
    """
    from celery.contrib.testing import worker

    app = celery_session_app

    for task_module in celery_includes:
        app.loader.import_task_module(task_module)
    for task_class in celery_class_tasks:
        app.register_task(task_class)

    try:
        logger.info("Starting safe celery session worker...")
        worker_context = worker.start_worker(
            app,
            pool=celery_worker_pool,
            shutdown_timeout=2.0,
            **celery_worker_parameters,
        )
        with worker_context as session_worker:
            try:
                yield session_worker
                logger.info("Done with celery worker, trying to dispose of it..")
            except RuntimeError:
                logger.warning("Failed to dispose of the celery worker.")
    except RuntimeError as stop_error:
        logger.warning("Failed to stop the celery worker: " + str(stop_error))


@pytest.fixture(autouse=True, scope="session")
def celery_use_virtual_worker(celery_session_worker):
"""
Expand Down Expand Up @@ -869,7 +912,8 @@ def access_runner_tester(
connection_configs,
identity,
session,
privacy_request_proceed=False, # This allows the DSR 3.0 Access Runner to be tested in isolation, to just test running the access graph without queuing the privacy request
privacy_request_proceed=False,
# This allows the DSR 3.0 Access Runner to be tested in isolation, to just test running the access graph without queuing the privacy request
)
except PrivacyRequestExit:
# DSR 3.0 intentionally raises a PrivacyRequestExit status while it waits for
Expand Down Expand Up @@ -927,7 +971,8 @@ def consent_runner_tester(
connection_configs,
identity,
session,
privacy_request_proceed=False, # This allows the DSR 3.0 Consent Runner to be tested in isolation, to just test running the consent graph without queuing the privacy request
privacy_request_proceed=False,
# This allows the DSR 3.0 Consent Runner to be tested in isolation, to just test running the consent graph without queuing the privacy request
)
except PrivacyRequestExit:
# DSR 3.0 intentionally raises a PrivacyRequestExit status while it waits for
Expand Down
36 changes: 36 additions & 0 deletions tests/task/test_healthcheck_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import time

import pytest
import requests
from loguru import logger


class TestCeleryHealthCheckServer:
    """Checks the health check endpoint while the session worker is running."""

    def test_responds_to_ping_properly(self, celery_session_app, celery_session_worker):
        """GET on the health check port returns 200 with ``status == "ok"``."""
        try:
            # A timeout keeps a wedged server from hanging the whole suite.
            response = requests.get("http://127.0.0.1:9000/", timeout=5)
        except requests.exceptions.ConnectionError:
            pytest.fail("Connection error")

        assert response.status_code == 200
        assert response.json()["status"] == "ok"


class TestCeleryHealthCheckWorker:
    """Checks that stopping the worker also takes the health check offline."""

    @pytest.fixture(autouse=True)
    def setup_teardown(self):
        """After each test, assert the health check endpoint no longer answers."""
        yield
        # Only a request-level failure (refused connection, timeout, ...)
        # proves the endpoint is down; unrelated errors must not pass here.
        with pytest.raises(requests.exceptions.RequestException):
            requests.get("http://127.0.0.1:9000/", timeout=1)

    def test_shutdown_gracefully(self, celery_session_app, celery_session_worker):
        """Stopping the session worker completes without raising."""
        try:
            logger.info("Shutdown gracefully")
            celery_session_worker.stop()
            logger.info("Shutdown gracefully finished")
        except Exception:
            pytest.fail("Failed to stop health check server")