Skip to content

Commit fb1224e

Browse files
committed
Add backing off to auxilary workers
1 parent 018c49f commit fb1224e

File tree

3 files changed

+66
-28
lines changed

3 files changed

+66
-28
lines changed

CHANGES/+worker_backoff.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added backing off on auxiliary workers if wrongly alarmed on pending tasks.

pulpcore/tasking/entrypoint.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ def worker(
3434
auxiliary,
3535
):
3636
"""A Pulp worker."""
37-
3837
if reload:
3938
try:
4039
import hupper

pulpcore/tasking/worker.py

Lines changed: 65 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import functools
44
import logging
5+
import math
56
import os
67
import random
78
import select
@@ -89,9 +90,11 @@ def __init__(self, auxiliary=False):
8990
self.wakeup_unblock = False
9091
self.wakeup_handle = False
9192
self.cancel_task = False
93+
self.unblocked_count = 0
9294

9395
self.ignored_task_ids = []
9496
self.ignored_task_countdown = IGNORED_TASKS_CLEANUP_INTERVAL
97+
self.false_alarms = 0
9598

9699
self.auxiliary = auxiliary
97100
self.task = None
@@ -158,6 +161,14 @@ def _signal_handler(self, thesignal, frame):
158161
self.shutdown_requested = True
159162

160163
def _pg_notify_handler(self, notification):
164+
if notification.channel == "pulp_worker_broadcast":
165+
key, value = notification.payload.split(":", maxsplit=1)
166+
_logger.debug("broadcast message recieved: %s: %s", key, value)
167+
if key == "unblocked_count":
168+
self.unblocked_count = int(value)
169+
self.wakeup_handle = self.unblocked_count > 0
170+
elif key == "metrics_heartbeat":
171+
self.last_metric_heartbeat = datetime.fromisoformat(key)
161172
if notification.channel == "pulp_worker_wakeup":
162173
if notification.payload == TASK_WAKEUP_UNBLOCK:
163174
# Auxiliary workers don't do this.
@@ -171,6 +182,7 @@ def _pg_notify_handler(self, notification):
171182
self.wakeup_handle = True
172183

173184
elif notification.channel == "pulp_worker_metrics_heartbeat":
185+
# TODO (in one of the next releases) Remove that superseeded channel.
174186
self.last_metric_heartbeat = datetime.fromisoformat(notification.payload)
175187
elif self.task and notification.channel == "pulp_worker_cancel":
176188
if notification.payload == str(self.task.pk):
@@ -257,6 +269,7 @@ def record_unblocked_waiting_tasks_metric(self, now):
257269
)
258270

259271
self.cursor.execute(f"NOTIFY pulp_worker_metrics_heartbeat, '{str(now)}'")
272+
self.broadcast("metrics_heartbeat", now)
260273

261274
def beat(self):
262275
now = timezone.now()
@@ -278,6 +291,9 @@ def beat(self):
278291
if self.otel_enabled and now > self.last_metric_heartbeat + self.heartbeat_period:
279292
self.record_unblocked_waiting_tasks_metric(now)
280293

294+
def broadcast(self, key, value):
295+
self.cursor.execute("SELECT pg_notify('pulp_worker_broadcast', %s)", (f"{key}:{value}",))
296+
281297
def notify_workers(self, reason):
282298
self.cursor.execute("SELECT pg_notify('pulp_worker_wakeup', %s)", (reason,))
283299

@@ -345,14 +361,17 @@ def unblock_tasks(self):
345361

346362
self.wakeup_unblock = False
347363
result = self._unblock_tasks()
348-
if result is not None and (
349-
Task.objects.filter(
350-
state__in=[TASK_STATES.WAITING, TASK_STATES.CANCELING], app_lock=None
364+
if result is not None:
365+
unblocked_count = (
366+
Task.objects.filter(
367+
state__in=[TASK_STATES.WAITING, TASK_STATES.CANCELING], app_lock=None
368+
)
369+
.exclude(unblocked_at=None)
370+
.count()
351371
)
352-
.exclude(unblocked_at=None)
353-
.exists()
354-
):
355-
self.notify_workers(TASK_WAKEUP_HANDLE)
372+
if unblocked_count > 0:
373+
self.notify_workers(TASK_WAKEUP_HANDLE)
374+
self.broadcast("unblocked_count", unblocked_count)
356375
return True
357376

358377
return result
@@ -369,6 +388,7 @@ def _unblock_tasks(self):
369388
.order_by("pulp_created")
370389
.select_related("pulp_domain")
371390
):
391+
_logger.debug("Considering task %s for unblocking.", task.pk)
372392
reserved_resources_record = task.reserved_resources_record or []
373393
exclusive_resources = [
374394
resource
@@ -389,23 +409,26 @@ def _unblock_tasks(self):
389409
)
390410
task.unblock()
391411

