-
Notifications
You must be signed in to change notification settings - Fork 91
Expand file tree
/
Copy pathtest_healthcheck_server.py
More file actions
36 lines (25 loc) · 1016 Bytes
/
test_healthcheck_server.py
File metadata and controls
36 lines (25 loc) · 1016 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import time
import pytest
import requests
from loguru import logger
class TestCeleryHealthCheckServer:
    """Tests for the HTTP health check endpoint at http://127.0.0.1:9000/."""

    def test_responds_to_ping_properly(self, celery_session_app, celery_session_worker):
        """GET the endpoint and expect HTTP 200 with a JSON ``status`` of ``"ok"``.

        The ``celery_session_app`` and ``celery_session_worker`` fixtures are
        requested but not used directly; presumably the running worker is what
        serves the endpoint -- confirm against the fixture/plugin setup.
        """
        try:
            # A timeout keeps a wedged endpoint from hanging the whole test run;
            # without one, requests waits indefinitely for a response.
            response = requests.get("http://127.0.0.1:9000/", timeout=5)
        except requests.exceptions.ConnectionError:
            pytest.fail("Connection error")
        else:
            assert response.status_code == 200
            assert response.json()["status"] == "ok"
class TestCeleryHealthCheckWorker:
    """Tests that the health check endpoint stops answering once the worker is stopped."""

    @pytest.fixture(autouse=True)
    def setup_teardown(self):
        """Run the test, then verify that the endpoint is no longer reachable."""
        yield
        # Teardown check: the request must fail at the HTTP/transport level.
        # RequestException covers both connection-refused and timeout errors,
        # whereas a bare ``Exception`` would also be satisfied by unrelated
        # bugs (e.g. a NameError) and let the check pass for the wrong reason.
        with pytest.raises(requests.exceptions.RequestException):
            requests.get("http://127.0.0.1:9000/", timeout=1)

    def test_shutdown_gracefully(self, celery_session_app, celery_session_worker):
        """Stop the session worker and fail the test if stopping raises."""
        try:
            logger.info("Shutdown gracefully")
            celery_session_worker.stop()
            logger.info("Shutdown gracefully finished")
        except Exception:
            # Deliberately broad: any error while stopping is reported as a
            # test failure; the original exception stays attached as context.
            pytest.fail("Failed to stop health check server")