Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
af5161d
Add HTTP health check to celery workers; default is port 9001 but con…
johnewart Dec 3, 2025
3f84e92
Add celery-healthcheck to the codebase because upgrading the required…
johnewart Dec 4, 2025
8da5830
Sorry mypy, pylint and black
johnewart Dec 4, 2025
765b06f
fix / disable isort
johnewart Dec 4, 2025
bb8edb4
fix black error
johnewart Dec 4, 2025
1706854
Remove uvicorn in favor of plain-old Python HTTP server, add some tes…
johnewart Dec 9, 2025
6ad8c77
Merge branch 'main' into johnewart/ENG-1948
johnewart Dec 9, 2025
6af3d6d
Comment out celery_worker_parameters fixture
johnewart Dec 9, 2025
4153a68
Update tests/conftest.py
johnewart Dec 11, 2025
195868e
Update src/fides/api/tasks/celery_healthcheck/server.py
johnewart Dec 11, 2025
8da1f48
Update tests/task/test_healthcheck_server.py
johnewart Dec 11, 2025
b6a4eee
Update src/fides/api/tasks/celery_healthcheck/server.py
johnewart Dec 11, 2025
073e0b8
Clean up some variable names, add typing / linting fixes
johnewart Dec 11, 2025
b2648c6
Add note for port override
johnewart Dec 11, 2025
14ee36c
Merge branch 'main' into johnewart/ENG-1948
galvana Jan 7, 2026
e61afeb
Merge branch 'main' into johnewart/ENG-1948
johnewart Jan 9, 2026
c7216c5
Move session outside of lock, add a bit of logging on pool creation
johnewart Jan 14, 2026
f578ee3
Merge branch 'main' into johnewart/ENG-1948
johnewart Feb 4, 2026
b195f04
Minor fix for healthcheck on Celery
johnewart Feb 4, 2026
e973c40
Add changelog yaml
johnewart Feb 4, 2026
421eaa7
Merge branch 'main' into johnewart/ENG-1948
johnewart Feb 4, 2026
080146f
Fix formatting after merging main
johnewart Feb 4, 2026
c7e9531
Merge branch 'main' into johnewart/ENG-1948
galvana Feb 5, 2026
8249088
Merge branch 'main' into johnewart/ENG-1948
johnewart Feb 10, 2026
cd21e53
Formatting fixes
johnewart Feb 10, 2026
145d9d2
Merge branch 'main' into johnewart/ENG-1948
johnewart Feb 11, 2026
452d351
Add some tweaks to the http server
johnewart Feb 11, 2026
a97855b
Handle xdist better
johnewart Feb 11, 2026
690be83
Fix unit tests
johnewart Feb 11, 2026
420e084
Fix formatting
johnewart Feb 11, 2026
0ca82eb
Merge branch 'main' into johnewart/ENG-1948
johnewart Feb 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions changelog/7091.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Copy this file and rename it (e.g., pr-number.yaml or feature-name.yaml)
# Fill in the required fields and delete this comment block

type: Added # One of: Added, Changed, Developer Experience, Deprecated, Docs, Fixed, Removed, Security
description: Celery workers now have an HTTP healthcheck endpoint that can be used to check if the workers are running for environments that do not support running a command to check if the workers are running.
pr: 7091 # PR number
labels: [] # Optional: ["high-risk", "db-migration"]
53 changes: 11 additions & 42 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ services:
FIDES__CONFIG_PATH: ${FIDES__CONFIG_PATH:-/fides/.fides/fides.toml}
FIDES__LOGGING__COLORIZE: "True"
FIDES__USER__ANALYTICS_OPT_OUT: "True"
# The default HTTP health check port is 9000, override it here to ensure that
# the override works as expected.
FIDES__CELERY__HEALTHCHECK_PORT: "9001"
expose:
- 9001
volumes:
- type: bind
source: ./
Expand All @@ -143,52 +148,16 @@ services:
- /fides/src/fides.egg-info

