Skip to content

Commit c02517d

Browse files
committed
fix:registries do not keep connections
1 parent a914987 commit c02517d

File tree

11 files changed

+77
-84
lines changed

11 files changed

+77
-84
lines changed

scheduler/helpers/queues/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
"InvalidJobOperation",
44
"get_queue",
55
"get_all_workers",
6-
"perform_job",
6+
"queue_perform_job",
77
]
88

99
from .getters import get_queue, get_all_workers
10-
from .queue_logic import Queue, InvalidJobOperation, perform_job
10+
from .queue_logic import Queue, InvalidJobOperation, queue_perform_job

scheduler/helpers/queues/getters.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ def _get_connection(config: QueueConfiguration, use_strict_broker: bool = False)
4646
)
4747

4848

49+
def get_queue_connection(queue_name: str) -> ConnectionType:
50+
queue_settings = get_queue_configuration(queue_name)
51+
connection = _get_connection(queue_settings)
52+
return connection
53+
54+
4955
def get_queue(name: str = "default") -> Queue:
5056
"""Returns an DjangoQueue using parameters defined in `SCHEDULER_QUEUES`"""
5157
queue_settings = get_queue_configuration(name)

scheduler/helpers/queues/queue_logic.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class NoSuchRegistryError(Exception):
3434
pass
3535

3636

37-
def perform_job(job_model: JobModel, connection: ConnectionType) -> Any: # noqa
37+
def queue_perform_job(job_model: JobModel, connection: ConnectionType) -> Any: # noqa
3838
"""The main execution method. Invokes the job function with the job arguments.
3939
4040
:returns: The job's return value
@@ -271,7 +271,7 @@ def run_sync(self, job: JobModel) -> JobModel:
271271
"""Run a job synchronously, meaning on the same process the method was called."""
272272
job.prepare_for_execution("sync", self.active_job_registry, self.connection)
273273
try:
274-
result = perform_job(job, self.connection)
274+
result = queue_perform_job(job, self.connection)
275275
self.job_handle_success(job, result=result, job_info_ttl=job.job_info_ttl, result_ttl=job.success_ttl)
276276
except Exception as e: # noqa
277277
logger.warning(f"Job {job.name} failed with exception: {e}")

scheduler/helpers/sentry_integration.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def setup_once() -> None:
3232
version = parse_version(scheduler.__version__)
3333
_check_minimum_version(SentryIntegration, version)
3434

35-
old_perform_job = Worker.perform_job
35+
old_perform_job = Worker.worker_perform_job
3636

3737
@ensure_integration_enabled(SentryIntegration, old_perform_job)
3838
def sentry_patched_perform_job(self: Any, job_model: JobModel, *args: Queue, **kwargs: Any) -> bool:
@@ -65,7 +65,7 @@ def sentry_patched_perform_job(self: Any, job_model: JobModel, *args: Queue, **k
6565

6666
return rv
6767

68-
Worker.perform_job = sentry_patched_perform_job # type: ignore[method-assign]
68+
Worker.worker_perform_job = sentry_patched_perform_job # type: ignore[method-assign]
6969

7070
old_handle_exception = Worker.handle_exception
7171

scheduler/redis_models/registry/base_registry.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ def key(self) -> str:
8686

8787
@classmethod
8888
def pop(
89-
cls, connection: ConnectionType, registries: Sequence[Self], timeout: Optional[int]
89+
cls, connection: ConnectionType, registries: Sequence[Self], timeout: Optional[int]
9090
) -> Tuple[Optional[str], Optional[str]]:
9191
"""Helper method to abstract away from some Redis API details
9292

