Skip to content

Commit d6df41b

Browse files
committed
Refactor listen/notify as a pubsub backend
1 parent b29f775 commit d6df41b

File tree

1 file changed

+97
-30
lines changed

1 file changed

+97
-30
lines changed

pulpcore/tasking/worker.py

Lines changed: 97 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,70 @@
5656
THRESHOLD_UNBLOCKED_WAITING_TIME = 5
5757

5858

59+
class BasePubSubBackend:
    """Interface for the publish/subscribe channel used to signal workers.

    The convenience methods (`wakeup_workers`, `cancel_task`,
    `record_worker_metrics`) are implemented here on top of `publish`.
    Concrete backends must implement `subscribe`, `unsubscribe`, `publish`,
    `fileno` and `fetch`; the versions in this class only raise
    `NotImplementedError`.
    """

    # Channel names shared by publishers and subscribers.
    WORKER_WAKEUP = "pulp_worker_wakeup"
    TASK_CANCELLATION = "pulp_worker_cancel"
    WORKER_METRIC = "pulp_worker_metrics_heartbeat"

    def wakeup_workers(self):
        """Publish a payload-less message on the wakeup channel."""
        self.publish(self.WORKER_WAKEUP)

    def cancel_task(self, task_pk=None):
        """Publish a cancellation request on the cancellation channel.

        Args:
            task_pk: Primary key of the task to cancel. It is sent as the
                message payload (converted with `str`) so a subscriber can
                compare it with the pk of the task it is running. When
                omitted, a payload-less message is published, which keeps
                the previous zero-argument call working.
        """
        if task_pk is None:
            self.publish(self.TASK_CANCELLATION)
        else:
            self.publish(self.TASK_CANCELLATION, str(task_pk))

    def record_worker_metrics(self, now):
        """Publish `str(now)` on the metrics heartbeat channel.

        Args:
            now: Timestamp of the metric heartbeat; anything `str` accepts.
        """
        self.publish(self.WORKER_METRIC, str(now))

    # Specific implementation
    def subscribe(self, channel, callback):
        """Start listening on `channel` and route its messages to `callback`."""
        raise NotImplementedError()

    def unsubscribe(self, channel):
        """Stop listening on `channel`."""
        raise NotImplementedError()

    def publish(self, channel, message=None):
        """Send `message` (optional payload) on `channel`."""
        raise NotImplementedError()

    def fileno(self):
        """Add support for being used in select loop."""
        raise NotImplementedError()

    def fetch(self):
        """Fetch new messages, if required."""
        raise NotImplementedError()
90+
91+
92+
class PostgresPubSub(BasePubSubBackend):
    """Pub/sub backend built on PostgreSQL LISTEN / NOTIFY.

    Uses the module-level Django `connection`: a cursor for issuing the
    LISTEN / UNLISTEN / NOTIFY statements and the underlying driver
    connection for notification delivery.
    """

    def __init__(self):
        self.cursor = connection.cursor()
        # Maps channel name -> callback to invoke for that channel.
        self.listening_callback = {}
        # The driver-level handler must be installed only once, no matter
        # how many channels are subscribed.
        self._handler_registered = False

    def _notification_handler(self, notification):
        """Dispatch a driver notification to the callback of its channel.

        Notifications for channels without a registered callback (for
        example one that was just unsubscribed) are ignored.
        """
        callback = self.listening_callback.get(notification.channel)
        if callback is not None:
            callback(message=notification.payload)

    def subscribe(self, channel, callback):
        """LISTEN on `channel` and route its notifications to `callback`.

        `callback` is invoked as `callback(message=<payload>)`.
        """
        self.listening_callback[channel] = callback
        # Channel names are identifiers and cannot be bound as parameters;
        # callers are expected to pass the class-level channel constants.
        self.cursor.execute(f"LISTEN {channel}")
        if not self._handler_registered:
            # Registering the same handler once per subscribe() call would
            # make every notification be dispatched multiple times.
            connection.connection.add_notify_handler(self._notification_handler)
            self._handler_registered = True

    def unsubscribe(self, channel):
        """UNLISTEN on `channel` and drop its callback."""
        self.cursor.execute(f"UNLISTEN {channel}")
        self.listening_callback.pop(channel, None)

    def publish(self, channel, message=None):
        """NOTIFY `channel`, optionally with `message` as payload.

        The payload is passed as a bound parameter through `pg_notify`, so
        it is sent as a properly quoted string literal regardless of its
        content (timestamps, UUIDs, quotes, ...).
        """
        if not message:
            self.cursor.execute(f"NOTIFY {channel}")
        else:
            self.cursor.execute("SELECT pg_notify(%s, %s)", [channel, str(message)])

    def fileno(self):
        """Return the connection's file descriptor for use with `select`."""
        return connection.connection.fileno()

    def fetch(self):
        """Run a trivial query on the driver connection.

        NOTE(review): presumably this round-trip is what makes the driver
        process pending notifications and call the notify handler; confirm
        against the driver documentation.
        """
        connection.connection.execute("SELECT 1")