worker-privacy-preferences:
image: ethyca/fides:local
extends:
service: worker-other
command: fides worker --queues=fides.privacy_preferences,fides.privacy_request_exports,fides.privacy_request_ingestion
depends_on:
redis:
condition: service_started
restart: always
healthcheck:
test: ["CMD", "celery", "-A", "fides.api.tasks", "inspect", "ping"]
start_period: 60s
interval: 20s
timeout: 5s
retries: 10
environment:
FIDES__CONFIG_PATH: ${FIDES__CONFIG_PATH:-/fides/.fides/fides.toml}
FIDES__LOGGING__COLORIZE: "True"
FIDES__USER__ANALYTICS_OPT_OUT: "True"
volumes:
- type: bind
source: ./
target: /fides
read_only: False
- /fides/src/fides.egg-info

worker-dsr:
image: ethyca/fides:local
command: fides worker --queues=fides.dsr
depends_on:
redis:
condition: service_started
restart: always
extends:
service: worker-other
healthcheck:
test: ["CMD", "celery", "-A", "fides.api.tasks", "inspect", "ping"]
start_period: 60s
interval: 20s
timeout: 5s
retries: 10
environment:
FIDES__CONFIG_PATH: ${FIDES__CONFIG_PATH:-/fides/.fides/fides.toml}
FIDES__LOGGING__COLORIZE: "True"
FIDES__USER__ANALYTICS_OPT_OUT: "True"
volumes:
- type: bind
source: ./
target: /fides
read_only: False
- /fides/src/fides.egg-info
test: [ "CMD", "curl", "-f", "http://localhost:9001/"]
command: fides worker --queues=fides.dsr

redis:
image: "redis:8.0-alpine"
Expand Down
2 changes: 1 addition & 1 deletion src/fides/api/db/ctl_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async def prewarmed_async_readonly_session() -> AsyncGenerator[Any, Any]:
)
ASYNC_READONLY_POOL_WARMED = True

session = readonly_async_session_factory()
session = readonly_async_session_factory()

try:
yield session
Expand Down
4 changes: 4 additions & 0 deletions src/fides/api/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
)

from fides.api.db.session import get_db_engine, get_db_session
from fides.api.tasks import celery_healthcheck
from fides.api.util.logger import setup as setup_logging
from fides.config import CONFIG, FidesConfig

Expand Down Expand Up @@ -102,6 +103,7 @@ def _create_celery(config: FidesConfig = CONFIG) -> Celery:
)

app = Celery(__name__)
celery_healthcheck.register(app) # type: ignore

celery_config: Dict[str, Any] = {
# Defaults for the celery config
Expand All @@ -112,6 +114,8 @@ def _create_celery(config: FidesConfig = CONFIG) -> Celery:
# Ops requires this to route emails to separate queues
"task_create_missing_queues": True,
"task_default_queue": "fides",
"healthcheck_port": config.celery.healthcheck_port,
"healthcheck_ping_timeout": config.celery.healthcheck_ping_timeout,
}

celery_config.update(config.celery)
Expand Down
11 changes: 11 additions & 0 deletions src/fides/api/tasks/celery_healthcheck/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# fmt: off
# type: ignore
# pylint: skip-file
# isort:off


from .server import HealthCheckServer


def register(celery_app):
celery_app.steps["worker"].add(HealthCheckServer)
116 changes: 116 additions & 0 deletions src/fides/api/tasks/celery_healthcheck/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import json
import threading
from http.server import HTTPServer, SimpleHTTPRequestHandler
from typing import Any, Optional

from celery import bootsteps
from celery.worker import WorkController
from loguru import logger

HEALTHCHECK_DEFAULT_PORT = 9000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use a consistent value, I think 9001 would be ok. It's 9000 in some places (celery_settings.py , server.py, tests) but 9001 in docker-compose.yml

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is 9001 in the compose file explicitly to ensure that the config would override it and it would work as-expected. But we can just keep it 9000 everywhere / default.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment in the docker-compose.yml file as to why it's 9001 there (since it seems valuable to ensure that override works correctly and it's easy to do there)

