Skip to content

Commit 25e5bce

Browse files
Gpetrakgiohappy
authored andcommitted
adding a queue for harvesting async workflow
1 parent 47780c1 commit 25e5bce

File tree

2 files changed

+14
-13
lines changed

2 files changed

+14
-13
lines changed

geonode/harvesting/tasks.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
logger = logging.getLogger(__name__)
4343

4444

45-
@app.task(bind=True, queue="geonode", expires=30, time_limit=600, acks_late=False, ignore_result=False)
45+
@app.task(bind=True, queue="harvesting", expires=30, time_limit=600, acks_late=False, ignore_result=False)
4646
def harvesting_scheduler(self):
4747
"""Check whether any of the configured harvesters needs to be run or not.
4848
@@ -86,7 +86,7 @@ def harvesting_scheduler(self):
8686

8787
@app.task(
8888
bind=True,
89-
queue="geonode",
89+
queue="harvesting",
9090
expires=30,
9191
time_limit=600,
9292
acks_late=False,
@@ -130,7 +130,7 @@ def harvesting_dispatcher(self, harvesting_session_id: int):
130130

131131
@app.task(
132132
bind=True,
133-
queue="geonode",
133+
queue="harvesting",
134134
expires=120,
135135
time_limit=600,
136136
acks_late=False,
@@ -233,7 +233,7 @@ def harvest_resources(
233233

234234
@app.task(
235235
bind=True,
236-
queue="geonode",
236+
queue="harvesting",
237237
time_limit=600,
238238
acks_late=False,
239239
ignore_result=False,
@@ -370,7 +370,7 @@ def _harvest_resource(self, harvestable_resource_id: int, harvesting_session_id:
370370

371371
@app.task(
372372
bind=True,
373-
queue="geonode",
373+
queue="harvesting",
374374
time_limit=600,
375375
acks_late=False,
376376
ignore_result=False,
@@ -425,7 +425,7 @@ def _finish_harvesting(self, harvesting_session_id: int, execution_id: str):
425425
logger.exception(f"Failed to finalize harvesting session {harvesting_session_id}: {exc}")
426426

427427

428-
@app.task(bind=True, queue="geonode", time_limit=600, acks_late=False, ignore_result=False)
428+
@app.task(bind=True, queue="harvesting", time_limit=600, acks_late=False, ignore_result=False)
429429
def _finish_harvesting_chunk(self, _results, harvesting_session_id: int):
430430
"""
431431
Optionally log chunk completion, but do NOT change final status here
@@ -439,7 +439,7 @@ def _finish_harvesting_chunk(self, _results, harvesting_session_id: int):
439439

440440
@app.task(
441441
bind=True,
442-
queue="geonode",
442+
queue="harvesting",
443443
time_limit=600,
444444
acks_late=False,
445445
ignore_result=False,
@@ -508,7 +508,7 @@ def queue_next_chunk_batch(
508508

509509
@app.task(
510510
bind=True,
511-
queue="geonode",
511+
queue="harvesting",
512512
expires=30,
513513
time_limit=600,
514514
acks_late=False,
@@ -554,7 +554,7 @@ def _handle_harvesting_error(self, task_id, *args, **kwargs):
554554
@app.task(
555555
bind=True,
556556
# name='geonode.harvesting.tasks.check_harvester_available',
557-
queue="geonode",
557+
queue="harvesting",
558558
expires=30,
559559
time_limit=600,
560560
acks_late=False,
@@ -567,7 +567,7 @@ def check_harvester_available(self, harvester_id: int):
567567

568568
@app.task(
569569
bind=True,
570-
queue="geonode",
570+
queue="harvesting",
571571
time_limit=600,
572572
acks_late=False,
573573
ignore_result=False,
@@ -651,7 +651,7 @@ def update_harvestable_resources(self, refresh_session_id: int):
651651

652652
@app.task(
653653
bind=True,
654-
queue="geonode",
654+
queue="harvesting",
655655
expires=30,
656656
time_limit=600,
657657
acks_late=False,
@@ -693,7 +693,7 @@ def _update_harvestable_resources_batch(self, refresh_session_id: int, page: int
693693

694694
@app.task(
695695
bind=True,
696-
queue="geonode",
696+
queue="harvesting",
697697
expires=30,
698698
time_limit=600,
699699
acks_late=False,
@@ -719,7 +719,7 @@ def _finish_harvestable_resources_update(self, refresh_session_id: int):
719719

720720
@app.task(
721721
bind=True,
722-
queue="geonode",
722+
queue="harvesting",
723723
expires=30,
724724
time_limit=600,
725725
acks_late=False,

geonode/settings.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1784,6 +1784,7 @@ def get_geonode_catalogue_service():
17841784
Queue("security", GEONODE_EXCHANGE, routing_key="security", priority=0),
17851785
Queue("management_commands_http", GEONODE_EXCHANGE, routing_key="management_commands_http", priority=0),
17861786
Queue("clery_cleanup", GEONODE_EXCHANGE, routing_key="clery_cleanup", priority=0),
1787+
Queue("harvesting", GEONODE_EXCHANGE, routing_key="harvesting", priority=0),
17871788
)
17881789

17891790
if USE_GEOSERVER:

0 commit comments

Comments
 (0)