Skip to content

Commit fe6fd07

Browse files
authored
[Fixes #13382] Harvesting scheduler: Fixing expiration issue of the discovery task (#13383)
[Fixes #13382] adding dynamic expiration time to the discovery subtasks
1 parent 149869f commit fe6fd07

File tree

3 files changed

+108
-52
lines changed

3 files changed

+108
-52
lines changed

geonode/harvesting/models.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -368,7 +368,7 @@ def initiate(self, harvestable_resource_ids: typing.Optional[typing.List[int]] =
368368
raise RuntimeError("Invalid selection")
369369
self.status = self.STATUS_PENDING
370370
self.save()
371-
task_signature.apply_async(args=(), expiration=30)
371+
task_signature.apply_async(args=())
372372

373373
def abort(self):
374374
"""Abort a pending or on-going session."""

geonode/harvesting/tasks.py

Lines changed: 66 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -567,7 +567,6 @@ def check_harvester_available(self, harvester_id: int):
567567
@app.task(
568568
bind=True,
569569
queue="geonode",
570-
expires=30,
571570
time_limit=600,
572571
acks_late=False,
573572
ignore_result=False,
@@ -578,49 +577,75 @@ def update_harvestable_resources(self, refresh_session_id: int):
578577
# of resources because these have potentially been individually selected by the
579578
# user, which means we are not interested in all of them
580579
session = models.AsynchronousHarvestingSession.objects.get(pk=refresh_session_id)
581-
if session.status != session.STATUS_ABORTED:
582-
session.status = session.STATUS_ON_GOING
583-
session.save()
584-
harvester = session.harvester
585-
if harvester.update_availability():
586-
harvester.status = harvester.STATUS_UPDATING_HARVESTABLE_RESOURCES
587-
harvester.save()
588-
worker = harvester.get_harvester_worker()
589-
try:
590-
num_resources = worker.get_num_available_resources()
591-
except (NotImplementedError, base.HarvestingException) as exc:
592-
_handle_harvestable_resources_update_error(
593-
self.request.id, refresh_session_id=refresh_session_id, raised_exception=exc
594-
)
595-
else:
596-
harvester.num_harvestable_resources = num_resources
597-
harvester.save()
598-
session.total_records_to_process = num_resources
599-
session.save()
600-
page_size = 10
601-
total_pages = math.ceil(num_resources / page_size)
602-
batches = []
603-
for page in range(total_pages):
604-
batches.append(
605-
_update_harvestable_resources_batch.signature(
606-
args=(refresh_session_id, page, page_size),
607-
)
608-
)
609-
update_finalizer = _finish_harvestable_resources_update.signature(
610-
args=(refresh_session_id,), immutable=True
611-
).on_error(
612-
_handle_harvestable_resources_update_error.signature(
613-
kwargs={"refresh_session_id": refresh_session_id}
614-
)
615-
)
616-
update_workflow = chord(batches, body=update_finalizer)
617-
update_workflow.apply_async(args=(), expiration=30)
618-
else:
580+
581+
if session.status == session.STATUS_ABORTED:
582+
logger.debug("Session has been aborted, skipping...")
583+
return
584+
585+
session.status = session.STATUS_ON_GOING
586+
session.save()
587+
588+
harvester = session.harvester
589+
590+
try:
591+
592+
if not harvester.update_availability():
619593
finish_asynchronous_session(
620594
refresh_session_id, session.STATUS_FINISHED_ALL_FAILED, final_details="Harvester is not available"
621595
)
622-
else:
623-
logger.debug("Session has been aborted, skipping...")
596+
return
597+
598+
harvester.status = harvester.STATUS_UPDATING_HARVESTABLE_RESOURCES
599+
harvester.save()
600+
601+
worker = harvester.get_harvester_worker()
602+
try:
603+
num_resources = worker.get_num_available_resources()
604+
# definition of the dynamic expiration time
605+
task_expiration_time = calculate_dynamic_expiration(num_resources)
606+
except (NotImplementedError, base.HarvestingException) as exc:
607+
logger.exception("Failed to get number of available resources for harvester %s", harvester.id)
608+
details = f"Failed to get number of available resources: {exc}"
609+
finish_asynchronous_session(
610+
refresh_session_id,
611+
models.AsynchronousHarvestingSession.STATUS_FINISHED_ALL_FAILED,
612+
final_details=details,
613+
)
614+
return
615+
616+
harvester.num_harvestable_resources = num_resources
617+
harvester.save()
618+
session.total_records_to_process = num_resources
619+
session.save()
620+
page_size = 10
621+
total_pages = math.ceil(num_resources / page_size)
622+
batches = []
623+
for page in range(total_pages):
624+
batches.append(
625+
_update_harvestable_resources_batch.signature(
626+
args=(refresh_session_id, page, page_size),
627+
).set(expires=task_expiration_time)
628+
)
629+
update_finalizer = (
630+
_finish_harvestable_resources_update.signature(args=(refresh_session_id,), immutable=True)
631+
.on_error(
632+
_handle_harvestable_resources_update_error.signature(kwargs={"refresh_session_id": refresh_session_id})
633+
)
634+
.set(expires=task_expiration_time)
635+
)
636+
637+
update_workflow = chord(batches, body=update_finalizer)
638+
update_workflow.apply_async(args=(), expires=task_expiration_time)
639+
logger.info(f"Applying harvesting chord with expiration in {task_expiration_time} seconds")
640+
641+
except Exception as exc:
642+
logger.exception("Unexpected error during update_harvestable_resources for harvester %s", harvester.id)
643+
details = f"Unexpected error: {exc}"
644+
finish_asynchronous_session(
645+
refresh_session_id,
646+
models.AsynchronousHarvestingSession.STATUS_FINISHED_ALL_FAILED,
647+
final_details=details,
648+
)
624649

625650

626651
@app.task(

geonode/harvesting/tests/test_tasks.py

Lines changed: 41 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -188,28 +188,59 @@ def test_check_harvester_available(self, mock_harvester_model):
188188
tasks.check_harvester_available(1000)
189189
mock_harvester.update_availability.assert_called()
190190

191+
@mock.patch("geonode.harvesting.tasks.calculate_dynamic_expiration")
191192
@mock.patch("geonode.harvesting.tasks._handle_harvestable_resources_update_error")
192193
@mock.patch("geonode.harvesting.tasks._finish_harvestable_resources_update")
193194
@mock.patch("geonode.harvesting.tasks._update_harvestable_resources_batch")
194195
@mock.patch("geonode.harvesting.tasks.chord")
195196
@mock.patch("geonode.harvesting.tasks.models")
196197
def test_update_harvestable_resources_sends_batched_requests(
197-
self, mock_models, mock_chord, mock_batch, mock_finalizer, mock_error_handler
198+
self,
199+
mock_models,
200+
mock_chord,
201+
mock_batch,
202+
mock_finalizer,
203+
mock_error_handler,
204+
mock_calculate_expiration,
198205
):
199-
"""Verify that the `update_harvestable_resources` task creates a celery chord with the batched task, a finalizer and an error handler."""
206+
"""Verify that `update_harvestable_resources` creates a celery chord with expiration."""
207+
208+
# Set up mock harvester and session
200209
mock_worker = mock.MagicMock()
201210
mock_worker.get_num_available_resources.return_value = 1
202-
mock_harvester = mock.MagicMock(models.Harvester)
203-
mock_models.Harvester.objects.get.return_value = mock_harvester
211+
212+
mock_harvester = mock.MagicMock()
204213
mock_harvester.get_harvester_worker.return_value = mock_worker
214+
mock_harvester.update_availability.return_value = True
215+
216+
mock_session = mock.MagicMock()
217+
mock_session.status = models.AsynchronousHarvestingSession.STATUS_ON_GOING
218+
mock_session.harvester = mock_harvester
219+
220+
mock_models.AsynchronousHarvestingSession.objects.get.return_value = mock_session
221+
222+
mock_calculate_expiration.return_value = 123
223+
224+
# Setup the finalizer signature and its chained call
225+
mock_finalizer_sig = mock.MagicMock(name="finalizer_signature")
226+
mock_finalizer_on_error = mock.MagicMock(name="finalizer_on_error")
227+
mock_finalizer_sig.on_error.return_value = mock_finalizer_on_error
228+
mock_finalizer.signature.return_value = mock_finalizer_sig
229+
230+
# Run the task
231+
tasks.update_harvestable_resources("fake_session_id")
232+
233+
# Assert expiration was calculated
234+
mock_calculate_expiration.assert_called_once_with(1)
235+
236+
# Assert batch task had expiration set
237+
mock_batch.signature.return_value.set.assert_any_call(expires=123)
205238

206-
tasks.update_harvestable_resources("fake harvester id")
239+
# Assert finalizer had expiration set via the on_error chain
240+
mock_finalizer_on_error.set.assert_called_once_with(expires=123)
207241

208-
mock_batch.signature.assert_called()
209-
mock_finalizer.signature.assert_called()
210-
mock_error_handler.signature.assert_called()
211-
mock_chord.assert_called()
212-
mock_chord.return_value.apply_async.assert_called()
242+
# Assert chord was called with expiration
243+
mock_chord.return_value.apply_async.assert_called_once_with(args=(), expires=123)
213244

214245
def test_harvesting_scheduler(self):
215246
mock_harvester = mock.MagicMock(spec=models.Harvester).return_value

0 commit comments

Comments
 (0)