diff --git a/poetry.lock b/poetry.lock
index a315c17..22d539a 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
+# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand.
 
 [[package]]
 name = "altgraph"
@@ -153,6 +153,18 @@ files = [
     {file = "blinker-1.9.0.tar.gz", hash = "sha256:b4ce2265a7abece45e7cc896e98dbebe6cead56bcf805a3d23136d145f5445bf"},
 ]
 
+[[package]]
+name = "cachetools"
+version = "6.2.1"
+description = "Extensible memoizing collections and decorators"
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+files = [
+    {file = "cachetools-6.2.1-py3-none-any.whl", hash = "sha256:09868944b6dde876dfd44e1d47e18484541eaf12f26f29b7af91b26cc892d701"},
+    {file = "cachetools-6.2.1.tar.gz", hash = "sha256:3f391e4bd8f8bf0931169baf7456cc822705f4e2a31f840d218f445b9a854201"},
+]
+
 [[package]]
 name = "celery"
 version = "5.5.3"
@@ -1467,6 +1479,18 @@ files = [
     {file = "tomlkit-0.13.3.tar.gz", hash = "sha256:430cf247ee57df2b94ee3fbe588e71d362a941ebb545dec29b53961d61add2a1"},
 ]
 
+[[package]]
+name = "types-cachetools"
+version = "6.2.0.20250827"
+description = "Typing stubs for cachetools"
+optional = false
+python-versions = ">=3.9"
+groups = ["dev"]
+files = [
+    {file = "types_cachetools-6.2.0.20250827-py3-none-any.whl", hash = "sha256:96ae5abcb5ea1e1f1faf811a2ff8b2ce7e6d820fc42c4fcb4b332b2da485de16"},
+    {file = "types_cachetools-6.2.0.20250827.tar.gz", hash = "sha256:f27febfd1b5e517e3cb1ca6daf38ad6ddb4eeb1e29bdbd81a082971ba30c0d8e"},
+]
+
 [[package]]
 name = "types-python-dateutil"
 version = "2.9.0.20250516"
@@ -1688,4 +1712,4 @@ dev = ["black (>=19.3b0) ; python_version >= \"3.6\"", "pytest (>=4.6.2)"]
 
 [metadata]
 lock-version = "2.1"
 python-versions = ">=3.11,<3.14"
-content-hash = "e773f36585ed1fdb9af35153eb292c58c8e5a5d688f6e7ff3981929f158dd173"
+content-hash = "f5b462ca1171220afcc13afc75de2813d998875cbd69777ba95450d2745b3454"
diff --git a/pyproject.toml b/pyproject.toml
index ad2c8b5..bc8e0fa 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -43,6 +43,7 @@ Flask = "^3.1.1"
 waitress = "^3.0.2"
 arrow = "^1.3.0"
 timy = "^0.4.2"
+cachetools = "^6.2.0"
 
 [tool.poetry.group.dev.dependencies]
 pytest = "^8.2.2"
@@ -64,6 +65,7 @@ pytest-celery = "^0.0.0"
 pylint = "^3.3.1"
 certifi = "^2024.8.30"
 idna = "^3.7"
+types-cachetools = "^6.2.0"
 
 [build-system]
 requires = ["poetry-core>=1.0.0a5"]
diff --git a/src/exporter.py b/src/exporter.py
index 3b9577c..19ed9c1 100644
--- a/src/exporter.py
+++ b/src/exporter.py
@@ -4,8 +4,9 @@
 import sys
 import time
 from collections import defaultdict
-from typing import Callable, Optional
+from typing import Callable, MutableMapping, Optional
 
+from cachetools import LRUCache
 from celery import Celery
 from celery.events.state import State  # type: ignore
 from celery.utils import nodesplit  # type: ignore
@@ -31,6 +32,7 @@ def __init__(
         metric_prefix="celery_",
         default_queue_name="celery",
         static_label=None,
+        max_tasks_in_memory=50000,
     ):
         self.registry = CollectorRegistry(auto_describe=True)
         self.queue_cache = set(initial_queues or [])
@@ -46,6 +48,11 @@ def __init__(
         self.static_label = static_label or {}
         self.static_label_keys = self.static_label.keys()
 
+        # Track task received timestamps for latency calculation
+        self.task_received_times: MutableMapping[str, float] = LRUCache(
+            maxsize=max_tasks_in_memory
+        )
+
         self.state_counters = {
             "task-sent": Counter(
                 f"{metric_prefix}task_sent",
@@ -122,6 +129,13 @@ def __init__(
             registry=self.registry,
             buckets=buckets or Histogram.DEFAULT_BUCKETS,
         )
+        self.celery_task_latency = Histogram(
+            f"{metric_prefix}task_latency",
+            "Histogram of task latency measurements (time between received and started).",
+            ["name", "hostname", "queue_name", *self.static_label_keys],
+            registry=self.registry,
+            buckets=buckets or Histogram.DEFAULT_BUCKETS,
+        )
         self.celery_queue_length = Gauge(
             f"{metric_prefix}queue_length",
             "The number of message in broker queue.",
@@ -194,6 +208,10 @@ def purge_worker_metrics(self, hostname):
             if hostname in label_seq:
                 self.celery_task_runtime.remove(*label_seq)
 
+        for label_seq in list(self.celery_task_latency._metrics.keys()):
+            if hostname in label_seq:
+                self.celery_task_latency.remove(*label_seq)
+
         del self.worker_last_seen[hostname]
 
     def track_timed_out_workers(self):
@@ -287,6 +305,29 @@ def track_task_event(self, event):
         if event["type"] == "task-sent" and self.generic_hostname_task_sent_metric:
             labels["hostname"] = "generic"
 
+        # Store timestamp when task is received for latency calculation
+        if event["type"] == "task-received":
+            self.task_received_times[event["uuid"]] = event.get(
+                "local_received"
+            ) or event.get("timestamp")
+            logger.debug("Stored received timestamp for task uuid='{}'", event["uuid"])
+
+        # Calculate and observe latency when task starts
+        if event["type"] == "task-started":
+            received_time = self.task_received_times.get(event["uuid"])
+            if received_time:
+                started_time = event.get("local_received") or event.get("timestamp")
+                latency = started_time - received_time
+                self.celery_task_latency.labels(**labels).observe(latency)
+                logger.debug(
+                    "Observed metric='{}' labels='{}': {}s",
+                    self.celery_task_latency._name,
+                    labels,
+                    latency,
+                )
+                # Clean up stored timestamp
+                del self.task_received_times[event["uuid"]]
+
         for counter_name, counter in self.state_counters.items():
             _labels = labels.copy()
 
diff --git a/src/test_metrics.py b/src/test_metrics.py
index dfc35a8..630e48c 100644
--- a/src/test_metrics.py
+++ b/src/test_metrics.py
@@ -231,3 +231,40 @@ def succeed():
         )
         is None
     )
+
+
+@pytest.mark.celery()
+def test_task_latency(threaded_exporter, celery_app, hostname):
+    time.sleep(5)
+
+    @celery_app.task
+    def latency_task():
+        time.sleep(0.1)
+
+    with start_worker(celery_app, without_heartbeat=False):
+        latency_task.delay().get()
+        time.sleep(2)
+
+    # Check that latency metric was recorded
+    latency_count = threaded_exporter.registry.get_sample_value(
+        "celery_task_latency_count",
+        labels={
+            "hostname": hostname,
+            "name": "src.test_metrics.latency_task",
+            "queue_name": "celery",
+        },
+    )
+    print(latency_count)
+    assert latency_count == 1.0
+
+    # Check that latency value is reasonable (should be very small for in-memory broker)
+    latency_sum = threaded_exporter.registry.get_sample_value(
+        "celery_task_latency_sum",
+        labels={
+            "hostname": hostname,
+            "name": "src.test_metrics.latency_task",
+            "queue_name": "celery",
+        },
+    )
+    assert latency_sum is not None
+    assert latency_sum >= 0  # Latency should be non-negative
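
Note: the latency tracking added above pairs each task-received timestamp with the matching task-started event, and the LRUCache bound (max_tasks_in_memory, defaulting to 50000) keeps memory flat when a task-started event never arrives for a stored uuid. Below is a minimal sketch of that pairing, using cachetools.LRUCache with simplified, hypothetical event dicts rather than real Celery events:

    from cachetools import LRUCache

    # Tiny bound for illustration; the exporter defaults to 50000 entries.
    received_times = LRUCache(maxsize=3)

    def observe(event):
        # Return latency in seconds for task-started events, else None.
        if event["type"] == "task-received":
            received_times[event["uuid"]] = event["timestamp"]
        elif event["type"] == "task-started":
            received = received_times.get(event["uuid"])
            if received is not None:
                del received_times[event["uuid"]]
                return event["timestamp"] - received
        return None

    observe({"type": "task-received", "uuid": "t1", "timestamp": 100.0})
    print(observe({"type": "task-started", "uuid": "t1", "timestamp": 100.25}))  # 0.25

    # Entries for tasks that never start are evicted once maxsize is exceeded.
    for i in range(10):
        received_times[f"stale-{i}"] = 0.0
    print(len(received_times))  # 3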