scheduler/redis_models/worker.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,10 @@ def set_current_job_working_time(self, job_execution_time: float, connection: Co
8484
self.set_field("current_job_working_time", job_execution_time, connection=connection)
8585

8686
def heartbeat(self, connection: ConnectionType, timeout: Optional[int] = None) -> None:
87-
with connection.pipeline() as pipeline:
88-
timeout = timeout or DEFAULT_WORKER_TTL + 60
89-
pipeline.expire(self._key, timeout)
90-
now = utcnow()
91-
self.set_field("last_heartbeat", now, connection=pipeline)
92-
pipeline.execute()
87+
self.last_heartbeat = utcnow()
88+
self.save(connection)
89+
timeout = timeout or DEFAULT_WORKER_TTL + 60
90+
connection.expire(self._key, timeout)
9391
logger.debug(f"Next heartbeat for worker {self._key} should arrive in {timeout} seconds.")
9492

9593
@classmethod

scheduler/templates/admin/scheduler/jobs.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@
111111
<td>
112112
{{ job.ended_at|date:"Y-m-d, H:i:s" }}
113113
</td>
114-
<td>{{ job.get_status }}</td>
114+
<td>{{ job.status }}</td>
115115
<td>{{ job|show_func_name }}</td>
116116
<td>{{ job.worker_name|default:'-' }}</td>
117117
{% block extra_columns_values %}

scheduler/tests/test_task_types/test_task_model.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
from scheduler import settings
1212
from scheduler.helpers.queues import get_queue
13-
from scheduler.helpers.queues import perform_job
13+
from scheduler.helpers.queues import queue_perform_job
1414
from scheduler.models import TaskType, Task, TaskArg, TaskKwarg, run_task
1515
from scheduler.redis_models import JobStatus, JobModel
1616
from scheduler.tests import jobs, conf # noqa
@@ -191,13 +191,13 @@ def test_callable_result(self):
191191
task = task_factory(self.task_type)
192192
entry = _get_task_scheduled_job_from_registry(task)
193193
queue = get_queue("default")
194-
self.assertEqual(perform_job(entry, connection=queue.connection), 2)
194+
self.assertEqual(queue_perform_job(entry, connection=queue.connection), 2)
195195

196196
def test_callable_empty_args_and_kwargs(self):
197197
task = task_factory(self.task_type, callable="scheduler.tests.jobs.test_args_kwargs")
198198
entry = _get_task_scheduled_job_from_registry(task)
199199
queue = get_queue("default")
200-
self.assertEqual(perform_job(entry, connection=queue.connection), "test_args_kwargs()")
200+
self.assertEqual(queue_perform_job(entry, connection=queue.connection), "test_args_kwargs()")
201201

202202
def test_delete_args(self):
203203
task = task_factory(self.task_type)
@@ -244,7 +244,7 @@ def test_callable_args_and_kwargs(self):
244244
entry = _get_task_scheduled_job_from_registry(task)
245245
queue = get_queue("default")
246246
self.assertEqual(
247-
perform_job(entry, connection=queue.connection),
247+
queue_perform_job(entry, connection=queue.connection),
248248
f"test_args_kwargs('one', key1=2, key2={date}, key3=False)",
249249
)
250250

scheduler/tests/test_views/test_queue_actions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,4 @@ def test_job_list_action_stop_jobs__move_to_finished_registry(self):
106106
self.assertEqual(len(job_names), queue.finished_job_registry.count(queue.connection))
107107

108108
for job_name in job_names:
109-
self.assertTrue(queue.finished_job_registry.exists(queue.connection,job_name))
109+
self.assertTrue(queue.finished_job_registry.exists(queue.connection, job_name))

scheduler/views/queue_views.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,7 @@ def queue_workers(request: HttpRequest, queue_name: str) -> HttpResponse:
8484
def stats_json(request: HttpRequest) -> Union[JsonResponse, HttpResponseNotFound]:
8585
auth_token = request.headers.get("Authorization")
8686
token_validation_func = settings.SCHEDULER_CONFIG.TOKEN_VALIDATION_METHOD
87-
if request.user.is_staff or (
88-
token_validation_func and auth_token and token_validation_func(auth_token)): # type: ignore
87+
if request.user.is_staff or (token_validation_func and auth_token and token_validation_func(auth_token)): # type: ignore
8988
return JsonResponse(get_statistics())
9089

9190
return HttpResponseNotFound()

0 commit comments

Comments
 (0)