Additional metrics exported from Celery workers #3463
@@ -76,6 +76,7 @@ def add(x, y):
from opentelemetry.metrics import get_meter
from opentelemetry.propagate import extract, inject
from opentelemetry.propagators.textmap import Getter
from opentelemetry.semconv._incubating.metrics import messaging_metrics
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.status import Status, StatusCode
@@ -96,6 +97,12 @@ def add(x, y):
_TASK_REVOKED_TERMINATED_SIGNAL_KEY = "celery.terminated.signal"
_TASK_NAME_KEY = "celery.task_name"

# Metric names
_TASK_COUNT_ACTIVE = "messaging.client.active_tasks"

These 3 new metrics are not in semantic conventions, right? I don't think we should add them to the same namespace as the others, if unspecified metrics should be added at all.

Just to summarize my viewpoint

By different namespace, do you mean replacing

_TASK_COUNT_PREFETCHED = "messaging.client.prefetched_tasks"
_TASK_PROCESSING_TIME = messaging_metrics.MESSAGING_PROCESS_DURATION
_TASK_PREFETCH_TIME = "messaging.prefetch.duration"


class CeleryGetter(Getter):
    def get(self, carrier, key):
@@ -113,10 +120,36 @@ def keys(self, carrier):
celery_getter = CeleryGetter()


class CeleryInstrumentor(BaseInstrumentor):
    metrics = None
    task_id_to_start_time = {}


class TaskDurationTracker:
    def __init__(self, metrics):
        self.metrics = metrics
        self.tracker = {}

    def record_start(self, key, step):
        self.tracker.setdefault(key, {})[step] = default_timer()

    def record_finish(self, key, metric_name, attributes):
        try:
            time_elapsed = self._time_elapsed(key, metric_name)
            self.metrics[metric_name].record(
                max(0, time_elapsed), attributes=attributes
            )
        except KeyError:
            logger.warning("Failed to record %s for task %s", metric_name, key)

    def _time_elapsed(self, key, step):
        end_time = default_timer()
        try:
            start_time = self.tracker.get(key, {}).pop(step)
            time_elapsed = end_time - start_time
            return time_elapsed
        finally:
            # Clean up the per-task entry once all of its timers have been consumed
            if key in self.tracker and not self.tracker.get(key):
                self.tracker.pop(key)
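
Not part of the diff: a minimal sketch of how the tracker above pairs a start timestamp with a later histogram record, assuming the TaskDurationTracker class is in scope. The _FakeHistogram class, the "example.duration" key, and the attributes are illustrative stand-ins for the real instruments and metric names.

class _FakeHistogram:
    # Illustrative stand-in for an OpenTelemetry histogram instrument
    def record(self, value, attributes=None):
        print(f"recorded {value:.6f}s with attributes={attributes}")

tracker = TaskDurationTracker({"example.duration": _FakeHistogram()})
tracker.record_start("task-123", "example.duration")  # e.g. on task_prerun
# ... task body runs ...
tracker.record_finish("task-123", "example.duration", {"celery.task_name": "add"})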


class CeleryInstrumentor(BaseInstrumentor):
    def instrumentation_dependencies(self) -> Collection[str]:
        return _instruments

@@ -139,8 +172,10 @@ def _instrument(self, **kwargs):
            schema_url="https://opentelemetry.io/schemas/1.11.0",
        )

        self.create_celery_metrics(meter)
        self.metrics = _create_celery_worker_metrics(meter)
        self.time_tracker = TaskDurationTracker(self.metrics)

        signals.task_received.connect(self._trace_received, weak=False)
        signals.task_prerun.connect(self._trace_prerun, weak=False)
        signals.task_postrun.connect(self._trace_postrun, weak=False)
        signals.before_task_publish.connect(
@@ -153,27 +188,52 @@ def _instrument(self, **kwargs):
        signals.task_retry.connect(self._trace_retry, weak=False)

    def _uninstrument(self, **kwargs):
        signals.task_received.disconnect(self._trace_received)
        signals.task_prerun.disconnect(self._trace_prerun)
        signals.task_postrun.disconnect(self._trace_postrun)
        signals.before_task_publish.disconnect(self._trace_before_publish)
        signals.after_task_publish.disconnect(self._trace_after_publish)
        signals.task_failure.disconnect(self._trace_failure)
        signals.task_retry.disconnect(self._trace_retry)

    def _trace_received(self, *args, **kwargs):
        """
        On receive signal, task is prefetched and prefetch timer starts
        """

        request = utils.retrieve_request(kwargs)

        metrics_attributes = utils.get_metrics_attributes_from_request(request)
        self.metrics[_TASK_COUNT_PREFETCHED].add(
            1, attributes=metrics_attributes
        )
        self.time_tracker.record_start(request.task_id, _TASK_PREFETCH_TIME)

    def _trace_prerun(self, *args, **kwargs):
        """
        On prerun signal, task is no longer prefetched, and execution timer
        starts along with the task span
        """

        task = utils.retrieve_task(kwargs)
        task_id = utils.retrieve_task_id(kwargs)

        if task is None or task_id is None:
            return

        self.update_task_duration_time(task_id)
        metrics_attributes = utils.get_metrics_attributes_from_task(task)
        self.metrics[_TASK_COUNT_PREFETCHED].add(
            -1, attributes=metrics_attributes
        )
        self.time_tracker.record_finish(
            task_id, _TASK_PREFETCH_TIME, metrics_attributes
        )
        self.time_tracker.record_start(task_id, _TASK_PROCESSING_TIME)

        request = task.request
        tracectx = extract(request, getter=celery_getter) or None
        token = context_api.attach(tracectx) if tracectx is not None else None

        logger.debug("prerun signal start task_id=%s", task_id)

        operation_name = f"{_TASK_RUN}/{task.name}"
        span = self._tracer.start_span(
            operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER

@@ -183,14 +243,24 @@ def _trace_prerun(self, *args, **kwargs):
        activation.__enter__()  # pylint: disable=E1101
        utils.attach_context(task, task_id, span, activation, token)

        self.metrics[_TASK_COUNT_ACTIVE].add(1, attributes=metrics_attributes)

    def _trace_postrun(self, *args, **kwargs):
        """
        On postrun signal, task is no longer being executed
        """

        task = utils.retrieve_task(kwargs)
        task_id = utils.retrieve_task_id(kwargs)

        if task is None or task_id is None:
            return

        logger.debug("postrun signal task_id=%s", task_id)
        metrics_attributes = utils.get_metrics_attributes_from_task(task)
        self.metrics[_TASK_COUNT_ACTIVE].add(-1, attributes=metrics_attributes)
        self.time_tracker.record_finish(
            task_id, _TASK_PROCESSING_TIME, metrics_attributes
        )

        # retrieve and finish the Span
        ctx = utils.retrieve_context(task, task_id)

@@ -210,10 +280,8 @@ def _trace_postrun(self, *args, **kwargs):

        activation.__exit__(None, None, None)
        utils.detach_context(task, task_id)
        self.update_task_duration_time(task_id)
        labels = {"task": task.name, "worker": task.request.hostname}
        self._record_histograms(task_id, labels)
        # if the process sending the task is not instrumented

        # If the process sending the task is not instrumented,
        # there's no incoming context and no token to detach
        if token is not None:
            context_api.detach(token)

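Taken together, the three signal handlers above give each task the following metric lifecycle (a summary, not part of the diff):

# task_received -> messaging.client.prefetched_tasks +1, prefetch timer starts
# task_prerun   -> messaging.client.prefetched_tasks -1, prefetch duration recorded;
#                  messaging.client.active_tasks +1, processing timer starts
# task_postrun  -> messaging.client.active_tasks -1, processing duration recorded
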
@@ -345,29 +413,29 @@ def _trace_retry(*args, **kwargs):
        # something that isn't an `Exception`
        span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason))

    def update_task_duration_time(self, task_id):
        cur_time = default_timer()
        task_duration_time_until_now = (
            cur_time - self.task_id_to_start_time[task_id]
            if task_id in self.task_id_to_start_time
            else cur_time
        )
        self.task_id_to_start_time[task_id] = task_duration_time_until_now

    def _record_histograms(self, task_id, metric_attributes):
        if task_id is None:
            return

        self.metrics["flower.task.runtime.seconds"].record(
            self.task_id_to_start_time.get(task_id),
            attributes=metric_attributes,
        )

    def create_celery_metrics(self, meter) -> None:
        self.metrics = {
            "flower.task.runtime.seconds": meter.create_histogram(
Need to discuss if we can keep this and the new metric to avoid breakage.

It's fine of course to keep this for backwards compatibility, albeit not sure it's worth it in this case. The metric name has nothing to do with OTel or semantic conventions, and references an unrelated project. :/

                name="flower.task.runtime.seconds",
                unit="seconds",
                description="The time it took to run the task.",
            )
        }

def _create_celery_worker_metrics(meter) -> dict:
    metrics = {
        _TASK_COUNT_ACTIVE: meter.create_up_down_counter(
            name=_TASK_COUNT_ACTIVE,
            unit="{message}",
            description="Number of tasks currently being executed by the worker",
        ),
        _TASK_COUNT_PREFETCHED: meter.create_up_down_counter(
            name=_TASK_COUNT_PREFETCHED,
            unit="{message}",
            description="Number of tasks prefetched by the worker",
        ),
        _TASK_PREFETCH_TIME: meter.create_histogram(
            name=_TASK_PREFETCH_TIME,
            unit="s",
            description="The time the task spent in prefetch mode",
        ),
        _TASK_PROCESSING_TIME: meter.create_histogram(
            name=_TASK_PROCESSING_TIME,
            unit="s",
            description="The time it took to run the task.",
        ),
    }

    return metrics
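
Not part of the diff: a sketch of how a worker could be configured so these metrics are actually exported, using the instrumentation's usual worker_process_init pattern; the console exporter and the function name are placeholder choices.

from celery.signals import worker_process_init

from opentelemetry import metrics
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    ConsoleMetricExporter,
    PeriodicExportingMetricReader,
)


@worker_process_init.connect(weak=False)
def init_otel_metrics(*args, **kwargs):
    # Export metrics periodically to stdout (placeholder exporter choice)
    reader = PeriodicExportingMetricReader(ConsoleMetricExporter())
    metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))
    CeleryInstrumentor().instrument()
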
Could you please extract this change in another PR so we can get this reviewed and hopefully merged before next release?
I would say the same. The memory leak seems more feasible to review and merge.
I opened this small PR to just fix the leak.