121+
122+
59123
class PulpcoreWorker:
60124
def __init__(self):
61125
# Notification states from several signal handlers
@@ -68,7 +132,7 @@ def __init__(self):
68132
self.heartbeat_period = timedelta(seconds=settings.WORKER_TTL / 3)
69133
self.last_metric_heartbeat = timezone.now()
70134
self.versions = {app.label: app.version for app in pulp_plugin_configs()}
71-
self.cursor = connection.cursor()
135+
self.pubsub_backend = PostgresPubSub()
72136
self.worker = self.handle_worker_heartbeat()
73137
# This defaults to immediate task cancellation.
74138
# It will be set into the future on moderately graceful worker shutdown,
@@ -123,15 +187,6 @@ def _signal_handler(self, thesignal, frame):
123187
)
124188
self.shutdown_requested = True
125189

126-
def _pg_notify_handler(self, notification):
127-
if notification.channel == "pulp_worker_wakeup":
128-
self.wakeup = True
129-
elif notification.channel == "pulp_worker_metrics_heartbeat":
130-
self.last_metric_heartbeat = datetime.fromisoformat(notification.payload)
131-
elif self.task and notification.channel == "pulp_worker_cancel":
132-
if notification.payload == str(self.task.pk):
133-
self.cancel_task = True
134-
135190
def handle_worker_heartbeat(self):
136191
"""
137192
Create or update worker heartbeat records.
@@ -190,9 +245,6 @@ def beat(self):
190245
# to be able to report on a congested tasking system to produce reliable results.
191246
self.record_unblocked_waiting_tasks_metric()
192247

193-
def notify_workers(self):
194-
self.cursor.execute("NOTIFY pulp_worker_wakeup")
195-
196248
def cancel_abandoned_task(self, task, final_state, reason=None):
197249
"""Cancel and clean up an abandoned task.
198250
@@ -225,7 +277,7 @@ def cancel_abandoned_task(self, task, final_state, reason=None):
225277
delete_incomplete_resources(task)
226278
task.set_canceled(final_state=final_state, reason=reason)
227279
if task.reserved_resources_record:
228-
self.notify_workers()
280+
self.pubsub_backend.wakeup_workers()
229281
return True
230282

231283
def is_compatible(self, task):
@@ -369,11 +421,11 @@ def sleep(self):
369421
_logger.debug(_("Worker %s entering sleep state."), self.name)
370422
while not self.shutdown_requested and not self.wakeup:
371423
r, w, x = select.select(
372-
[self.sentinel, connection.connection], [], [], self.heartbeat_period.seconds
424+
[self.sentinel, self.pubsub_backend], [], [], self.heartbeat_period.seconds
373425
)
374426
self.beat()
375-
if connection.connection in r:
376-
connection.connection.execute("SELECT 1")
427+
if self.pubsub_backend in r:
428+
self.pubsub_backend.fetch()
377429
if self.sentinel in r:
378430
os.read(self.sentinel, 256)
379431
self.wakeup = False
@@ -409,14 +461,14 @@ def supervise_task(self, task):
409461
os.kill(task_process.pid, signal.SIGUSR1)
410462

