Skip to content

Commit e498fd7

Browse files
authored
Adds task_queue_size OTEL metric. (#826)
This metric represents the total number of running and waiting tasks that are older than 5 seconds.
1 parent 7217dba commit e498fd7

File tree

1 file changed

+52
-3
lines changed

1 file changed

+52
-3
lines changed

pulp_service/pulp_service/tasking/new_worker.py

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,12 @@
88

99
from gettext import gettext as _
1010
import functools
11-
import hashlib
1211
import logging
1312
import os
1413
import random
1514
import select
1615
import signal
1716
import time
18-
import uuid
1917
from datetime import timedelta
2018
from multiprocessing import Process
2119
from tempfile import TemporaryDirectory
@@ -30,7 +28,9 @@
3028
TASK_FINAL_STATES,
3129
TASK_SCHEDULING_LOCK,
3230
WORKER_CLEANUP_LOCK,
31+
TASK_METRICS_LOCK,
3332
)
33+
from pulpcore.metrics import init_otel_meter
3434
from pulpcore.app.apps import pulp_plugin_configs
3535
from pulpcore.app.util import get_worker_name
3636
from pulpcore.app.models import Task, AppStatus
@@ -40,7 +40,6 @@
4040
dispatch_scheduled_tasks,
4141
perform_task,
4242
startup_hook,
43-
REDIS_LOCK_PREFIX,
4443
resource_to_lock_key,
4544
release_resource_locks,
4645
acquire_locks,
@@ -126,6 +125,9 @@ def __init__(self):
126125
int(WORKER_CLEANUP_INTERVAL / 10), WORKER_CLEANUP_INTERVAL
127126
)
128127

128+
# Metric recording interval (every 3 heartbeats)
129+
self.metric_heartbeat_countdown = 3
130+
129131
# Cache worker count for sleep calculation (updated during beat)
130132
self.num_workers = 1
131133

@@ -138,10 +140,25 @@ def __init__(self):
138140
os.set_blocking(sentinel_w, False)
139141
signal.set_wakeup_fd(sentinel_w)
140142

143+
self._init_instrumentation()
144+
141145
startup_hook()
142146

143147
_logger.info("Initialized NewPulpcoreWorker with Redis lock-based algorithm")
144148

149+
def _init_instrumentation(self):
150+
"""Initialize OpenTelemetry instrumentation if enabled."""
151+
if settings.OTEL_ENABLED:
152+
meter = init_otel_meter("pulp-worker")
153+
self.task_queue_size_meter = meter.create_gauge(
154+
name="task_queue_size",
155+
description="Number of running and waiting tasks older than 5 seconds.",
156+
unit="tasks",
157+
)
158+
self.otel_enabled = True
159+
else:
160+
self.otel_enabled = False
161+
145162
def _signal_handler(self, thesignal, frame):
146163
"""Handle shutdown signals."""
147164
if thesignal in (signal.SIGHUP, signal.SIGTERM):
@@ -290,6 +307,31 @@ def dispatch_scheduled_tasks(self):
290307
"""Dispatch scheduled tasks."""
291308
dispatch_scheduled_tasks()
292309

310+
@exclusive(TASK_METRICS_LOCK)
311+
def record_task_queue_size_metric(self):
312+
"""
313+
Record metrics for running and waiting tasks older than 5 seconds.
314+
315+
This method counts tasks in RUNNING or WAITING state that were created
316+
more than 5 seconds ago, providing visibility into task queue depth.
317+
"""
318+
# Calculate the cutoff time (5 seconds ago)
319+
cutoff_time = timezone.now() - timedelta(seconds=5)
320+
321+
# Count tasks in RUNNING or WAITING state older than 5 seconds
322+
task_count = Task.objects.filter(
323+
state__in=[TASK_STATES.RUNNING, TASK_STATES.WAITING],
324+
pulp_created__lt=cutoff_time
325+
).count()
326+
327+
# Set the metric value
328+
self.task_queue_size_meter.set(task_count)
329+
330+
_logger.debug(
331+
"Task queue size metric: %d tasks (running or waiting, older than 5s)",
332+
task_count
333+
)
334+
293335
def beat(self):
294336
"""Periodic worker maintenance tasks (heartbeat, cleanup, etc.)."""
295337
now = timezone.now()
@@ -308,6 +350,13 @@ def beat(self):
308350

309351
self.dispatch_scheduled_tasks()
310352

353+
# Record metrics every 3 heartbeats
354+
if self.otel_enabled:
355+
self.metric_heartbeat_countdown -= 1
356+
if self.metric_heartbeat_countdown <= 0:
357+
self.metric_heartbeat_countdown = 3
358+
self.record_task_queue_size_metric()
359+
311360
# Update cached worker count for sleep calculation
312361
self.num_workers = AppStatus.objects.online().filter(app_type='worker').count()
313362

0 commit comments

Comments
 (0)