Skip to content

Commit 9271613

Browse files
[Fixes #13478] Enhancement / bugfix of upsert functionality (#13482)
* [Fixes #13478] improvements on upsert * [Fixes #13478] add bulk update and insert in upsert * [Fixes #13478] add auto multi promotion for upsert
1 parent e21da3c commit 9271613

File tree

16 files changed

+254
-46
lines changed

16 files changed

+254
-46
lines changed

.env_dev

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,3 +210,4 @@ RESTART_POLICY_WINDOW=120s
210210

211211
DEFAULT_MAX_PARALLEL_UPLOADS_PER_USER=5
212212
IMPORTER_ENABLE_DYN_MODELS=True
213+
UPSERT_CHUNK_SIZE= 10

geonode/settings.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2319,5 +2319,8 @@ def get_geonode_catalogue_service():
23192319

23202320
# Group default logo url
23212321
GROUP_LOGO_URL = os.getenv("GROUP_LOGO_URL", "/geonode/img/group_logo.png")
2322+
2323+
UPSERT_CHUNK_SIZE = ast.literal_eval(os.getenv("UPSERT_CHUNK_SIZE", "1000"))
2324+
23222325
FILE_UPLOAD_DIRECTORY_PERMISSIONS = 0o777
23232326
FILE_UPLOAD_PERMISSIONS = 0o777

geonode/upload/celery_tasks.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -841,10 +841,13 @@ def upsert_data(self, execution_id, /, handler_module_path, action, **kwargs):
841841
if not is_valid:
842842
raise UpsertException(errors)
843843

844-
result = _datastore.upsert_data(execution_id, **kwargs)
844+
upsert_success, result = _datastore.upsert_data(execution_id, **kwargs)
845845

846846
orchestrator.update_execution_request_obj(_exec, {"output_params": {"upsert": result}})
847847

848+
if not upsert_success:
849+
raise UpsertException("Upsert has failed, please verify the log for more information")
850+
848851
resource = ResourceBase.objects.get(pk=_exec.input_params.get("resource_pk"))
849852

850853
task_params = (

geonode/upload/handlers/common/vector.py

Lines changed: 121 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
#
1818
#########################################################################
1919
import ast
20+
from datetime import datetime
21+
from itertools import islice
2022
from django.db import connections
2123
from geonode.security.permissions import _to_compact_perms
2224
from geonode.storage.manager import StorageManager
@@ -54,6 +56,7 @@
5456
from geonode.upload.models import ResourceHandlerInfo
5557
from geonode.upload.orchestrator import orchestrator
5658
from django.db.models import Q
59+
5760
import pyproj
5861
from geonode.geoserver.security import delete_dataset_cache, set_geowebcache_invalidate_cache
5962
from geonode.geoserver.helpers import get_time_info
@@ -600,6 +603,7 @@ def create_dynamic_model_fields(
600603
self.promote_to_multi(ogr.GeometryTypeToName(layer.GetGeomType()))
601604
),
602605
"dim": (2 if not ogr.GeometryTypeToName(layer.GetGeomType()).lower().startswith("3d") else 3),
606+
"authority": self.identify_authority(layer),
603607
}
604608
]
605609

@@ -627,6 +631,14 @@ def promote_to_multi(self, geometry_name: str):
627631
"""
628632
return geometry_name
629633

634+
def promote_geom_to_multi(self, geom):
635+
"""
636+
Convert the GetGeometryType object into Multi
637+
example if is Point -> MultiPoint
638+
Needed for the shapefiles
639+
"""
640+
return geom
641+
630642
def create_geonode_resource(
631643
self,
632644
layer_name: str,
@@ -963,19 +975,28 @@ def upsert_validation(self, files, execution_id, **kwargs: dict) -> Tuple[bool,
963975
raise UpsertException(
964976
f"The columns in the source and target do not match they must be equal. The following are not expected or missing: {differeces}"
965977
)
978+
skip_geom_eval = False
966979
for field in new_file_schema_fields:
967980
# check if the field exists in the previous schema
968981
target_field = target_schema_fields.filter(name=field["name"]).first()
969982
if target_field:
970983
# if is the primary key, we can skip the check
971984
# If the field exists the class name should be the same
985+
if "authority" in field and not skip_geom_eval:
986+
if db_value := target_field.model_schema.as_model().objects.first():
987+
skip_geom_eval = True
988+
if not str(db_value.geometry.srid) in field["authority"]:
989+
message = f"The file provided have a different authority ({field['authority']}) compared to the one in the DB: {db_value}"
990+
raise UpsertException(message)
991+
972992
if not target_field.class_name == field["class_name"] and not target_field.kwargs.get(
973993
"primary_key"
974994
):
975995
# if the class changes, is marked as error
976996
message = f"The type of the following field is changed and is prohibited: field: {field['name']}| current: {target_field.class_name}| new: {field['class_name']}"
977997
errors.append(message)
978998
logger.error(message)
999+
9791000
return not errors, errors
9801001
else:
9811002
raise UpsertException(
@@ -998,7 +1019,19 @@ def __get_new_and_original_schema(self, files, execution_id):
9981019
layers = self._select_valid_layers(all_layers)
9991020
if not layers:
10001021
raise UpsertException("No valid layers found in the provided file for upsert.")
1022+
10011023
layer = layers[0]
1024+
# evaluate if any of the ogc_fid entries is null. if it is null we stop the workflow
1025+
# the user should provide the complete list with the ogc_fid set
1026+
sql_query = f'SELECT * FROM "{layer.GetName()}" WHERE "ogc_fid" IS NULL'
1027+
1028+
# Execute the SQL query to the layer
1029+
result = all_layers.ExecuteSQL(sql_query)
1030+
if not result or (result and result.GetFeatureCount() > 0):
1031+
raise UpsertException(
1032+
f"All the feature in the file must have the ogc_fid field correctly populated. Number of None value: {result.GetFeatureCount() if result else 'all'}"
1033+
)
1034+
10021035
# Will generate the same schema as the target_resource_schema
10031036
new_file_schema_fields = self.create_dynamic_model_fields(
10041037
layer,
@@ -1048,39 +1081,39 @@ def upsert_data(self, files, execution_id, **kwargs):
10481081
if not layers:
10491082
raise UpsertException("No valid layers were found in the file provided")
10501083
# we can upsert just 1 layer at time
1051-
for feature in layers[0]:
1052-
feature_as_dict = feature.items()
1053-
filter_dict = {field: value for field, value in feature_as_dict.items() if field == upsert_key}
1054-
if not filter_dict:
1055-
# if is not possible to extract the feature from ogr2ogr, better to ignore the layer
1056-
logger.error("was not possible to extract the feature of the dataset, skipping...")
1057-
update_error.append(f"Was not possible to manage the following data: {feature}")
1058-
continue
1059-
to_update = OriginalResource.objects.filter(**filter_dict)
1060-
geom = feature.GetGeometryRef()
1061-
feature_as_dict.update({self.default_geometry_column_name: geom.ExportToWkt()})
1062-
if to_update:
1063-
try:
1064-
to_update.update(**feature_as_dict)
1065-
valid_update += 1
1066-
except Exception as e:
1067-
logger.error(f"Error during update of {feature_as_dict} in upsert: {e}")
1068-
update_error.append(str(e))
1069-
else:
1070-
try:
1071-
OriginalResource.objects.create(**feature_as_dict)
1072-
valid_create += 1
1073-
except Exception as e:
1074-
logger.error(f"Error during creation of {feature_as_dict} in upsert: {e}")
1075-
create_error.append(str(e))
1084+
schema_fields = [f.name for f in model.fields.filter(kwargs__primary_key__isnull=True)]
1085+
total_feature = layers[0].GetFeatureCount()
1086+
layer_iterator = iter(layers[0])
1087+
while True:
1088+
# Create an iterator for the next chunk
1089+
data_chunk = list(islice(layer_iterator, settings.UPSERT_CHUNK_SIZE))
1090+
1091+
# If the chunk is empty, we've reached the end of the layer
1092+
if not data_chunk:
1093+
break
1094+
1095+
valid_create, valid_update, update_error, create_error = self._process_feature(
1096+
data_chunk,
1097+
model_instance=OriginalResource,
1098+
upsert_key=upsert_key,
1099+
update_error=update_error,
1100+
create_error=create_error,
1101+
valid_update=valid_update,
1102+
valid_create=valid_create,
1103+
schema_fields=schema_fields,
1104+
)
10761105

10771106
# generating the resourcehandlerinfo to track the changes on the resource
10781107

10791108
self.create_resourcehandlerinfo(
10801109
handler_module_path=str(self), resource=original_resource, execution_id=exec_obj
10811110
)
10821111

1112+
if (total_feature - len(update_error) - len(create_error)) == 0:
1113+
raise UpsertException("All the entries provided raised error, execution is going to be stopped")
1114+
10831115
return not update_error and not create_error, {
1116+
"success": not update_error and not create_error,
10841117
"errors": {"create": create_error, "update": update_error},
10851118
"data": {
10861119
"total": {
@@ -1092,6 +1125,61 @@ def upsert_data(self, files, execution_id, **kwargs):
10921125
},
10931126
}
10941127

1128+
def _process_feature(
1129+
self,
1130+
data_chunk,
1131+
model_instance,
1132+
upsert_key,
1133+
update_error,
1134+
create_error,
1135+
valid_update,
1136+
valid_create,
1137+
schema_fields,
1138+
):
1139+
# getting all the upsert_key values from the data chunk
1140+
# retrieving the data from the DB
1141+
value_in_db = model_instance.objects.filter(
1142+
**{f"{upsert_key}__in": (getattr(feature, upsert_key) for feature in data_chunk)}
1143+
).in_bulk(field_name=upsert_key)
1144+
update_bulk = []
1145+
create_bulk = []
1146+
# looping over the chunk data
1147+
for feature in data_chunk:
1148+
feature_as_dict = feature.items()
1149+
# need to simulate the "promote to multi" used by the upload process.
1150+
# here we cannot rely on ogr2ogr so we need to do it manually
1151+
geom = feature.GetGeometryRef()
1152+
feature_as_dict.update({self.default_geometry_column_name: self.promote_geom_to_multi(geom).ExportToWkt()})
1153+
if feature_as_dict.get(upsert_key) in value_in_db:
1154+
# if the key is present, we need to update the object
1155+
# the geometry must be treated manually
1156+
obj = value_in_db[feature_as_dict.get(upsert_key)]
1157+
for key, value in feature_as_dict.items():
1158+
setattr(obj, key, value)
1159+
update_bulk.append(obj)
1160+
else:
1161+
# if the key is not present, we can create a new instance
1162+
create_bulk.append(model_instance(**feature_as_dict))
1163+
1164+
if update_bulk:
1165+
try:
1166+
# bulk update of the values. the schema is needed since it is possible in django
1167+
# to update only specific columns of the schema. By default it updates all of them
1168+
model_instance.objects.bulk_update(update_bulk, fields=schema_fields)
1169+
valid_update += len(update_bulk)
1170+
except Exception as e:
1171+
logger.error("Error during update of feature in upsert")
1172+
update_error.append(str(e))
1173+
1174+
if create_bulk:
1175+
try:
1176+
model_instance.objects.bulk_create(create_bulk)
1177+
valid_create += len(update_bulk)
1178+
except Exception as e:
1179+
logger.error("Error during create of feature in upsert")
1180+
create_error.append(str(e))
1181+
return valid_create, valid_update, update_error, create_error
1182+
10951183
def extract_upsert_key(self, exec_obj, dynamic_model_instance):
10961184
# first we check if the upsert key is passed by the call
10971185
key = exec_obj.input_params.get("upsert_key")
@@ -1120,11 +1208,18 @@ def refresh_geonode_resource(self, execution_id, asset=None, dataset=None, creat
11201208
resource=dataset, files=exec_obj.input_params["files"], action=exec_obj.action
11211209
)
11221210
# but we need to delete the previous one associated to the resource
1123-
1211+
start = datetime.now()
11241212
delete_dataset_cache(dataset.alternate)
1213+
logging.debug(f"DATASET DELETE CACHE DONE {datetime.now() - start}")
1214+
11251215
# recalculate featuretype info
1216+
start = datetime.now()
11261217
DataPublisher(str(self)).recalculate_geoserver_featuretype(dataset)
1218+
logging.debug(f"recalculate_geoserver_featuretype DONE {datetime.now() - start}")
1219+
1220+
start = datetime.now()
11271221
set_geowebcache_invalidate_cache(dataset_alternate=dataset.alternate)
1222+
logging.debug(f"set_geowebcache_invalidate_cache DONE {datetime.now() - start}")
11281223

11291224
dataset = resource_manager.update(dataset.uuid, instance=dataset)
11301225

geonode/upload/handlers/shapefile/handler.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,30 @@ def promote_to_multi(self, geometry_name):
210210
If needed change the name of the geometry, by promoting it to Multi
211211
example if is Point -> MultiPoint
212212
Needed for the shapefiles
213+
Later this is used to map the geometry coming from ogr2ogr with a django class
213214
"""
214215
if "Multi" not in geometry_name and "Point" not in geometry_name:
215216
return f"Multi {geometry_name.title()}"
216217
return geometry_name
218+
219+
def promote_geom_to_multi(self, geom):
220+
"""
221+
Convert the GetGeometryType object into Multi
222+
example if is Point -> MultiPoint
223+
Needed for the shapefiles
224+
"""
225+
match geom.GetGeometryType():
226+
case ogr.wkbMultiLineString | ogr.wkbMultiPolygon:
227+
# if it is already multi, we don't need to do anything
228+
return geom
229+
case ogr.wkbLineString:
230+
new_multi_geom = ogr.Geometry(ogr.wkbMultiLineString)
231+
new_multi_geom.AddGeometry(geom)
232+
return new_multi_geom
233+
case ogr.wkbPolygon:
234+
new_multi_geom = ogr.Geometry(ogr.wkbMultiPolygon)
235+
new_multi_geom.AddGeometry(geom)
236+
return new_multi_geom
237+
case _:
238+
# we don't convert points
239+
return geom

geonode/upload/tests/end2end/test_env2end_upsert.py

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,10 @@ def setUpClass(cls) -> None:
5454
super().setUpClass()
5555
cls.user = get_user_model().objects.exclude(username="Anonymous").first()
5656
cls.original = {
57-
"base_file": f"{project_dir}/tests/fixture/upsert/original.shp",
58-
"dbf_file": f"{project_dir}/tests/fixture/upsert/original.dbf",
59-
"prj_file": f"{project_dir}/tests/fixture/upsert/original.prj",
60-
"shx_file": f"{project_dir}/tests/fixture/upsert/original.shx",
57+
"base_file": f"{project_dir}/tests/fixture/upsert/original.json",
6158
}
62-
cls.upsert_shp = {
63-
"base_file": f"{project_dir}/tests/fixture/upsert/upsert.shp",
64-
"dbf_file": f"{project_dir}/tests/fixture/upsert/upsert.dbf",
65-
"prj_file": f"{project_dir}/tests/fixture/upsert/upsert.prj",
66-
"shx_file": f"{project_dir}/tests/fixture/upsert/upsert.shx",
59+
cls.upsert_geojson = {
60+
"base_file": f"{project_dir}/tests/fixture/upsert/upsert.json",
6761
}
6862
file_path = gisdata.VECTOR_DATA
6963
filename = os.path.join(file_path, "san_andres_y_providencia_natural.shp")
@@ -191,16 +185,15 @@ class ImporterShapefileImportTestUpsert(BaseImporterEndToEndTest):
191185
GEODATABASE_URL=f"{geourl.split('/geonode_data')[0]}/test_geonode_data", IMPORTER_ENABLE_DYN_MODELS=True
192186
)
193187
def test_import_shapefile_upsert(self):
194-
195188
self._cleanup_layers(name="original")
196189
payload = {_filename: open(_file, "rb") for _filename, _file in self.original.items()}
197190
payload["action"] = "upload"
198191
initial_name = "original"
199192
prev_dataset = self._assertimport(payload, initial_name, keep_resource=True)
200-
payload = {_filename: open(_file, "rb") for _filename, _file in self.upsert_shp.items()}
193+
payload = {_filename: open(_file, "rb") for _filename, _file in self.upsert_geojson.items()}
201194
payload["resource_pk"] = prev_dataset.pk
202195
payload["action"] = "upsert"
203-
payload["upsert_key"] = "unique"
196+
payload["upsert_key"] = "ogc_fid"
204197

205198
# time to upsert the data
206199
self.client.force_login(self.admin)
@@ -210,9 +203,10 @@ def test_import_shapefile_upsert(self):
210203
self.assertEqual(response.status_code, 201)
211204
exec_obj = ExecutionRequest.objects.get(exec_id=response.json().get("execution_id"))
212205
output_input = exec_obj.output_params
213-
self.assertTrue(output_input.get("upsert")[0])
214-
data = output_input.get("upsert")[1]
206+
self.assertTrue(output_input.get("upsert", {}).get("success"))
207+
data = output_input.get("upsert")
215208
expected = {
209+
"success": True,
216210
"data": {
217211
"error": {"create": 0, "update": 0},
218212
"total": {"error": 0, "success": 2},
@@ -228,7 +222,7 @@ def test_import_shapefile_upsert(self):
228222
self.assertIsNotNone(key)
229223
self.assertTrue(len(key) == 1)
230224
# evaluate if the dynamic model is correctly upserted, we expect 2 rows
231-
self.assertEqual(schema.as_model().objects.count(), 2)
225+
self.assertEqual(schema.as_model().objects.count(), 3)
232226

233227
@mock.patch.dict(os.environ, {"GEONODE_GEODATABASE": "test_geonode_data", "IMPORTER_ENABLE_DYN_MODELS": "True"})
234228
@override_settings(
@@ -244,7 +238,7 @@ def test_import_shapefile_upsert_raise_error_with_different_schemas(self):
244238
payload = {_filename: open(_file, "rb") for _filename, _file in self.default_shp.items()}
245239
payload["resource_pk"] = prev_dataset.pk
246240
payload["action"] = "upsert"
247-
payload["upsert_key"] = "unique"
241+
payload["upsert_key"] = "ogc_fid"
248242

249243
# time to upsert the data
250244
self.client.force_login(self.admin)
@@ -255,6 +249,6 @@ def test_import_shapefile_upsert_raise_error_with_different_schemas(self):
255249
self.assertFalse(response.json().get("success", True))
256250
self.assertTrue("errors" in response.json())
257251
self.assertTrue(
258-
"The columns in the source and target do not match they must be equal. The following are not expected or missing"
252+
"All the feature in the file must have the ogc_fid field correctly populated. Number of None value: all"
259253
in response.json()["errors"][0]
260254
)
-649 Bytes
Binary file not shown.
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"type": "FeatureCollection",
3+
"features": [
4+
{
5+
"type": "Feature",
6+
"geometry": {
7+
"type": "Point",
8+
"coordinates": [
9+
-47.20138110814478,
10+
-13.8281037379215
11+
]
12+
},
13+
"properties": {
14+
"ogc_fid": 999,
15+
"st_area_shape": 192656942.39615256,
16+
"st_length_shape": 117499.50756699985,
17+
"ra_cira": 9,
18+
"ra_nome": "THIS IS A CREATE",
19+
"ra_codigo": "THIS IS A CREATE",
20+
"ra_path": "http://localhost:8080",
21+
"ra_areakm2": 192.65694240000002
22+
}
23+
}
24+
]
25+
}

0 commit comments

Comments
 (0)