Skip to content

Commit 67ba8c6

Browse files
Enable dispatcherd under feature flag 1st iteration - AAP-46009 (#1308)
Initial minimal implementation of dispatcherd under the feature flag. Current rq deployment tested Still missing tests for dispatcherd and proxies that we are going to address in later iterations as part of https://issues.redhat.com/browse/AAP-46008 Dispatcherd is widely tested as well but not guaranteed a this point. ~WE NEED TO CONFIRM THE AVAILABILITY OF THE RPM PRIOR TO MERGE THIS~, (tracked internally in https://issues.redhat.com/browse/AAP-45706) Update: RPM done. Jira: https://issues.redhat.com/browse/AAP-46009 --------- Signed-off-by: Alex <[email protected]> Co-authored-by: Bill Wei <[email protected]>
1 parent 049def3 commit 67ba8c6

30 files changed

+723
-260
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,7 @@ Taskfile.yaml
2727
venv/
2828
.coverage*
2929
coverage.xml
30-
test-results.xml
30+
test-results.xml
31+
32+
# Dispatcherd development
33+
dispatcherd/

poetry.lock

Lines changed: 104 additions & 72 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ cryptography = ">=42,<43"
4747
kubernetes = "26.1.*"
4848
podman = "5.4.*"
4949
rq-scheduler = "^0.10"
50-
django-ansible-base = { git = "https://github.com/ansible/django-ansible-base.git", tag = "2025.3.7", extras = [
50+
django-ansible-base = { git = "https://github.com/ansible/django-ansible-base.git", tag = "2025.5.8", extras = [
5151
"channel-auth",
5252
"rbac",
5353
"redis-client",
@@ -70,6 +70,8 @@ validators = "^0.34.0"
7070
django-flags = "^5.0.13"
7171
insights-analytics-collector = "^0.3.2"
7272
distro = "^1.9.0"
73+
dispatcherd = { version = "v2025.05.19", extras = ["pg_notify"] }
74+
7375

7476
[tool.poetry.group.test.dependencies]
7577
pytest = "*"

src/aap_eda/api/views/eda_credential.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ def partial_update(self, request, pk):
214214
eda_credential.refresh_from_db()
215215
new_interval = get_analytics_interval_if_exist(eda_credential)
216216
if new_interval != old_interval:
217-
reschedule_gather_analytics.delay()
217+
reschedule_gather_analytics()
218218
return Response(
219219
serializers.EdaCredentialSerializer(eda_credential).data,
220220
)
@@ -307,7 +307,7 @@ def _create_eda_credential(self, request, data):
307307
)
308308

309309
if get_analytics_interval_if_exist(response) > 0:
310-
reschedule_gather_analytics.delay()
310+
reschedule_gather_analytics()
311311

312312
return Response(
313313
serializers.EdaCredentialSerializer(response).data,

src/aap_eda/api/views/mixins.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
from aap_eda.api import exceptions as api_exc
2828
from aap_eda.core import tasking
29+
from aap_eda.settings import features
2930

3031

3132
# TODO: need revisit from cuwater
@@ -147,5 +148,8 @@ def redis_is_available(
147148
self,
148149
message: Optional[str] = "Redis is required but unavailable.",
149150
):
151+
# dispatcherd does not need redis
152+
if features.DISPATCHERD:
153+
return True
150154
if tasking.is_redis_failed():
151155
raise api_exc.Conflict(message)

src/aap_eda/api/views/project.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -135,15 +135,14 @@ def create(self, request):
135135
request.user, serializer.instance
136136
)
137137

138-
job = tasks.import_project.delay(project_id=project.id)
138+
job_id = tasks.import_project(project.id)
139139
except redis.ConnectionError:
140140
return RedisDependencyMixin.redis_unavailable_response()
141141

142-
# Atomically update `import_task_id` field only.
143142
models.Project.objects.filter(pk=project.id).update(
144-
import_task_id=job.id
143+
import_task_id=job_id
145144
)
146-
project.import_task_id = job.id
145+
project.import_task_id = job_id
147146
serializer = self.get_serializer(project)
148147
headers = self.get_success_headers(serializer.data)
149148
logger.info(
@@ -271,16 +270,22 @@ def sync(self, request, pk):
271270
detail="Project import or sync is already running."
272271
)
273272

273+
# check if redis is available
274274
self.redis_is_available()
275275

276276
try:
277-
job = tasks.sync_project.delay(project_id=project.id)
277+
job_id = tasks.sync_project(project.id)
278278
except redis.ConnectionError:
279279
return RedisDependencyMixin.redis_unavailable_response()
280280

281281
project.import_state = models.Project.ImportState.PENDING
282-
project.import_task_id = job.id
283-
project.import_error = None
282+
283+
# job_id can be none if there is already a task running.
284+
# this is unlikely since we check the state above
285+
# but safety first
286+
if job_id:
287+
project.import_task_id = job_id
288+
284289
project.save()
285290

286291
logger.info(

src/aap_eda/core/apps.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
import logging
1616
import sys
1717

18+
from dispatcherd.config import setup as dispatcher_setup
1819
from django.apps import AppConfig
20+
from django.conf import settings
1921

2022
logger = logging.getLogger(__name__)
2123

@@ -29,8 +31,11 @@ def ready(self):
2931
from aap_eda.api.views import dab_decorate # noqa: F401
3032

3133
# Run the startup logging for rq worker
32-
34+
# WARNING: rqworker can run rq workers or dispatcherd workers
3335
if "rqworker" in sys.argv:
3436
from aap_eda.utils.logging import startup_logging
3537

3638
startup_logging(logger)
39+
40+
# Enable default dispatcher config. Workers may override this
41+
dispatcher_setup(settings.DISPATCHERD_DEFAULT_SETTINGS)
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Copyright 2025 Red Hat, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import django
16+
from django.core.cache import cache
17+
from django.db import connection
18+
19+
"""This module is an optimization for dispatcherd workers
20+
21+
This sets up Django pre-fork, which must be implemented as a module to run
22+
on-import for compatibility with multiprocessing forkserver.
23+
This should never be imported by other modules, which is why it is called
24+
hazmat.
25+
"""
26+
27+
28+
django.setup()
29+
30+
# connections may or may not be open, but
31+
# before forking, all connections should be closed
32+
33+
cache.close()
34+
connection.close()

src/aap_eda/core/management/commands/rqworker.py

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@
1414

1515
"""Wrapper for rqworker command."""
1616

17+
import logging
18+
19+
from dispatcherd import run_service as run_dispatcherd_service
20+
from dispatcherd.config import setup as dispatcherd_setup
1721
from django.conf import settings
1822
from django.core.management.base import BaseCommand, CommandParser
1923
from django_rq.management.commands import rqworker
2024

2125
from aap_eda.settings import features
2226

27+
logger = logging.getLogger(__name__)
28+
2329

2430
class Command(BaseCommand):
2531
"""Wrapper for rqworker command.
@@ -35,12 +41,33 @@ def add_arguments(self, parser: CommandParser) -> None:
3541

3642
def handle(self, *args, **options) -> None:
3743
if features.DISPATCHERD:
38-
self.stderr.write(
39-
self.style.ERROR(
40-
"DISPATCHERD feature not implemented yet. "
41-
f"Please disable {settings.DISPATCHERD_FEATURE_FLAG_NAME} "
42-
"in your settings.",
43-
)
44+
return self._handle_dispatcherd(*args, **options)
45+
46+
# run rqworker command if dispatcherd is not enabled
47+
logger.info("Starting worker with rqworker.")
48+
return rqworker.Command.handle(self, *args, **options)
49+
50+
def _handle_dispatcherd(self, *args, **options) -> None:
51+
"""Handle dispatcherd service."""
52+
if "worker_class" not in options:
53+
self.style.ERROR("Missing required argument: --worker-class")
54+
raise SystemExit(1)
55+
56+
# Use rqworker expected args to determine worker type
57+
if "ActivationWorker" in options["worker_class"]:
58+
dispatcherd_setup(
59+
settings.DISPATCHERD_ACTIVATION_WORKER_SETTINGS,
60+
)
61+
62+
elif "DefaultWorker" in options["worker_class"]:
63+
dispatcherd_setup(settings.DISPATCHERD_DEFAULT_WORKER_SETTINGS)
64+
else:
65+
self.style.ERROR(
66+
"Invalid worker class. "
67+
"Please use either ActivationWorker or DefaultWorker."
4468
)
4569
raise SystemExit(1)
46-
return rqworker.Command.handle(self, *args, **options)
70+
71+
logger.info("Starting worker with dispatcherd.")
72+
run_dispatcherd_service()
73+
return None

0 commit comments

Comments
 (0)