392-
elif (
393-
task.state == TASK_STATES.WAITING
394-
and task.unblocked_at is None
395-
# No exclusive resource taken?
396-
and not any(
397-
resource in taken_exclusive_resources or resource in taken_shared_resources
398-
for resource in exclusive_resources
399-
)
400-
# No shared resource exclusively taken?
401-
and not any(resource in taken_exclusive_resources for resource in shared_resources)
402-
):
403-
_logger.debug(
404-
"Marking waiting task %s in domain: %s unblocked.",
405-
task.pk,
406-
task.pulp_domain.name,
407-
)
408-
task.unblock()
412+
elif task.state == TASK_STATES.WAITING and task.unblocked_at is None:
413+
if (
414+
# No exclusive resource taken?
415+
not any(
416+
resource in taken_exclusive_resources or resource in taken_shared_resources
417+
for resource in exclusive_resources
418+
)
419+
# No shared resource exclusively taken?
420+
and not any(
421+
resource in taken_exclusive_resources for resource in shared_resources
422+
)
423+
):
424+
_logger.debug(
425+
"Marking waiting task %s in domain: %s unblocked.",
426+
task.pk,
427+
task.pulp_domain.name,
428+
)
429+
task.unblock()
430+
else:
431+
_logger.debug("Task %s is still blocked.", task.pk)
409432
elif task.state == TASK_STATES.RUNNING and task.unblocked_at is None:
410433
# This should not happen in normal operation.
411434
# And it is only an issue if the worker running that task died, because it will
@@ -421,13 +444,14 @@ def _unblock_tasks(self):
421444
# Record the resources of the pending task
422445
taken_exclusive_resources.update(exclusive_resources)
423446
taken_shared_resources.update(shared_resources)
447+
# raise Exception()
424448
return False
425449

426450
def sleep(self):
427451
"""Wait for signals on the wakeup channel while heart beating."""
428452

429-
_logger.debug(_("Worker %s entering sleep state."), self.name)
430-
while not self.shutdown_requested and not self.wakeup_handle:
453+
_logger.debug("Worker %s entering sleep state.", self.name)
454+
while not self.shutdown_requested:
431455
r, w, x = select.select(
432456
[self.sentinel, connection.connection],
433457
[],
@@ -441,7 +465,15 @@ def sleep(self):
441465
self.unblock_tasks()
442466
if self.sentinel in r:
443467
os.read(self.sentinel, 256)
444-
_logger.debug(_("Worker %s leaving sleep state."), self.name)
468+
if self.wakeup_handle:
469+
# There is probably a busy loop hidden here.
470+
if not self.auxiliary or random.random() < math.exp(
471+
self.unblocked_count - self.false_alarms
472+
):
473+
_logger.debug("Worker %s leaving sleep state.", self.name)
474+
break
475+
else:
476+
_logger.debug("Worker %s backing off", self.name)
445477

446478
def supervise_task(self, task):
447479
"""Call and supervise the task process while heart beating.
@@ -586,8 +618,12 @@ def handle_unblocked_tasks(self):
586618
task = self.fetch_task()
587619
if task is None:
588620
# No task found
621+
self.false_alarms += 1
622+
_logger.debug("False Alarms: %s", self.false_alarms)
589623
break
590624
try:
625+
self.false_alarms //= 2
626+
_logger.debug("False Alarms: %s", self.false_alarms)
591627
if task.state == TASK_STATES.CANCELING:
592628
# No worker picked this task up before being canceled.
593629
# Or the worker disappeared before handling the canceling.
@@ -615,6 +651,7 @@ def run(self, burst=False):
615651
signal.signal(signal.SIGHUP, self._signal_handler)
616652
# Subscribe to pgsql channels
617653
connection.connection.add_notify_handler(self._pg_notify_handler)
654+
self.cursor.execute("LISTEN pulp_worker_broadcast")
618655
self.cursor.execute("LISTEN pulp_worker_cancel")
619656
self.cursor.execute("LISTEN pulp_worker_metrics_heartbeat")
620657
if burst:
@@ -638,4 +675,5 @@ def run(self, burst=False):
638675
self.cursor.execute("UNLISTEN pulp_worker_wakeup")
639676
self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat")
640677
self.cursor.execute("UNLISTEN pulp_worker_cancel")
678+
self.cursor.execute("UNLISTEN pulp_worker_broadcast")
641679
self.shutdown()

0 commit comments

Comments
 (0)