HEALTHCHECK_DEFAULT_PING_TIMEOUT = 2.0
HEALTHCHECK_DEFAULT_HTTP_SERVER_SHUTDOWN_TIMEOUT = 2.0


class HealthcheckHandler(SimpleHTTPRequestHandler):
"""HTTP request handler with additional properties and functions"""

def __init__(
self, parent: WorkController, healthcheck_ping_timeout: float, *args: Any
):
self.parent = parent
self.healthcheck_ping_timeout = healthcheck_ping_timeout
super().__init__(*args)

def do_GET(self) -> None:
"""Handle GET requests"""
try:
try:
parent = self.parent
insp = parent.app.control.inspect(
destination=[parent.hostname], timeout=self.healthcheck_ping_timeout
)
result = insp.ping()

data = json.dumps({"status": "ok", "data": result})
logger.debug(f"Healthcheck ping result: {data}")

self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(bytes(data, "utf-8"))
except Exception as e:
logger.warning(f"Healthcheck ping exception: {e}")
response = {"status": "error", "data": str(e)}
self.send_response(503)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(bytes(json.dumps(response), "utf-8"))
except Exception as ex:
logger.exception("HealthcheckHandler exception", exc_info=ex)
self.send_response(500)


class HealthCheckServer(bootsteps.StartStopStep):
# ignore kwargs type
def __init__(self, parent: WorkController, **kwargs): # type: ignore [arg-type, no-untyped-def]
self.thread: Optional[threading.Thread] = None
self.http_server: Optional[HTTPServer] = None

self.parent = parent

# config
self.healthcheck_port = int(
getattr(parent.app.conf, "healthcheck_port", HEALTHCHECK_DEFAULT_PORT)
)
self.healthcheck_ping_timeout = float(
getattr(
parent.app.conf,
"healthcheck_ping_timeout",
HEALTHCHECK_DEFAULT_PING_TIMEOUT,
)
)
self.shutdown_timeout = float(
getattr(
parent.app.conf,
"shutdown_timeout",
HEALTHCHECK_DEFAULT_HTTP_SERVER_SHUTDOWN_TIMEOUT,
)
)

super().__init__(parent, **kwargs)

# The mypy hints for an HTTP handler are strange, so ignoring them here
def http_handler(self, *args) -> None: # type: ignore [arg-type, no-untyped-def]
HealthcheckHandler(self.parent, self.healthcheck_ping_timeout, *args)

def start(self, parent: WorkController) -> None:
# Ignore mypy hints here as the constructed object immediately handles the request
# (if you look in the source code for SimpleHTTPRequestHandler, specifically the finalize request method)
self.http_server = HTTPServer(
("0.0.0.0", self.healthcheck_port), self.http_handler # type: ignore [arg-type]
)

self.thread = threading.Thread(
target=self.http_server.serve_forever, daemon=True
)
self.thread.start()

def stop(self, parent: WorkController) -> None:
if self.http_server is None:
logger.warning(
"Requested stop of HTTP healthcheck server, but no server was started"
)
else:
logger.info(
f"Stopping health check server with a timeout of {self.shutdown_timeout} seconds"
)
self.http_server.shutdown()

# Really this should not happen if the HTTP server is None, but just in case, we should check.
if self.thread is None:
logger.warning("No thread in HTTP healthcheck server to shutdown...")
else:
self.thread.join(self.shutdown_timeout)

logger.info(f"Health check server stopped on port {self.healthcheck_port}")
6 changes: 6 additions & 0 deletions src/fides/config/celery_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ class CelerySettings(FidesSettings):
description="If true, tasks are executed locally instead of being sent to the queue. "
"If False, tasks are sent to the queue.",
)
healthcheck_port: int = Field(
default=9000, description="The port to use for the health check endpoint"
)
healthcheck_ping_timeout: float = Field(
default=2.0, description="The timeout in seconds for the health check ping"
)
model_config = SettingsConfigDict(env_prefix=ENV_PREFIX)


