|
35 | 35 | from dynamic_models.models import ModelSchema |
36 | 36 | from osgeo import ogr |
37 | 37 | from django.test.utils import override_settings |
| 38 | +from geonode.upload.utils import create_vrt_file, has_incompatible_field_names |
38 | 39 |
|
39 | 40 |
|
40 | 41 | class TestBaseVectorFileHandler(TestCase): |
@@ -69,6 +70,100 @@ def test_create_error_log(self): |
69 | 70 | expected = "Task: foo_task_name raised an error during actions for layer: alternate: my exception" |
70 | 71 | self.assertEqual(expected, actual) |
71 | 72 |
|
| 73 | + def test_create_vrt_file_with_special_chars(self): |
| 74 | + """ |
| 75 | + Test that create_vrt_file correctly sanitizes layer and field names |
| 76 | + with spaces and special characters, and generates a valid VRT file. |
| 77 | + """ |
| 78 | + source_filepath = "source file.shp" |
| 79 | + |
| 80 | + mock_layer = MagicMock(spec=ogr.Layer) |
| 81 | + mock_layer.GetName.return_value = "Layer With Spaces" |
| 82 | + mock_layer.GetGeomType.return_value = ogr.wkbPolygon |
| 83 | + |
| 84 | + mock_field_1 = MagicMock() |
| 85 | + mock_field_1.GetName.return_value = "Field 1 (m)" |
| 86 | + mock_field_1.GetTypeName.return_value = "String" |
| 87 | + |
| 88 | + mock_field_2 = MagicMock() |
| 89 | + mock_field_2.GetName.return_value = "another-field" |
| 90 | + mock_field_2.GetTypeName.return_value = "Real" |
| 91 | + |
| 92 | + mock_field_3 = MagicMock() |
| 93 | + mock_field_3.GetName.return_value = "field/with/slash" |
| 94 | + mock_field_3.GetTypeName.return_value = "Integer" |
| 95 | + |
| 96 | + mock_layer_defn = MagicMock() |
| 97 | + mock_layer_defn.GetFieldCount.return_value = 3 |
| 98 | + mock_layer_defn.GetFieldDefn.side_effect = [mock_field_1, mock_field_2, mock_field_3].__getitem__ |
| 99 | + |
| 100 | + mock_layer.GetLayerDefn.return_value = mock_layer_defn |
| 101 | + |
| 102 | + vrt_filename, vrt_layer_name = None, None |
| 103 | + try: |
| 104 | + vrt_filename, vrt_layer_name = create_vrt_file(mock_layer, source_filepath) |
| 105 | + |
| 106 | + self.assertIsNotNone(vrt_filename) |
| 107 | + self.assertTrue(os.path.exists(vrt_filename)) |
| 108 | + self.assertEqual(vrt_layer_name, "layer_with_spaces") |
| 109 | + |
| 110 | + with open(vrt_filename, "r") as f: |
| 111 | + vrt_content = f.read() |
| 112 | + |
| 113 | + self.assertIn('<OGRVRTLayer name="layer_with_spaces">', vrt_content) |
| 114 | + self.assertIn(f"<SrcDataSource>{source_filepath}</SrcDataSource>", vrt_content) |
| 115 | + self.assertIn("<SrcLayer>Layer With Spaces</SrcLayer>", vrt_content) |
| 116 | + |
| 117 | + self.assertIn('<Field name="field_1_m" src="Field 1 (m)" type="String" />', vrt_content) |
| 118 | + self.assertIn('<Field name="another_field" src="another-field" type="Real" />', vrt_content) |
| 119 | + self.assertIn('<Field name="fieldwithslash" src="field/with/slash" type="Integer" />', vrt_content) |
| 120 | + |
| 121 | + finally: |
| 122 | + if vrt_filename and os.path.exists(vrt_filename): |
| 123 | + os.remove(vrt_filename) |
| 124 | + |
| 125 | + def test_has_incompatible_field_names(self): |
| 126 | + """ |
| 127 | + Test that has_incompatible_field_names correctly identifies layers |
| 128 | + with field names that need sanitization. |
| 129 | + """ |
| 130 | + |
| 131 | + # layer with incompatible field names |
| 132 | + mock_layer_incompatible = MagicMock(spec=ogr.Layer) |
| 133 | + mock_layer_incompatible.GetName.return_value = "Layer With Spaces" |
| 134 | + |
| 135 | + mock_field_1 = MagicMock() |
| 136 | + mock_field_1.GetName.return_value = "Field 1 (m)" |
| 137 | + |
| 138 | + mock_field_2 = MagicMock() |
| 139 | + mock_field_2.GetName.return_value = "another-#field" |
| 140 | + |
| 141 | + mock_layer_defn_incompatible = MagicMock() |
| 142 | + mock_layer_defn_incompatible.GetFieldCount.return_value = 2 |
| 143 | + mock_layer_defn_incompatible.GetFieldDefn.side_effect = [mock_field_1, mock_field_2].__getitem__ |
| 144 | + |
| 145 | + mock_layer_incompatible.GetLayerDefn.return_value = mock_layer_defn_incompatible |
| 146 | + |
| 147 | + self.assertTrue(has_incompatible_field_names(mock_layer_incompatible)) |
| 148 | + |
| 149 | + # layer with compatible field names |
| 150 | + mock_layer_compatible = MagicMock(spec=ogr.Layer) |
| 151 | + mock_layer_compatible.GetName.return_value = "compatible_layer" |
| 152 | + |
| 153 | + mock_field_3 = MagicMock() |
| 154 | + mock_field_3.GetName.return_value = "field_one" |
| 155 | + |
| 156 | + mock_field_4 = MagicMock() |
| 157 | + mock_field_4.GetName.return_value = "field_two" |
| 158 | + |
| 159 | + mock_layer_defn_compatible = MagicMock() |
| 160 | + mock_layer_defn_compatible.GetFieldCount.return_value = 2 |
| 161 | + mock_layer_defn_compatible.GetFieldDefn.side_effect = [mock_field_3, mock_field_4].__getitem__ |
| 162 | + |
| 163 | + mock_layer_compatible.GetLayerDefn.return_value = mock_layer_defn_compatible |
| 164 | + |
| 165 | + self.assertFalse(has_incompatible_field_names(mock_layer_compatible)) |
| 166 | + |
72 | 167 | def test_create_dynamic_model_fields(self): |
73 | 168 | try: |
74 | 169 | # Prepare the test |
|
0 commit comments