Skip to content

Commit 3e37faa

Browse files
committed
wip
1 parent fcd365e commit 3e37faa

File tree

3 files changed

+22
-16
lines changed

3 files changed

+22
-16
lines changed

scheduler/views/queue_views.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from django.contrib.admin.views.decorators import staff_member_required
77
from django.http import HttpResponse, HttpRequest, HttpResponseNotFound, JsonResponse
88
from django.shortcuts import render, redirect
9-
from django.urls import reverse
9+
from django.urls import reverse, resolve
1010
from django.utils.http import url_has_allowed_host_and_scheme
1111
from django.views.decorators.cache import never_cache
1212

@@ -105,10 +105,7 @@ def clear_queue_registry(request: HttpRequest, queue_name: str, registry_name: s
105105
registry = queue.get_registry(registry_name)
106106
if registry is None:
107107
return HttpResponseNotFound()
108-
109-
next_url = request.META.get("HTTP_REFERER") or reverse("queue_registry_jobs", args=[queue_name, registry_name])
110-
if not url_has_allowed_host_and_scheme(next_url, allowed_hosts=None):
111-
messages.warning(request, "Bad followup URL")
108+
next_url = _check_next_url(request, reverse("queue_registry_jobs", args=[queue_name, registry_name]))
112109
if request.method == "POST":
113110
try:
114111
if registry is queue:
@@ -204,15 +201,23 @@ def queue_confirm_action(request: HttpRequest, queue_name: str) -> HttpResponse:
204201
return redirect(next_url)
205202

206203

204+
def _check_next_url(request: HttpRequest, default_next_url: str) -> str:
205+
next_url = request.POST.get("next_url", default_next_url)
206+
if not url_has_allowed_host_and_scheme(next_url, allowed_hosts=None):
207+
messages.warning(request, "Bad followup URL")
208+
try:
209+
resolve(next_url)
210+
except Exception:
211+
messages.warning(request, "Bad followup URL")
212+
next_url = default_next_url
213+
return next_url
214+
215+
207216
@never_cache
208217
@staff_member_required
209218
def queue_actions(request: HttpRequest, queue_name: str) -> HttpResponse:
210219
queue = get_queue(queue_name)
211-
next_url = request.POST.get("next_url") or reverse("queue_registry_jobs", args=[queue_name, "queued"])
212-
if not url_has_allowed_host_and_scheme(next_url, allowed_hosts=None):
213-
messages.warning(request, "Bad followup URL")
214-
next_url = reverse("queue_registry_jobs", args=[queue_name, "queued"])
215-
220+
next_url = _check_next_url(request, reverse("queue_registry_jobs", args=[queue_name, "queued"]))
216221
action = request.POST.get("action", False)
217222
job_names = request.POST.get("job_names", False)
218223
if request.method != "POST" or not action or not job_names:

scheduler/worker/scheduler.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from datetime import datetime
66
from enum import Enum
77
from multiprocessing import Process
8+
from threading import Thread
89
from typing import List, Set, Optional, Sequence, Dict
910

1011
import django
@@ -52,7 +53,7 @@ def __init__(
5253
self.interval = interval
5354
self._stop_requested = False
5455
self._status = SchedulerStatus.STOPPED
55-
self._process = None
56+
self._thread = None
5657
self._pid: Optional[int] = None
5758
self.worker_name = worker_name
5859

@@ -96,8 +97,8 @@ def start(self, burst=False) -> None:
9697
self.release_locks()
9798
return
9899
self._status = SchedulerStatus.STARTED
99-
self._process = Process(target=run_scheduler, args=(self,), name="Scheduler")
100-
self._process.start()
100+
self._thread = Thread(target=run_scheduler, args=(self,), name="scheduler-thread")
101+
self._thread.start()
101102

102103
def _install_signal_handlers(self):
103104
"""Installs signal handlers for handling SIGINT and SIGTERM"""

scheduler/worker/worker.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ def dequeue_job_and_maintain_ttl(
401401

402402
self._model.set_field("state", WorkerStatus.IDLE, connection=self.connection)
403403
self.procline(f"Listening on {qnames}")
404-
logger.debug(f"*** Worker {self.name} listening on {qnames}...")
404+
logger.debug(f"[Worker {self.name}/{self._pid}]: listening on {qnames}...")
405405
connection_wait_time = 1.0
406406
idle_since = utcnow()
407407
idle_time_left = max_idle_time
@@ -593,8 +593,8 @@ def stop_scheduler(self):
593593
os.kill(self.scheduler.pid, signal.SIGTERM)
594594
except OSError:
595595
pass
596-
if self.scheduler._process is not None:
597-
self.scheduler._process.join()
596+
if self.scheduler._thread is not None:
597+
self.scheduler._thread.join()
598598

599599
def refresh(self, update_queues: bool = False):
600600
"""Refreshes the worker data.

0 commit comments

Comments
 (0)