Skip to content

Commit fd3d0ce

Browse files
authored
Revert "fix: parallelize queue_dispatching in monitor loop #1357 (#1392)"
Reverts commit 637aa34. Issue: https://issues.redhat.com/browse/AAP-52854. The previous commit may cause multiple failures in ATF Tier3 tests. Let's roll back this change for now and rework it with more thorough tests.
1 parent 0289064 commit fd3d0ce

File tree

2 files changed

+28
-99
lines changed

2 files changed

+28
-99
lines changed

src/aap_eda/tasks/orchestrator.py

Lines changed: 17 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -176,34 +176,10 @@ def _run_request(
176176

177177

178178
def queue_dispatch(
179-
process_parent_type: ProcessParentType,
180-
process_parent_id: int,
181-
request_type: Optional[ActivationRequest] = None,
182-
request_id: str = "",
183-
) -> None: # pragma: no cover
184-
job_id = _manage_process_job_id(process_parent_type, process_parent_id)
185-
with advisory_lock(job_id, wait=False) as acquired:
186-
if not acquired:
187-
LOGGER.debug(
188-
f"queue_dispatch({job_id}) already being ran, "
189-
f"not dispatching request {request_type}",
190-
)
191-
return
192-
queue_dispatch_no_lock(
193-
process_parent_type,
194-
process_parent_id,
195-
request_type,
196-
request_id,
197-
job_id,
198-
)
199-
200-
201-
def queue_dispatch_no_lock(
202179
process_parent_type: ProcessParentType,
203180
process_parent_id: int,
204181
request_type: Optional[ActivationRequest],
205182
request_id: str = "",
206-
job_id: str = "",
207183
):
208184
"""Dispatch the request to the right queue.
209185
@@ -213,6 +189,8 @@ def queue_dispatch_no_lock(
213189
checks the health of the queue before dispatching the request.
214190
Handles workers offline and unhealthy queues.
215191
"""
192+
job_id = _manage_process_job_id(process_parent_type, process_parent_id)
193+
216194
# TODO: add "monitor" type to ActivationRequestQueue
217195
if request_type is None:
218196
request_type = "Monitor"
@@ -231,6 +209,14 @@ def queue_dispatch_no_lock(
231209
assign_request_id(request_id)
232210
assign_log_tracking_id(process_parent.log_tracking_id)
233211

212+
with advisory_lock(job_id, wait=False) as acquired:
213+
if not acquired:
214+
LOGGER.debug(
215+
f"_manage({job_id}) already being ran, "
216+
f"not dispatching request {request_type}",
217+
)
218+
return
219+
234220
LOGGER.info(
235221
f"Dispatching request {request_type} for {process_parent_type} "
236222
f"{process_parent_id}",
@@ -563,10 +549,7 @@ def monitor_rulebook_processes_no_lock() -> None:
563549
"""
564550
# run pending user requests
565551
for request in requests_queue.list_requests():
566-
tasking.unique_enqueue(
567-
"default",
568-
"queue_dispatch_" + str(request.process_parent_id),
569-
queue_dispatch,
552+
queue_dispatch(
570553
request.process_parent_type,
571554
request.process_parent_id,
572555
request.request,
@@ -581,12 +564,12 @@ def monitor_rulebook_processes_no_lock() -> None:
581564
ActivationStatus.WORKERS_OFFLINE,
582565
]
583566
):
584-
tasking.unique_enqueue(
585-
"default",
586-
"queue_dispatch_" + str(process.activation_id),
587-
queue_dispatch,
588-
str(process.parent_type),
589-
process.activation_id,
567+
process_parent_type = str(process.parent_type)
568+
process_parent_id = process.activation_id
569+
570+
queue_dispatch(
571+
process_parent_type,
572+
process_parent_id,
590573
None,
591574
str(uuid.uuid4()),
592575
)

tests/integration/tasks/test_orchestrator.py

Lines changed: 11 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
# limitations under the License.
1414

1515
import logging
16-
import time
17-
from concurrent.futures import ThreadPoolExecutor
1816
from contextlib import contextmanager
1917
from unittest import mock
2018

@@ -204,24 +202,26 @@ def test_monitor_rulebook_processes(
204202
get_queue_name_mock.return_value = "activation"
205203
call_args = [
206204
mock.call(
207-
"default",
208-
"queue_dispatch_" + str(activation.id),
209-
orchestrator.queue_dispatch,
205+
"activation",
206+
orchestrator._manage_process_job_id(
207+
ProcessParentType.ACTIVATION, activation.id
208+
),
209+
orchestrator._manage,
210210
ProcessParentType.ACTIVATION,
211211
activation.id,
212-
ActivationRequest.START,
213212
"",
214213
)
215214
]
216215
for running in bulk_running_processes:
217216
call_args.append(
218217
mock.call(
219-
"default",
220-
"queue_dispatch_" + str(running.activation.id),
221-
orchestrator.queue_dispatch,
218+
"activation",
219+
orchestrator._manage_process_job_id(
220+
ProcessParentType.ACTIVATION, running.activation.id
221+
),
222+
orchestrator._manage,
222223
ProcessParentType.ACTIVATION,
223224
running.activation.id,
224-
ActivationRequest.START,
225225
"",
226226
)
227227
)
@@ -237,21 +237,6 @@ def test_monitor_rulebook_processes(
237237
)
238238
orchestrator.monitor_rulebook_processes()
239239

240-
# Also expect calls for running processes
241-
# (these will have None as request type)
242-
for running in bulk_running_processes:
243-
call_args.append(
244-
mock.call(
245-
"default",
246-
"queue_dispatch_" + str(running.activation.id),
247-
orchestrator.queue_dispatch,
248-
str(running.parent_type),
249-
running.activation.id,
250-
None,
251-
mock.ANY, # UUID string
252-
)
253-
)
254-
255240
enqueue_mock.assert_has_calls(call_args, any_order=True)
256241

257242

@@ -283,45 +268,6 @@ def advisory_lock_mock(*args, **kwargs):
283268
)
284269

285270
enqueue_mock.assert_not_called()
286-
assert (
287-
f"queue_dispatch({job_id}) already being ran, " in eda_caplog.text
288-
)
271+
assert f"_manage({job_id}) already being ran, " in eda_caplog.text
289272
activation.refresh_from_db()
290273
assert activation.status == ActivationStatus.STOPPED
291-
292-
293-
@pytest.mark.django_db
294-
def test_queue_dispatch_advisory_lock(activation, eda_caplog):
295-
"""Test that queue_dispatch advisory lock prevents duplicate execution."""
296-
execution_count = 0
297-
298-
def mock_queue_dispatch_no_lock(*args, **kwargs):
299-
nonlocal execution_count
300-
execution_count += 1
301-
time.sleep(1.0)
302-
return True
303-
304-
def concurrent_dispatch():
305-
"""Function to run queue_dispatch concurrently."""
306-
with mock.patch(
307-
"aap_eda.tasks.orchestrator.queue_dispatch_no_lock",
308-
side_effect=mock_queue_dispatch_no_lock,
309-
):
310-
orchestrator.queue_dispatch(
311-
ProcessParentType.ACTIVATION,
312-
activation.id,
313-
ActivationRequest.START,
314-
"test-request",
315-
)
316-
317-
with ThreadPoolExecutor(max_workers=3) as executor:
318-
futures = [executor.submit(concurrent_dispatch) for _ in range(3)]
319-
for future in futures:
320-
future.result()
321-
322-
assert execution_count == 1, f"Expected 1 execution, got {execution_count}"
323-
324-
assert (
325-
"queue_dispatch" in eda_caplog.text
326-
and "already being ran" in eda_caplog.text
327-
)

0 commit comments

Comments
 (0)