Skip to content

Commit 6867ab1

Browse files
authored
[Fixes #13609] Tracking the failures of OGR2OGR related tasks (#13613)
* adding 'Multi 3D Polygon' geometry type * tracking the failures of the ogr2ogr tasks * removing the alternate as an extra key * fixing flake issues * excluding the 3d in the promote_to_multi * fixing formatting issues
1 parent ccae985 commit 6867ab1

File tree

3 files changed

+54
-5
lines changed

3 files changed

+54
-5
lines changed

geonode/upload/celery_tasks.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,39 @@ def set_task_status(self, task_name, execution_id, layer_key, status):
192192
)
193193

194194

195+
class UpdateDynamicTaskClass(Task):
196+
max_retries = 3
197+
track_started = True
198+
199+
def on_failure(self, exc, task_id, args, kwargs, einfo):
200+
"""
201+
Called when the task fails.
202+
Updates the ExecutionRequest.tasks dict and delegates logging/error handling to evaluate_error.
203+
"""
204+
task_name = self.name
205+
execution_id = args[0]
206+
layer_key = find_key_recursively(kwargs, "layer_key")
207+
208+
_exec = orchestrator.get_execution_object(execution_id)
209+
tasks_status = _exec.tasks or {}
210+
211+
if layer_key is not None:
212+
# Ensure the layer exists
213+
if layer_key not in tasks_status:
214+
tasks_status[layer_key] = {}
215+
216+
tasks_status[layer_key][task_name] = "FAILED"
217+
218+
# Update ExecutionRequest tasks dict first
219+
orchestrator.update_execution_request_status(
220+
execution_id=execution_id,
221+
tasks=tasks_status,
222+
)
223+
224+
# Delegate the rest (errors, failed_layers, status) to evaluate_error
225+
evaluate_error(self, exc, task_id, args, kwargs, einfo)
226+
227+
195228
@importer_app.task(
196229
bind=True,
197230
base=ErrorBaseTaskClass,
@@ -646,7 +679,7 @@ def copy_geonode_resource(self, exec_id, actual_step, layer_name, alternate, han
646679

647680

648681
@importer_app.task(
649-
base=ErrorBaseTaskClass,
682+
base=UpdateDynamicTaskClass,
650683
name="geonode.upload.create_dynamic_structure",
651684
queue="geonode.upload.create_dynamic_structure",
652685
max_retries=1,
@@ -660,6 +693,7 @@ def create_dynamic_structure(
660693
dynamic_model_schema_id: int,
661694
overwrite: bool,
662695
layer_name: str,
696+
**kwargs,
663697
):
664698
def _create_field(dynamic_model_schema, field, _kwargs):
665699
# common method to define the Field Schema object

geonode/upload/handlers/common/vector.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,18 @@
4343
from geonode.base.models import ResourceBase
4444
from geonode.resource.enumerator import ExecutionRequestAction as exa
4545
from geonode.layers.models import Dataset
46-
from geonode.upload.celery_tasks import ErrorBaseTaskClass, FieldSchema, create_dynamic_structure
46+
from geonode.upload.celery_tasks import (
47+
ErrorBaseTaskClass,
48+
FieldSchema,
49+
create_dynamic_structure,
50+
UpdateDynamicTaskClass,
51+
)
4752
from geonode.upload.handlers.base import BaseHandler
4853
from geonode.upload.handlers.utils import (
4954
GEOM_TYPE_MAPPING,
5055
STANDARD_TYPE_MAPPING,
5156
drop_dynamic_model_schema,
57+
create_layer_key,
5258
)
5359
from geonode.resource.manager import resource_manager
5460
from geonode.resource.models import ExecutionRequest
@@ -652,7 +658,14 @@ def create_dynamic_model_fields(
652658
# definition of the celery group needed to run the async workflow.
653659
# in this way each task of the group will handle only 30 field
654660
celery_group = group(
655-
create_dynamic_structure.s(execution_id, schema, dynamic_model_schema.id, overwrite, layer_name)
661+
create_dynamic_structure.s(
662+
execution_id,
663+
schema,
664+
dynamic_model_schema.id,
665+
overwrite,
666+
layer_name,
667+
layer_key=create_layer_key(layer.GetName(), str(execution_id)),
668+
)
656669
for schema in list_chunked
657670
)
658671

@@ -921,6 +934,7 @@ def get_ogr2ogr_task_group(
921934
handler_module_path,
922935
should_be_overwritten,
923936
alternate,
937+
layer_key=create_layer_key(layer.lower(), str(execution_id)),
924938
)
925939

926940
def _get_execution_request_object(self, execution_id: str):
@@ -1391,7 +1405,7 @@ def import_next_step(
13911405

13921406

13931407
@importer_app.task(
1394-
base=ErrorBaseTaskClass,
1408+
base=UpdateDynamicTaskClass,
13951409
name="geonode.upload.import_with_ogr2ogr",
13961410
queue="geonode.upload.import_with_ogr2ogr",
13971411
max_retries=1,
@@ -1406,6 +1420,7 @@ def import_with_ogr2ogr(
14061420
handler_module_path: str,
14071421
ovverwrite_layer=False,
14081422
alternate=None,
1423+
**kwargs,
14091424
):
14101425
"""
14111426
Perform the ogr2ogr command to import he gpkg inside geonode_data

geonode/upload/handlers/shapefile/handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ def promote_to_multi(self, geometry_name):
212212
Needed for the shapefiles
213213
Later this is used to map the geometry coming from ogr2ogr with a django class
214214
"""
215-
if "Multi" not in geometry_name and "Point" not in geometry_name:
215+
if "Multi" not in geometry_name and "Point" not in geometry_name and "3D" not in geometry_name:
216216
return f"Multi {geometry_name.title()}"
217217
return geometry_name
218218

0 commit comments

Comments
 (0)