411463
r, w, x = select.select(
412-
[self.sentinel, connection.connection, task_process.sentinel],
464+
[self.sentinel, self.pubsub_backend, task_process.sentinel],
413465
[],
414466
[],
415467
self.heartbeat_period.seconds,
416468
)
417469
self.beat()
418-
if connection.connection in r:
419-
connection.connection.execute("SELECT 1")
470+
if self.pubsub_backend in r:
471+
self.pubsub_backend.fetch()
420472
if self.cancel_task:
421473
_logger.info(
422474
_("Received signal to cancel current task %s in domain: %s."),
@@ -472,7 +524,7 @@ def supervise_task(self, task):
472524
if cancel_state:
473525
self.cancel_abandoned_task(task, cancel_state, cancel_reason)
474526
if task.reserved_resources_record:
475-
self.notify_workers()
527+
self.pubsub_backend.wakeup_workers()
476528
self.task = None
477529

478530
def handle_available_tasks(self):
@@ -529,21 +581,38 @@ def _record_unblocked_waiting_tasks_metric(self):
529581
unblocked_tasks_stats["longest_unblocked_waiting_time"].seconds
530582
)
531583

532-
self.cursor.execute(f"NOTIFY pulp_worker_metrics_heartbeat, '{str(now)}'")
584+
self.pubsub_backend.record_worker_metrics(str(now))
585+
586+
def pubsub_setup(self):
    """Subscribe this worker to the wakeup, cancellation and metric channels.

    Each channel is bound to a callback that updates worker state:

    * wakeup       -> sets `self.wakeup`
    * cancellation -> sets `self.cancel_task` when the payload equals the
      pk of the task currently held in `self.task`
    * metric       -> stores the payload, parsed as an ISO timestamp, in
      `self.last_metric_heartbeat`
    """

    def cancellation_callback(message):
        # self.task is None while no task is being supervised; a
        # cancellation arriving then cannot concern this worker.
        if self.task and message == str(self.task.pk):
            self.cancel_task = True

    def wakeup_callback(message):
        self.wakeup = True

    def metric_callback(message):
        self.last_metric_heartbeat = datetime.fromisoformat(message)

    backend = self.pubsub_backend
    backend.subscribe(backend.WORKER_WAKEUP, wakeup_callback)
    backend.subscribe(backend.TASK_CANCELLATION, cancellation_callback)
    backend.subscribe(backend.WORKER_METRIC, metric_callback)
600+
601+
def pubsub_teardown(self):
    """Unsubscribe from the wakeup, cancellation and metric channels, in that order."""
    backend = self.pubsub_backend
    channels = (
        backend.WORKER_WAKEUP,
        backend.TASK_CANCELLATION,
        backend.WORKER_METRIC,
    )
    for channel in channels:
        backend.unsubscribe(channel)
533605

534606
def run(self, burst=False):
535607
with WorkerDirectory(self.name):
536608
signal.signal(signal.SIGINT, self._signal_handler)
537609
signal.signal(signal.SIGTERM, self._signal_handler)
538610
signal.signal(signal.SIGHUP, self._signal_handler)
539-
# Subscribe to pgsql channels
540-
connection.connection.add_notify_handler(self._pg_notify_handler)
541-
self.cursor.execute("LISTEN pulp_worker_cancel")
542-
self.cursor.execute("LISTEN pulp_worker_metrics_heartbeat")
611+
self.pubsub_setup()
543612
if burst:
613+
self.pubsub_backend.unsubscribe(self.pubsub_backend.WORKER_WAKEUP)
544614
self.handle_available_tasks()
545615
else:
546-
self.cursor.execute("LISTEN pulp_worker_wakeup")
547616
while not self.shutdown_requested:
548617
# do work
549618
if self.shutdown_requested:
@@ -553,7 +622,5 @@ def run(self, burst=False):
553622
break
554623
# rest until notified to wakeup
555624
self.sleep()
556-
self.cursor.execute("UNLISTEN pulp_worker_wakeup")
557-
self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat")
558-
self.cursor.execute("UNLISTEN pulp_worker_cancel")
625+
self.pubsub_teardown()
559626
self.shutdown()

0 commit comments

Comments
 (0)