Skip to content

Commit 88fc96d

Browse files
committed
Run immediate tasks on main worker process
1 parent 9705500 commit 88fc96d

File tree

2 files changed

+31
-7
lines changed

2 files changed

+31
-7
lines changed

pulpcore/tasking/worker.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
dispatch_scheduled_tasks,
3737
perform_task,
3838
startup_hook,
39+
execute_task,
3940
)
4041

4142
_logger = logging.getLogger(__name__)
@@ -484,7 +485,10 @@ def handle_available_tasks(self):
484485
keep_looping = True
485486
for task in self.iter_tasks():
486487
keep_looping = True
487-
self.supervise_task(task)
488+
if task.immediate:
489+
execute_task(task)
490+
else:
491+
self.supervise_task(task)
488492

489493
def _record_unblocked_waiting_tasks_metric(self):
490494
now = timezone.now()

pulpcore/tests/functional/api/test_tasking.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ def test_cancel_task_group(pulpcore_bindings, dispatch_task_group, gen_user):
449449

450450

451451
@pytest.mark.parallel
452-
def test_immediate_task_without_resource_locking(pulpcore_bindings, dispatch_task, monitor_task):
452+
def test_immediate_task_doesnt_require_resource(pulpcore_bindings, dispatch_task, monitor_task):
453453
"""
454454
GIVEN a task with no resource requirements
455455
@@ -495,7 +495,7 @@ def test_immediate_task_without_resource_locking(pulpcore_bindings, dispatch_tas
495495

496496

497497
@pytest.mark.parallel
498-
def test_immediate_task_with_resources_locking(pulpcore_bindings, dispatch_task, monitor_task):
498+
def test_immediate_task_requires_resource(pulpcore_bindings, dispatch_task, monitor_task):
499499
"""
500500
GIVEN an async task requiring busy resources
501501
@@ -508,15 +508,25 @@ def test_immediate_task_with_resources_locking(pulpcore_bindings, dispatch_task,
508508
THEN an error is raised
509509
510510
3
511-
WHEN dispatching as immediate
512511
AND it takes longer than timeout
513512
THEN an error is raised
514513
"""
515514
LT_TIMEOUT = IMMEDIATE_TIMEOUT / 2
516515
GT_TIMEOUT = IMMEDIATE_TIMEOUT + 1
516+
LONG_RUNNING = 5
517+
518+
def wait_until(state, task_href, timeout=10):
519+
for i in range(timeout):
520+
task = pulpcore_bindings.TasksApi.read(task_href)
521+
if task.state != state:
522+
break
523+
time.sleep(1)
517524

518525
# Case 1
519-
dispatch_task("pulpcore.app.tasks.test.sleep", args=(3,), exclusive_resources=["XYZ"])
526+
long_href = dispatch_task(
527+
"pulpcore.app.tasks.test.sleep", args=(LONG_RUNNING,), exclusive_resources=["XYZ"]
528+
)
529+
wait_until("running", long_href)
520530
task_href = dispatch_task(
521531
"pulpcore.app.tasks.test.asleep",
522532
args=(LT_TIMEOUT,),
@@ -528,7 +538,10 @@ def test_immediate_task_with_resources_locking(pulpcore_bindings, dispatch_task,
528538
assert task.worker is not None
529539

530540
# Case 2
531-
dispatch_task("pulpcore.app.tasks.test.sleep", args=(LT_TIMEOUT,), exclusive_resources=["XYZ"])
541+
long_href = dispatch_task(
542+
"pulpcore.app.tasks.test.sleep", args=(LONG_RUNNING,), exclusive_resources=["XYZ"]
543+
)
544+
wait_until("running", long_href)
532545
with pytest.raises(PulpTaskError):
533546
task_href = dispatch_task(
534547
"pulpcore.app.tasks.test.asleep",
@@ -540,9 +553,16 @@ def test_immediate_task_with_resources_locking(pulpcore_bindings, dispatch_task,
540553
monitor_task(task_href)
541554

542555
# Case 3
556+
long_href = dispatch_task(
557+
"pulpcore.app.tasks.test.sleep", args=(LONG_RUNNING,), exclusive_resources=["XYZ"]
558+
)
559+
wait_until("running", long_href)
543560
with pytest.raises(PulpTaskError) as ctx:
544561
task_href = dispatch_task(
545-
"pulpcore.app.tasks.test.asleep", args=(GT_TIMEOUT,), immediate=True
562+
"pulpcore.app.tasks.test.asleep",
563+
args=(GT_TIMEOUT,),
564+
immediate=True,
565+
exclusive_resources=["XYZ"],
546566
)
547567
monitor_task(task_href)
548568
assert "task timed out after" in ctx.value.task.error["description"]

0 commit comments

Comments
 (0)