
Commit 637aa34

fix: parallelize queue_dispatching in monitor loop - AAP-37345 (#1357)
Try to solve the bottleneck caused by the superlinear complexity of the current sequential, non-scalable monitor_rulebook loop. Each user request and each monitored process needs to be queue-dispatched before it can be processed by the manager. Following the same approach used for the other tasks, this change implements "lock" and "no_lock" versions of `queue_dispatch` and schedules one dispatch on the default worker for every user request and every monitored process found by the monitor. The queue dispatching can then run in parallel while consistency is preserved, and the monitor loop itself becomes a lightweight iterator that only schedules the subsequent tasks. In large clusters the monitor can scale simply by increasing the number of workers.

Jira: https://issues.redhat.com/browse/AAP-37345

Signed-off-by: Alex <[email protected]>
1 parent 1246879 commit 637aa34
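
Below is a minimal sketch of the lock/no_lock split described in the commit message. It uses an in-process threading.Lock as a stand-in for the PostgreSQL advisory lock the real code relies on, and the helper names and log text are illustrative rather than the module's actual API:

from contextlib import contextmanager
import threading

# One lock per job id; stands in for a per-key PostgreSQL advisory lock.
_locks = {}


@contextmanager
def advisory_lock(key, wait=False):
    """Yield True only if the non-blocking lock was acquired."""
    lock = _locks.setdefault(key, threading.Lock())
    acquired = lock.acquire(blocking=wait)
    try:
        yield acquired
    finally:
        if acquired:
            lock.release()


def queue_dispatch(job_id, request):
    """Public entry point: take the per-parent lock, then delegate."""
    with advisory_lock(job_id, wait=False) as acquired:
        if not acquired:
            # Another worker is already dispatching for this parent.
            print(f"queue_dispatch({job_id}) already running, skipping {request}")
            return
        queue_dispatch_no_lock(job_id, request)


def queue_dispatch_no_lock(job_id, request):
    """Body that assumes the caller already holds the lock."""
    print(f"dispatching {request} for {job_id}")

Because the lock is taken with wait=False, a concurrent dispatch for the same parent returns immediately instead of blocking a worker.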

File tree

2 files changed: +99, -28 lines


src/aap_eda/tasks/orchestrator.py

Lines changed: 34 additions & 17 deletions
@@ -176,10 +176,34 @@ def _run_request(
 
 
 def queue_dispatch(
+    process_parent_type: ProcessParentType,
+    process_parent_id: int,
+    request_type: Optional[ActivationRequest] = None,
+    request_id: str = "",
+) -> None:  # pragma: no cover
+    job_id = _manage_process_job_id(process_parent_type, process_parent_id)
+    with advisory_lock(job_id, wait=False) as acquired:
+        if not acquired:
+            LOGGER.debug(
+                f"queue_dispatch({job_id}) already being ran, "
+                f"not dispatching request {request_type}",
+            )
+            return
+        queue_dispatch_no_lock(
+            process_parent_type,
+            process_parent_id,
+            request_type,
+            request_id,
+            job_id,
+        )
+
+
+def queue_dispatch_no_lock(
     process_parent_type: ProcessParentType,
     process_parent_id: int,
     request_type: Optional[ActivationRequest],
     request_id: str = "",
+    job_id: str = "",
 ):
     """Dispatch the request to the right queue.
 
@@ -189,8 +213,6 @@ def queue_dispatch(
     checks the health of the queue before dispatching the request.
     Handles workers offline and unhealthy queues.
     """
-    job_id = _manage_process_job_id(process_parent_type, process_parent_id)
-
     # TODO: add "monitor" type to ActivationRequestQueue
     if request_type is None:
         request_type = "Monitor"
@@ -209,14 +231,6 @@
     assign_request_id(request_id)
     assign_log_tracking_id(process_parent.log_tracking_id)
 
-    with advisory_lock(job_id, wait=False) as acquired:
-        if not acquired:
-            LOGGER.debug(
-                f"_manage({job_id}) already being ran, "
-                f"not dispatching request {request_type}",
-            )
-            return
-
     LOGGER.info(
         f"Dispatching request {request_type} for {process_parent_type} "
         f"{process_parent_id}",
@@ -549,7 +563,10 @@ def monitor_rulebook_processes_no_lock() -> None:
     """
     # run pending user requests
     for request in requests_queue.list_requests():
-        queue_dispatch(
+        tasking.unique_enqueue(
+            "default",
+            "queue_dispatch_" + str(request.process_parent_id),
+            queue_dispatch,
            request.process_parent_type,
            request.process_parent_id,
            request.request,
@@ -564,12 +581,12 @@ def monitor_rulebook_processes_no_lock() -> None:
            ActivationStatus.WORKERS_OFFLINE,
        ]
    ):
-        process_parent_type = str(process.parent_type)
-        process_parent_id = process.activation_id
-
-        queue_dispatch(
-            process_parent_type,
-            process_parent_id,
+        tasking.unique_enqueue(
+            "default",
+            "queue_dispatch_" + str(process.activation_id),
+            queue_dispatch,
+            str(process.parent_type),
+            process.activation_id,
            None,
            str(uuid.uuid4()),
        )
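
The fan-out performed by the monitor loop can be pictured with the sketch below. Here unique_enqueue is a dictionary-backed stand-in for the real tasking.unique_enqueue helper, which deduplicates by job name on the "default" RQ queue, and monitor_loop and its parameters are hypothetical names used only for illustration:

import uuid

# Stand-in worker queue: job name -> (func, args). Enqueuing the same name
# twice before a worker drains it coalesces into a single pending job.
_pending = {}


def unique_enqueue(queue_name, job_name, func, *args):
    if job_name not in _pending:
        _pending[job_name] = (func, args)


def monitor_loop(pending_requests, running_processes, queue_dispatch):
    """Schedule one queue_dispatch job per user request and per running process."""
    # Pending user requests carry an explicit request type and request id.
    for parent_type, parent_id, request, request_id in pending_requests:
        unique_enqueue(
            "default",
            "queue_dispatch_" + str(parent_id),
            queue_dispatch,
            parent_type,
            parent_id,
            request,
            request_id,
        )
    # Running processes are re-checked with a "monitor" dispatch (request=None).
    for parent_type, parent_id in running_processes:
        unique_enqueue(
            "default",
            "queue_dispatch_" + str(parent_id),
            queue_dispatch,
            parent_type,
            parent_id,
            None,
            str(uuid.uuid4()),
        )

Because the job name embeds the process parent id, repeated monitor passes coalesce into at most one pending dispatch per parent, while dispatches for different parents run in parallel on whichever workers drain the default queue.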

tests/integration/tasks/test_orchestrator.py

Lines changed: 65 additions & 11 deletions
@@ -13,6 +13,8 @@
 # limitations under the License.
 
 import logging
+import time
+from concurrent.futures import ThreadPoolExecutor
 from contextlib import contextmanager
 from unittest import mock
 
@@ -202,26 +204,24 @@ def test_monitor_rulebook_processes(
     get_queue_name_mock.return_value = "activation"
     call_args = [
         mock.call(
-            "activation",
-            orchestrator._manage_process_job_id(
-                ProcessParentType.ACTIVATION, activation.id
-            ),
-            orchestrator._manage,
+            "default",
+            "queue_dispatch_" + str(activation.id),
+            orchestrator.queue_dispatch,
             ProcessParentType.ACTIVATION,
             activation.id,
+            ActivationRequest.START,
             "",
         )
     ]
     for running in bulk_running_processes:
         call_args.append(
             mock.call(
-                "activation",
-                orchestrator._manage_process_job_id(
-                    ProcessParentType.ACTIVATION, running.activation.id
-                ),
-                orchestrator._manage,
+                "default",
+                "queue_dispatch_" + str(running.activation.id),
+                orchestrator.queue_dispatch,
                 ProcessParentType.ACTIVATION,
                 running.activation.id,
+                ActivationRequest.START,
                 "",
             )
         )
@@ -237,6 +237,21 @@ def test_monitor_rulebook_processes(
     )
     orchestrator.monitor_rulebook_processes()
 
+    # Also expect calls for running processes
+    # (these will have None as request type)
+    for running in bulk_running_processes:
+        call_args.append(
+            mock.call(
+                "default",
+                "queue_dispatch_" + str(running.activation.id),
+                orchestrator.queue_dispatch,
+                str(running.parent_type),
+                running.activation.id,
+                None,
+                mock.ANY,  # UUID string
+            )
+        )
+
     enqueue_mock.assert_has_calls(call_args, any_order=True)
 
 
@@ -268,6 +283,45 @@ def advisory_lock_mock(*args, **kwargs):
     )
 
     enqueue_mock.assert_not_called()
-    assert f"_manage({job_id}) already being ran, " in eda_caplog.text
+    assert (
+        f"queue_dispatch({job_id}) already being ran, " in eda_caplog.text
+    )
     activation.refresh_from_db()
     assert activation.status == ActivationStatus.STOPPED
+
+
+@pytest.mark.django_db
+def test_queue_dispatch_advisory_lock(activation, eda_caplog):
+    """Test that queue_dispatch advisory lock prevents duplicate execution."""
+    execution_count = 0
+
+    def mock_queue_dispatch_no_lock(*args, **kwargs):
+        nonlocal execution_count
+        execution_count += 1
+        time.sleep(1.0)
+        return True
+
+    def concurrent_dispatch():
+        """Function to run queue_dispatch concurrently."""
+        with mock.patch(
+            "aap_eda.tasks.orchestrator.queue_dispatch_no_lock",
+            side_effect=mock_queue_dispatch_no_lock,
+        ):
+            orchestrator.queue_dispatch(
+                ProcessParentType.ACTIVATION,
+                activation.id,
+                ActivationRequest.START,
+                "test-request",
+            )
+
+    with ThreadPoolExecutor(max_workers=3) as executor:
+        futures = [executor.submit(concurrent_dispatch) for _ in range(3)]
+        for future in futures:
+            future.result()
+
+    assert execution_count == 1, f"Expected 1 execution, got {execution_count}"
+
+    assert (
+        "queue_dispatch" in eda_caplog.text
+        and "already being ran" in eda_caplog.text
+    )
