diff --git a/src/exporter.py b/src/exporter.py index 3e7f8f1..9893243 100644 --- a/src/exporter.py +++ b/src/exporter.py @@ -1,4 +1,5 @@ # pylint: disable=protected-access,,attribute-defined-outside-init +from functools import lru_cache import json import re import sys @@ -7,7 +8,7 @@ from typing import Callable, Optional from celery import Celery -from celery.events.state import State # type: ignore +from celery.events.state import State, Task # type: ignore from celery.utils import nodesplit # type: ignore from celery.utils.time import utcoffset # type: ignore from kombu.exceptions import ChannelError # type: ignore @@ -141,6 +142,27 @@ def scrape(self): self.track_timed_out_workers() self.track_queue_metrics() + @lru_cache(maxsize=32) + def find_queue_by_task(self, target: Task) -> str: + """Provider a queue name based on metadata coming from eiether a worker or a task being processed by it""" + + try: + queue_name = None + # https://github.com/celery/celery/issues/5321 + # task_info = self.app.control.inspect().query_task(task.id) + # As the received tasks are considered as active ones + task_set = self.app.control.inspect().registered().get(target.hostname) + task_info = [task for task in task_set if task['id'] == target.id].pop() + queue_name = task_info["delivery_info"]["routing_key"] + except TimeoutError as error: + # the broker doesn't respond + logger.error(f"Couldn't fetch the task info of {target.id}: {error.strerror}") + except IndexError as error: + # couldn't find the target task by id + # the dictionary path is missing + logger.warning(f"Couldn't find the target task by its id: {target.id}") + return queue_name + def forget_worker(self, hostname): if hostname in self.worker_last_seen: self.celery_worker_up.labels(hostname=hostname).set(0) @@ -263,7 +285,9 @@ def track_task_event(self, event): labels = { "name": task.name, "hostname": get_hostname(task.hostname), - "queue_name": getattr(task, "queue", "celery"), + # queue property should be available when a task is called with a passed queue name + # otherwise, we need to query its meta using the celery instance + "queue_name": getattr(task, "queue", self.find_queue_by_task(task) or "celery"), } if event["type"] == "task-sent" and self.generic_hostname_task_sent_metric: labels["hostname"] = "generic"