Skip to content
This repository was archived by the owner on Feb 20, 2025. It is now read-only.

Commit 56fde44

Browse files
Feat: Worker Health Checks (hatchet-dev#289)
* feat: add prom client * feat: naive healthcheck * feat: add health check prom support + custom port * fix: lint * fix: prom gauge * feat: minor version * feat: conditionally run health check * fix: use -1 for prom gauge * fix: revert back to 0
1 parent fe66c0c commit 56fde44

File tree

4 files changed

+84
-3
lines changed

4 files changed

+84
-3
lines changed

hatchet_sdk/loader.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ def __init__(
4343
otel_service_name: str | None = None,
4444
otel_exporter_oltp_headers: dict[str, str] | None = None,
4545
otel_exporter_oltp_protocol: str | None = None,
46+
worker_healthcheck_port: int | None = None,
47+
worker_healthcheck_enabled: bool | None = None,
4648
):
4749
self.tenant_id = tenant_id
4850
self.tls_config = tls_config
@@ -57,6 +59,8 @@ def __init__(
5759
self.otel_service_name = otel_service_name
5860
self.otel_exporter_oltp_headers = otel_exporter_oltp_headers
5961
self.otel_exporter_oltp_protocol = otel_exporter_oltp_protocol
62+
self.worker_healthcheck_port = worker_healthcheck_port
63+
self.worker_healthcheck_enabled = worker_healthcheck_enabled
6064

6165
if not self.logInterceptor:
6266
self.logInterceptor = getLogger()
@@ -163,6 +167,23 @@ def get_config_value(key, env_var):
163167
"otel_exporter_oltp_protocol", "HATCHET_CLIENT_OTEL_EXPORTER_OTLP_PROTOCOL"
164168
)
165169

170+
# Port for the worker health-check HTTP server; falls back to 8001 when
# get_config_value yields nothing (or any other falsy value).
worker_healthcheck_port = int(
    get_config_value(
        "worker_healthcheck_port", "HATCHET_CLIENT_WORKER_HEALTHCHECK_PORT"
    )
    or 8001
)

# The health-check server is opt-in: it is enabled only when the resolved
# value stringifies to exactly "True".
# The lookup key must be "worker_healthcheck_enabled"; it was previously
# "worker_healthcheck_port", so the flag was resolved from the port setting
# rather than from its own key.
worker_healthcheck_enabled = (
    str(
        get_config_value(
            "worker_healthcheck_enabled",
            "HATCHET_CLIENT_WORKER_HEALTHCHECK_ENABLED",
        )
    )
    == "True"
)
186+
166187
return ClientConfig(
167188
tenant_id=tenant_id,
168189
tls_config=tls_config,
@@ -178,6 +199,8 @@ def get_config_value(key, env_var):
178199
otel_service_name=otel_service_name,
179200
otel_exporter_oltp_headers=otel_exporter_oltp_headers,
180201
otel_exporter_oltp_protocol=otel_exporter_oltp_protocol,
202+
worker_healthcheck_port=worker_healthcheck_port,
203+
worker_healthcheck_enabled=worker_healthcheck_enabled,
181204
)
182205

183206
def _load_tls_config(self, tls_data: Dict, host_port) -> ClientTLSConfig:

hatchet_sdk/worker/worker.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@
1212
from types import FrameType
1313
from typing import Any, Callable, TypeVar, get_type_hints
1414

15-
from pydantic import BaseModel
15+
from aiohttp import web
16+
from aiohttp.web_request import Request
17+
from aiohttp.web_response import Response
18+
from prometheus_client import CONTENT_TYPE_LATEST, Gauge, generate_latest
1619

1720
from hatchet_sdk import Context
1821
from hatchet_sdk.client import Client, new_client_raw
@@ -88,6 +91,10 @@ def __init__(
8891

8992
self._setup_signal_handlers()
9093

94+
self.worker_status_gauge = Gauge(
95+
"hatchet_worker_status", "Current status of the Hatchet worker"
96+
)
97+
9198
def register_function(self, action: str, func: Callable[[Context], Any]) -> None:
9299
self.action_registry[action] = func
93100

@@ -155,6 +162,39 @@ def setup_loop(self, loop: asyncio.AbstractEventLoop | None = None) -> bool:
155162
created_loop = True
156163
return created_loop
157164

165+
async def health_check_handler(self, request: Request) -> Response:
    """Serve the worker's current status as a JSON document.

    The response body is ``{"status": <name>}``, where ``<name>`` is the
    ``name`` attribute of the value returned by ``self.status()``. The
    incoming request is not inspected.
    """
    payload = {"status": self.status().name}
    return web.json_response(payload)
169+
170+
async def metrics_handler(self, request: Request) -> Response:
    """Serve Prometheus metrics, refreshing the worker status gauge first.

    The gauge is set to 1 when ``self.status()`` equals
    ``WorkerStatus.HEALTHY`` and to 0 otherwise, then the output of
    ``generate_latest()`` is returned as the response body. The incoming
    request is not inspected.
    """
    is_healthy = self.status() == WorkerStatus.HEALTHY
    self.worker_status_gauge.set(1 if is_healthy else 0)

    # Advertise the exposition format that generate_latest() produces rather
    # than a bare "text/plain". The value carries a charset, which aiohttp
    # refuses inside the ``content_type`` argument, so it is sent as a raw
    # header instead.
    return web.Response(
        body=generate_latest(),
        headers={"Content-Type": CONTENT_TYPE_LATEST},
    )
174+
175+
async def start_health_server(self) -> None:
    """Start the HTTP server that exposes ``/health`` and ``/metrics``.

    The server listens on all interfaces ("0.0.0.0") on
    ``self.config.worker_healthcheck_port``, falling back to 8001 when that
    value is unset or 0. If start-up fails, the error is logged, the
    partially started runner is cleaned up, and the method returns without
    raising.
    """
    port = self.config.worker_healthcheck_port or 8001

    app = web.Application()
    app.add_routes(
        [
            web.get("/health", self.health_check_handler),
            web.get("/metrics", self.metrics_handler),
        ]
    )

    runner = web.AppRunner(app)

    try:
        await runner.setup()
        await web.TCPSite(runner, "0.0.0.0", port).start()
    except Exception as e:
        logger.error("failed to start healthcheck server")
        logger.error(str(e))

        # Release whatever setup()/start() managed to allocate before the
        # failure (e.g. the port was already in use). Cleanup is itself
        # best-effort so a secondary error cannot escape this method.
        try:
            await runner.cleanup()
        except Exception as cleanup_error:
            logger.error(str(cleanup_error))

        return

    logger.info(f"healthcheck server running on port {port}")
197+
158198
def start(
159199
self, options: WorkerStartOptions = WorkerStartOptions()
160200
) -> Future[asyncio.Task[Any] | None]:
@@ -196,6 +236,9 @@ async def async_start(
196236
if not _from_start:
197237
self.setup_loop(options.loop)
198238

239+
if self.config.worker_healthcheck_enabled:
240+
await self.start_health_server()
241+
199242
self.action_listener_process = self._start_listener()
200243

201244
self.action_runner = self._run_action_runner()

poetry.lock

Lines changed: 15 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "hatchet-sdk"
3-
version = "0.42.5"
3+
version = "0.43.0"
44
description = ""
55
authors = ["Alexander Belanger <[email protected]>"]
66
readme = "README.md"
@@ -28,6 +28,7 @@ opentelemetry-instrumentation = "^0.48b0"
2828
opentelemetry-distro = "^0.48b0"
2929
opentelemetry-exporter-otlp = "^1.27.0"
3030
opentelemetry-exporter-otlp-proto-http = "^1.27.0"
31+
prometheus-client = "^0.21.1"
3132

3233
[tool.poetry.group.dev.dependencies]
3334
pytest = "^8.2.2"

0 commit comments

Comments
 (0)