Skip to content

Commit e817494

Browse files
[Fixes #13719] Upsert / replace workflow bugfixes (#13720)
* [Fixes #13719] Upsert with authority different than 4326 raises error * [Fixes #13719] fix field creation on dynamic model for upsert/replace * [Fixes #13719] Cannot upsert a dataset after replace * [Fixes #13719] Make all geom to be promoted to multi
1 parent 36654b9 commit e817494

File tree

6 files changed

+121
-61
lines changed

6 files changed

+121
-61
lines changed

geonode/upload/celery_tasks.py

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,9 @@ def create_geonode_resource(
533533
else:
534534
handler.create_resourcehandlerinfo(handler_module_path, resource, _exec, **kwargs)
535535

536+
if _overwrite and handler.have_table:
537+
handler.fixup_dynamic_model_fields(_exec, _files)
538+
536539
# at the end recall the import_orchestrator for the next step
537540
import_orchestrator.apply_async(
538541
(
@@ -714,6 +717,9 @@ def _create_field(dynamic_model_schema, field, _kwargs):
714717

715718
dynamic_model_schema = dynamic_model_schema.first()
716719

720+
# clearing existing fields for this chunk
721+
FieldSchema.objects.filter(model_schema=dynamic_model_schema, name__in=(x["name"] for x in fields)).delete()
722+
717723
row_to_insert = []
718724
for field in fields:
719725
# setup kwargs for the class provided
@@ -733,20 +739,13 @@ def _create_field(dynamic_model_schema, field, _kwargs):
733739
# setting the dimension for the geometry, so that we can also handle 3D geometries
734740
_kwargs = {**_kwargs, **{"dim": field.get("dim")}}
735741

742+
if authority := field.get("authority"):
743+
srid_str = authority.split(":")[-1]
744+
if srid_str.isdigit():
745+
_kwargs["srid"] = int(srid_str)
746+
736747
# if it is a new creation we generate the field model from scratch
737-
if not overwrite:
738-
row_to_insert.append(_create_field(dynamic_model_schema, field, _kwargs))
739-
else:
740-
# otherwise if is an overwrite, we update the existing one and create the one that does not exists
741-
_field_exists = FieldSchema.objects.filter(name=field["name"], model_schema=dynamic_model_schema)
742-
if _field_exists.exists():
743-
_field_exists.update(
744-
class_name=field["class_name"],
745-
model_schema=dynamic_model_schema,
746-
kwargs=_kwargs,
747-
)
748-
else:
749-
row_to_insert.append(_create_field(dynamic_model_schema, field, _kwargs))
748+
row_to_insert.append(_create_field(dynamic_model_schema, field, _kwargs))
750749

751750
if row_to_insert:
752751
if dynamic_model_schema.managed:
@@ -765,7 +764,12 @@ def _create_field(dynamic_model_schema, field, _kwargs):
765764
field.save()
766765
else:
767766
# the bulk creation improves the overall performance with the DB
768-
FieldSchema.objects.bulk_create(row_to_insert, 30)
767+
FieldSchema.objects.bulk_create(
768+
row_to_insert,
769+
update_conflicts=True,
770+
update_fields=["name", "model_schema_id", "class_name", "kwargs"],
771+
unique_fields=["id"],
772+
)
769773
# fixing the schema model in django
770774

771775
return "dynamic_model", layer_name, execution_id

geonode/upload/handlers/common/raster.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,11 @@ def _publish_resource_rollback(self, exec_id, instance_name=None, *args, **kwarg
569569
publisher = DataPublisher(handler_module_path=handler_module_path)
570570
publisher.delete_resource(instance_name)
571571

572+
def fixup_dynamic_model_fields(self, _exec, files):
573+
"""
574+
Raster datasets do not have a dynamic model, so this step can be skipped
575+
"""
576+
572577

573578
@importer_app.task(
574579
base=UpdateTaskClass,

geonode/upload/handlers/common/tests_vector.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,6 @@ def test_validate_single_feature_raise_error(self):
643643

644644
with self.assertRaises(Exception) as exp:
645645
self.json_handler.upsert_data(self.original, exec_id)
646-
647646
self.assertEqual(
648647
str(exp.exception), "An internal error occurred during upsert save. All features are rolled back."
649648
)

geonode/upload/handlers/common/vector.py

Lines changed: 96 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
#########################################################################
1919
import ast
2020
import csv
21-
21+
import re
2222
from datetime import datetime
2323
from itertools import islice
2424
from pathlib import Path
@@ -258,6 +258,8 @@ def create_ogr2ogr_command(files, original_name, ovverwrite_layer, alternate, **
258258
This is a default command that is needed to import a vector file
259259
"""
260260
_datastore = settings.DATABASES["datastore"]
261+
layers = ogr.Open(files.get("base_file"))
262+
layer = layers.GetLayer(original_name)
261263

262264
options = "--config PG_USE_COPY YES"
263265
copy_with_dump = ast.literal_eval(os.getenv("OGR2OGR_COPY_WITH_DUMP", "False"))
@@ -281,6 +283,9 @@ def create_ogr2ogr_command(files, original_name, ovverwrite_layer, alternate, **
281283

282284
options += f'-nln {alternate} "{original_name}"'
283285

286+
if layer is not None and "Point" not in ogr.GeometryTypeToName(layer.GetGeomType()):
287+
options += " -nlt PROMOTE_TO_MULTI"
288+
284289
if ovverwrite_layer:
285290
options += " -overwrite"
286291

@@ -734,12 +739,15 @@ def _define_dynamic_layer_schema(self, layer, **kwargs):
734739

735740
return layer_schema
736741

737-
def promote_to_multi(self, geometry_name: str):
742+
def promote_to_multi(self, geometry_name):
738743
"""
739744
If needed change the name of the geometry, by promoting it to Multi
740745
example if is Point -> MultiPoint
741746
Needed for the shapefiles
747+
Later this is used to map the geometry coming from ogr2ogr with a django class
742748
"""
749+
if "Multi" not in geometry_name and "Point" not in geometry_name and "3D" not in geometry_name:
750+
return f"Multi {geometry_name.title()}"
743751
return geometry_name
744752

745753
def promote_geom_to_multi(self, geom):
@@ -748,7 +756,21 @@ def promote_geom_to_multi(self, geom):
748756
example if is Point -> MultiPoint
749757
Needed for the shapefiles
750758
"""
751-
return geom
759+
match geom.GetGeometryType():
760+
case ogr.wkbMultiLineString | ogr.wkbMultiPolygon:
761+
# if it is already multi, we don't need to do anything
762+
return geom
763+
case ogr.wkbLineString:
764+
new_multi_geom = ogr.Geometry(ogr.wkbMultiLineString)
765+
new_multi_geom.AddGeometry(geom)
766+
return new_multi_geom
767+
case ogr.wkbPolygon:
768+
new_multi_geom = ogr.Geometry(ogr.wkbMultiPolygon)
769+
new_multi_geom.AddGeometry(geom)
770+
return new_multi_geom
771+
case _:
772+
# we don't convert points
773+
return geom
752774

753775
def create_geonode_resource(
754776
self,
@@ -796,37 +818,7 @@ def create_geonode_resource(
796818

797819
saved_dataset.refresh_from_db()
798820

799-
# if dynamic model is enabled, we can save up with is the primary key of the table
800-
if settings.IMPORTER_ENABLE_DYN_MODELS and self.have_table:
801-
from django.db import connections
802-
803-
# then we can check for the PK
804-
column = None
805-
connection = connections["datastore"]
806-
table_name = saved_dataset.alternate.split(":")[1]
807-
808-
schema = ModelSchema.objects.filter(name=table_name).first()
809-
schema.managed = False
810-
schema.save()
811-
812-
with connection.cursor() as cursor:
813-
column = connection.introspection.get_primary_key_columns(cursor, table_name)
814-
if column:
815-
# getting the relative model schema
816-
# better to always ensure that the schema is NOT managed
817-
field = FieldSchema.objects.filter(name=column[0], model_schema__name=table_name).first()
818-
if field:
819-
field.kwargs.update({"primary_key": True})
820-
field.save()
821-
else:
822-
# creating the field needed as primary key
823-
pk_field = FieldSchema(
824-
name=column[0],
825-
model_schema=schema,
826-
class_name="django.db.models.BigAutoField",
827-
kwargs={"null": False, "primary_key": True},
828-
)
829-
pk_field.save()
821+
self.__fixup_primary_key(saved_dataset)
830822

831823
return saved_dataset
832824

@@ -1159,6 +1151,40 @@ def __get_new_and_original_schema(self, files, execution_id):
11591151

11601152
return target_schema_fields, new_file_schema_fields
11611153

1154+
def __fixup_primary_key(self, saved_dataset):
1155+
1156+
# if the dynamic model is enabled, we can record which column is the primary key of the table
1157+
if settings.IMPORTER_ENABLE_DYN_MODELS and self.have_table:
1158+
from django.db import connections
1159+
1160+
# then we can check for the PK
1161+
column = None
1162+
connection = connections["datastore"]
1163+
table_name = saved_dataset.alternate.split(":")[1]
1164+
1165+
schema = ModelSchema.objects.filter(name=table_name).first()
1166+
schema.managed = False
1167+
schema.save()
1168+
1169+
with connection.cursor() as cursor:
1170+
column = connection.introspection.get_primary_key_columns(cursor, table_name)
1171+
if column:
1172+
# getting the relative model schema
1173+
# better to always ensure that the schema is NOT managed
1174+
field = FieldSchema.objects.filter(name=column[0], model_schema__name=table_name).first()
1175+
if field:
1176+
field.kwargs.update({"primary_key": True})
1177+
field.save()
1178+
else:
1179+
# creating the field needed as primary key
1180+
pk_field = FieldSchema(
1181+
name=column[0],
1182+
model_schema=schema,
1183+
class_name="django.db.models.BigAutoField",
1184+
kwargs={"null": False, "primary_key": True},
1185+
)
1186+
pk_field.save()
1187+
11621188
def upsert_data(self, files, execution_id, **kwargs):
11631189
"""
11641190
Function used to upsert the data for a vector resource.
@@ -1175,9 +1201,7 @@ def upsert_data(self, files, execution_id, **kwargs):
11751201
exec_obj = orchestrator.get_execution_object(execution_id)
11761202

11771203
# getting the related model schema for the resource
1178-
original_resource = ResourceBase.objects.filter(pk=exec_obj.input_params.get("resource_pk")).first()
1179-
self.real_instance = original_resource.get_real_instance()
1180-
model = ModelSchema.objects.filter(name=original_resource.alternate.split(":")[-1]).first()
1204+
original_resource, model = self.___get_dynamic_schema(exec_obj)
11811205
if not model:
11821206
raise UpsertException(
11831207
"This dataset does't support updates. Please upload the dataset again to have the upsert operations enabled"
@@ -1218,6 +1242,12 @@ def upsert_data(self, files, execution_id, **kwargs):
12181242
"layer_name": original_resource.title,
12191243
}
12201244

1245+
def ___get_dynamic_schema(self, exec_obj):
1246+
original_resource = ResourceBase.objects.filter(pk=exec_obj.input_params.get("resource_pk")).first()
1247+
self.real_instance = original_resource.get_real_instance()
1248+
model = ModelSchema.objects.filter(name=original_resource.alternate.split(":")[-1]).first()
1249+
return original_resource, model
1250+
12211251
def _commit_upsert(self, model_obj, OriginalResource, upsert_key, layer_iterator):
12221252
valid_create = 0
12231253
valid_update = 0
@@ -1310,7 +1340,11 @@ def _validate_feature(self, data_chunk, model_instance, upsert_key, errors):
13101340
# need to simulate the "promote to multi" used by the upload process.
13111341
# here we cannot rely on ogr2ogr so we need to do it manually
13121342
geom = feature.GetGeometryRef()
1313-
feature_as_dict.update({self.default_geometry_column_name: self.promote_geom_to_multi(geom).ExportToWkt()})
1343+
if geom:
1344+
wkt = self.promote_geom_to_multi(geom).ExportToWkt()
1345+
if code := geom.GetSpatialReference().GetAuthorityCode(None):
1346+
wkt = f"SRID={code};{wkt}"
1347+
feature_as_dict.update({self.default_geometry_column_name: wkt})
13141348

13151349
feature_as_dict, is_valid = self.validate_feature(feature_as_dict)
13161350
if not is_valid:
@@ -1330,10 +1364,23 @@ def _save_feature(self, data_chunk, model_obj, model_instance, upsert_key, valid
13301364
feature_to_save = []
13311365
for feature in data_chunk:
13321366
feature_as_dict = feature.items()
1367+
# evaluate if there is any date in the schema of the feature
1368+
schema = feature.DumpReadableAsString().split("\n")
1369+
if any(date_fields := [f for f in schema if ("(Date)" in f or "(DateTime)" in f) and "(null)" not in f]):
1370+
# if any field schema as date is found, we can normalize the date
1371+
pattern = re.compile(r"^\s*(?P<label>.+?)\s*\(\s*(?P<type>.+?)\s*\)\s*=\s*(?P<date_value>.+)\s*$")
1372+
for fields in date_fields:
1373+
match = pattern.search(fields)
1374+
label = match.group("label")
1375+
date_value = match.group("date_value")
1376+
feature_as_dict[label] = date_value.replace("/", "-")
13331377
# need to simulate the "promote to multi" used by the upload process.
13341378
# here we cannot rely on ogr2ogr so we need to do it manually
13351379
geom = feature.GetGeometryRef()
1336-
feature_as_dict.update({self.default_geometry_column_name: self.promote_geom_to_multi(geom).ExportToWkt()})
1380+
code = geom.GetSpatialReference().GetAuthorityCode(None)
1381+
feature_as_dict.update(
1382+
{self.default_geometry_column_name: f"SRID={code};{self.promote_geom_to_multi(geom).ExportToWkt()}"}
1383+
)
13371384
to_process.append(feature_as_dict)
13381385

13391386
for feature_as_dict in to_process:
@@ -1420,8 +1467,18 @@ def refresh_geonode_resource(self, execution_id, asset=None, dataset=None, creat
14201467

14211468
orchestrator.update_execution_request_obj(exec_obj, {"geonode_resource": dataset})
14221469

1470+
self.__fixup_primary_key(dataset)
14231471
return dataset
14241472

1473+
def fixup_dynamic_model_fields(self, _exec, files):
1474+
"""
1475+
Utility needed during the replace workflow,
1476+
it will sync all the FieldSchema along with the current resource uploaded.
1477+
This is mandatory in order to have a reliable field structure in the DB
1478+
"""
1479+
fields_schema, needed_field_schema = self.__get_new_and_original_schema(files, str(_exec.exec_id))
1480+
fields_schema.filter(~Q(name__in=(x["name"] for x in needed_field_schema))).delete()
1481+
14251482

14261483
@importer_app.task(
14271484
base=ErrorBaseTaskClass,

geonode/upload/handlers/shapefile/handler.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -170,14 +170,9 @@ def create_ogr2ogr_command(files, original_name, ovverwrite_layer, alternate, **
170170
This is a default command that is needed to import a vector file
171171
"""
172172
base_command = BaseVectorFileHandler.create_ogr2ogr_command(files, original_name, ovverwrite_layer, alternate)
173-
layers = ogr.Open(files.get("base_file"))
174-
layer = layers.GetLayer(original_name)
175-
173+
additional_options = []
176174
encoding = ShapeFileHandler._get_encoding(files)
177175

178-
additional_options = []
179-
if layer is not None and "Point" not in ogr.GeometryTypeToName(layer.GetGeomType()):
180-
additional_options.append("-nlt PROMOTE_TO_MULTI")
181176
if encoding:
182177
additional_options.append(f"--config SHAPE_ENCODING {encoding}")
183178

geonode/upload/handlers/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040

4141

4242
STANDARD_TYPE_MAPPING = {
43-
"Integer64": "django.db.models.IntegerField",
43+
"Integer64": "django.db.models.FloatField",
4444
"Integer": "django.db.models.IntegerField",
4545
"DateTime": "django.db.models.DateTimeField",
4646
"Date": "django.db.models.DateField",

0 commit comments

Comments
 (0)