Expand Down
25 changes: 10 additions & 15 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
from fides.api.schemas.messaging.messaging import MessagingServiceType
from fides.api.schemas.privacy_request import PrivacyRequestStatus
from fides.api.task.graph_runners import access_runner, consent_runner, erasure_runner
from fides.api.tasks import celery_app
from fides.api.tasks import celery_app, celery_healthcheck
from fides.api.tasks.scheduled.scheduler import async_scheduler, scheduler
from fides.api.util.cache import get_cache
from fides.api.util.collection_util import Row
Expand Down Expand Up @@ -767,6 +767,10 @@ def celery_enable_logging():
"""Turns on celery output logs."""
return True

@pytest.fixture(scope="session")
def celery_session_app(celery_session_app):
celery_healthcheck.register(celery_session_app)
return celery_session_app

# This is here because the test suite occasionally fails to teardown the
# Celery worker if it takes too long to terminate the worker thread. This
Expand All @@ -792,6 +796,7 @@ def celery_session_worker(
with worker.start_worker(
celery_session_app,
pool=celery_worker_pool,
shutdown_timeout=2.0,
**celery_worker_parameters,
) as w:
try:
Expand All @@ -802,18 +807,6 @@ def celery_session_worker(
except RuntimeError as re:
logger.warning("Failed to stop the celery worker: " + str(re))


@pytest.fixture(scope="session")
def celery_worker_parameters():
"""Configure celery worker parameters for testing.

Increase shutdown_timeout to avoid flaky test failures when the worker
takes longer to shut down, especially during parallel test runs with pytest-xdist.
The CI environment can be slow, so we use a generous timeout.
"""
return {"shutdown_timeout": 20.0}


Comment on lines -805 to -816
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yay!

@pytest.fixture(autouse=True, scope="session")
def celery_use_virtual_worker(celery_session_worker):
"""
Expand Down Expand Up @@ -918,7 +911,8 @@ def access_runner_tester(
connection_configs,
identity,
session,
privacy_request_proceed=False, # This allows the DSR 3.0 Access Runner to be tested in isolation, to just test running the access graph without queuing the privacy request
privacy_request_proceed=False,
# This allows the DSR 3.0 Access Runner to be tested in isolation, to just test running the access graph without queuing the privacy request
)
except PrivacyRequestExit:
# DSR 3.0 intentionally raises a PrivacyRequestExit status while it waits for
Expand Down Expand Up @@ -976,7 +970,8 @@ def consent_runner_tester(
connection_configs,
identity,
session,
privacy_request_proceed=False, # This allows the DSR 3.0 Consent Runner to be tested in isolation, to just test running the consent graph without queuing the privacy request
privacy_request_proceed=False,
# This allows the DSR 3.0 Consent Runner to be tested in isolation, to just test running the consent graph without queuing the privacy request
)
except PrivacyRequestExit:
# DSR 3.0 intentionally raises a PrivacyRequestExit status while it waits for
Expand Down
34 changes: 34 additions & 0 deletions tests/task/test_healthcheck_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import pytest
import requests
from loguru import logger


class TestCeleryHealthCheckServer:
def test_responds_to_ping_properly(self, celery_session_app, celery_session_worker):
try:
response = requests.get("http://127.0.0.1:9000/")
assert response.status_code == 200
assert response.json()["status"] == "ok"
except requests.exceptions.ConnectionError:
pytest.fail("Connection error")


class TestCeleryHealthCheckWorker:
@pytest.fixture(autouse=True)
def setup_teardown(self):
yield
with pytest.raises(Exception):
requests.get("http://127.0.0.1:9000/", timeout=1)

def test_shutdown_gracefully(self, celery_session_app, celery_session_worker):
try:
logger.info("Shutdown gracefully")
celery_session_worker.stop()
logger.info("Shutdown gracefully finished")
except Exception:
pytest.fail("Failed to stop health check server")





Loading