6 changes: 6 additions & 0 deletions CHANGES.md
@@ -1,3 +1,9 @@
## Changes in 1.9.1 (in development)

### Enhancements

* xcube now supports reading vector data (`geopandas.GeoDataFrame`) from and writing it to the KML format (`*.kml`) via the filesystem data stores.

## Changes in 1.9.0

### Enhancements
2 changes: 1 addition & 1 deletion docs/source/dataaccess.md
@@ -250,7 +250,7 @@ The following `storage_options` can be used for the `abfs` data store:
All filesystem data stores can open datasets from various data formats.
Datasets in Zarr, GeoTIFF / COG, or NetCDF format will be provided either by
[xarray.Dataset] or xcube [MultiLevelDataset] instances.
Datasets stored in GeoJSON or ESRI Shapefile will yield
Datasets stored in GeoJSON, KML, or ESRI Shapefile format will yield
[geopandas.GeoDataFrame] instances.
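
For example, a vector dataset stored as KML can be opened from a local
filesystem data store as sketched below; the root path and file name are
placeholders, not files shipped with xcube:

```python
from xcube.core.store import new_data_store

# Hypothetical local store rooted at some data directory
store = new_data_store("file", root="path/to/data")

# The ".kml" extension selects the KML accessor; the result is a
# geopandas.GeoDataFrame (".geojson" and ".shp" work the same way).
places = store.open_data("places.kml")
print(type(places), len(places))
```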

Common parameters for opening [xarray.Dataset] instances:
1 change: 1 addition & 0 deletions environment.yml
@@ -38,6 +38,7 @@ dependencies:
- s3fs >=2021.6
- setuptools >=41.0
- shapely >=1.6
- simplekml
- tabulate >=0.9
- tornado >=6.0
- urllib3 >=2.0
162 changes: 161 additions & 1 deletion test/core/store/fs/test_registry.py
@@ -12,7 +12,9 @@
from typing import Any, Callable, Optional, Union

import fsspec
import geopandas as gpd
import numpy as np
import pandas as pd
import pytest
import xarray as xr

@@ -25,6 +27,7 @@
    DataDescriptor,
    DatasetDescriptor,
    DataStoreError,
    GeoDataFrameDescriptor,
    MultiLevelDatasetDescriptor,
    MutableDataStore,
)
@@ -69,6 +72,18 @@ def new_cube_data():
    return cube.chunk(dict(time=1, y=90, x=180))


def new_geodataframe():
    time_data = pd.date_range(
        start="2010-01-01T00:00:00", periods=2, freq="D"
    ).values
    return gpd.GeoDataFrame(
        {
            "place_name": ["Place A", "Place B"],
            "is_active": [True, False],
            "timestamp": time_data,
            "salinity [‰]": [10, 20],
            "var_y": [0.5, 2.0],
        },
        geometry=gpd.points_from_xy([8.0, 8.1], [50.0, 50.1]),
        crs="EPSG:4326",
    )

class NewCubeDataTestMixin(unittest.TestCase):
path = f"{DATA_PATH}/data.zarr"

@@ -225,7 +240,29 @@ def test_dataset_levels(self):
            assert_data_ok=self._assert_zarr_store_direct_ok,
        )

    # TODO: add assertGeoDataFrameSupport
    def test_geodataframe_geojson(self):
        data_store = self.create_data_store()
        self._assert_geodataframe_supported(
            data_store,
            filename_ext=".geojson",
            requested_dtype_alias="geodataframe",
            expected_dtype_aliases={"geodataframe"},
            expected_return_type=gpd.GeoDataFrame,
            expected_descriptor_type=GeoDataFrameDescriptor,
            assert_data_ok=self._assert_geodataframe_ok,
        )

    def test_geodataframe_kml(self):
        data_store = self.create_data_store()
        self._assert_geodataframe_supported(
            data_store,
            filename_ext=".kml",
            requested_dtype_alias="geodataframe",
            expected_dtype_aliases={"geodataframe"},
            expected_return_type=gpd.GeoDataFrame,
            expected_descriptor_type=GeoDataFrameDescriptor,
            assert_data_ok=self._assert_geodataframe_ok,
        )

    def _assert_multi_level_dataset_format_supported(self, data_store: FsDataStore):
        self._assert_dataset_supported(
@@ -332,6 +369,19 @@ def _assert_multi_level_dataset_data_ok(self, ml_dataset):
        )
        self.assertNotIsInstance(dataset.zarr_store.get(), GenericZarrStore)

    def _assert_geodataframe_ok(self, gdf: gpd.GeoDataFrame):
        self.assertIn("place_name", gdf.columns)
        self.assertIn("is_active", gdf.columns)
        self.assertIn("salinity [‰]", gdf.columns)
        self.assertIn("var_y", gdf.columns)
        self.assertIn("geometry", gdf.columns)
        self.assertIn("timestamp", gdf.columns)
        self.assertEqual("object", str(gdf.place_name.dtype))
        self.assertEqual("bool", str(gdf.is_active.dtype))
        self.assertEqual("int32", str(gdf["salinity [‰]"].dtype))
        self.assertEqual("float64", str(gdf.var_y.dtype))
        self.assertTrue(pd.api.types.is_datetime64_any_dtype(gdf.timestamp))

    def _assert_multi_level_dataset_format_with_tile_size(
        self, data_store: FsDataStore
    ):
@@ -481,6 +531,116 @@ def _assert_dataset_supported(
        self.assertNotIn(data_id, set(data_store.get_data_ids()))
        self.assertNotIn(data_id, data_store.list_data_ids())

    def _assert_geodataframe_supported(
        self,
        data_store: FsDataStore,
        filename_ext: str,
        requested_dtype_alias: Optional[str],
        expected_dtype_aliases: set[str],
        expected_return_type: type[gpd.GeoDataFrame],
        expected_descriptor_type: Optional[type[GeoDataFrameDescriptor]] = None,
        opener_id: Optional[str] = None,
        write_params: Optional[dict[str, Any]] = None,
        open_params: Optional[dict[str, Any]] = None,
        assert_data_ok: Optional[Callable[[Any], Any]] = None,
        assert_warnings: bool = False,
        warning_msg: Optional[str] = None,
    ):
        """Call all DataStore operations to ensure data of type
        gpd.GeoDataFrame is supported by *data_store*.

        Args:
            data_store: The filesystem data store instance.
            filename_ext: Filename extension that identifies
                a supported vector data format.
            requested_dtype_alias: The data type alias passed to open_data().
            expected_dtype_aliases: The set of expected data type aliases.
            expected_return_type: The expected data type.
            expected_descriptor_type: The expected data descriptor type.
            opener_id: Optional opener identifier.
            write_params: Optional write parameters.
            open_params: Optional open parameters.
            assert_data_ok: Optional function to assert read data is ok.
            assert_warnings: Optional boolean if test may check for warnings.
            warning_msg: Optional warning message to be checked if
                assert_warnings is True.
        """

        data_id = f"{DATA_PATH}/ds{filename_ext}"

        write_params = write_params or {}
        open_params = open_params or {}

        self.assertIsInstance(data_store, MutableDataStore)

        self.assertEqual(
            {"dataset", "mldataset", "geodataframe"}, set(data_store.get_data_types())
        )

        with pytest.raises(
            DataStoreError, match=f'Data resource "{data_id}" does not exist in store'
        ):
            data_store.get_data_types_for_data(data_id)
        self.assertEqual(False, data_store.has_data(data_id))
        self.assertNotIn(data_id, set(data_store.get_data_ids()))
        self.assertNotIn(data_id, data_store.list_data_ids())

        data = new_geodataframe()
        written_data_id = data_store.write_data(data, data_id, **write_params)
        self.assertEqual(data_id, written_data_id)

        self.assertEqual(
            expected_dtype_aliases, set(data_store.get_data_types_for_data(data_id))
        )
        self.assertEqual(True, data_store.has_data(data_id))
        self.assertIn(data_id, set(data_store.get_data_ids()))
        self.assertIn(data_id, data_store.list_data_ids())

        if expected_descriptor_type is not None:
            data_descriptors = list(
                data_store.search_data(data_type=expected_return_type)
            )
            self.assertEqual(1, len(data_descriptors))
            self.assertIsInstance(data_descriptors[0], DataDescriptor)
            self.assertIsInstance(data_descriptors[0], expected_descriptor_type)

        if assert_warnings:
            with warnings.catch_warnings(record=True) as w:
                data = data_store.open_data(
                    data_id,
                    opener_id=opener_id,
                    data_type=requested_dtype_alias,
                    **open_params,
                )
            # if "s3" data store is tested, warnings from other
            # libraries like botocore occur
            if data_store.protocol != "s3":
                self.assertEqual(1, len(w))
                self.assertEqual(w[0].category, UserWarning)
                self.assertEqual(warning_msg, w[0].message.args[0])
        else:
            data = data_store.open_data(
                data_id,
                opener_id=opener_id,
                data_type=requested_dtype_alias,
                **open_params,
            )
        self.assertIsInstance(data, expected_return_type)
        if assert_data_ok is not None:
            assert_data_ok(data)

        try:
            data_store.delete_data(data_id)
        except PermissionError as e:  # May occur on win32 due to fsspec
            warnings.warn(f"{e}")
            return
        with pytest.raises(
            DataStoreError,
            match=f'Data resource "{data_id}" does not exist in store',
        ):
            data_store.get_data_types_for_data(data_id)
        self.assertEqual(False, data_store.has_data(data_id))
        self.assertNotIn(data_id, set(data_store.get_data_ids()))
        self.assertNotIn(data_id, data_store.list_data_ids())

class FileFsDataStoresTest(FsDataStoresTestMixin, unittest.TestCase):
    def create_data_store(self) -> FsDataStore:
100 changes: 99 additions & 1 deletion xcube/core/store/fs/impl/geodataframe.py
@@ -6,12 +6,14 @@

import geopandas as gpd
import pandas as pd
import simplekml

from xcube.util.assertions import assert_instance
from xcube.util.fspath import is_local_fs
from xcube.util.jsonschema import JsonObjectSchema
from xcube.util.temp import new_temp_file

from ... import DataStoreError
from ...datatype import GEO_DATA_FRAME_TYPE, DataType
from ..accessor import FsDataAccessor

@@ -62,13 +64,17 @@
        assert_instance(data, (gpd.GeoDataFrame, pd.DataFrame), "data")
        fs, root, write_params = self.load_fs(write_params)
        is_local = is_local_fs(fs)
        replace = write_params.pop("replace", False)
        if is_local:
            file_path = data_id
            if not replace and fs.exists(file_path):
                raise DataStoreError(f"Data '{data_id}' already exists.")
        else:
            _, file_path = new_temp_file()
        data.to_file(file_path, driver=self.get_driver_name(), **write_params)
        if not is_local:
            fs.put_file(file_path, data_id)
            mode = "overwrite" if replace else "create"
            fs.put_file(file_path, data_id, mode=mode)
        return data_id


@@ -94,3 +100,95 @@
    @classmethod
    def get_driver_name(cls) -> str:
        return "GeoJSON"


class GeoDataFrameKmlFsDataAccessor(GeoDataFrameFsDataAccessor):
    """Extension name: 'geodataframe:kml:<protocol>'."""

    @classmethod
    def get_format_id(cls) -> str:
        return "kml"

    @classmethod
    def get_driver_name(cls) -> str:
        return "KML"

    def open_data(self, data_id: str, **open_params) -> gpd.GeoDataFrame:
        gdf = super().open_data(data_id, **open_params)
        # Default columns the KML driver adds on read; drop them if all empty.
        kml_nan_columns = [
            "Name", "description", "timestamp", "begin", "end", "altitudeMode",
            "drawOrder", "icon"
        ]
        # KML flag columns and their default values; drop them if unchanged.
        kml_number_columns = {
            "tessellate": -1,
            "extrude": 0,
            "visibility": -1,
        }
        for col in gdf.columns:
            if (col in kml_nan_columns and pd.isna(gdf[col]).all()) or (
                col in kml_number_columns
                and len(gdf[col].unique()) == 1
                and gdf[col].unique()[0] == kml_number_columns[col]
            ):
                del gdf[col]
                continue
            if col != "geometry":
                if pd.api.types.is_datetime64_any_dtype(gdf[col]):
                    continue
                # KML stores attribute values as strings; restore numeric and
                # boolean dtypes where possible.
                try:
                    gdf[col] = pd.to_numeric(gdf[col])
                except ValueError:
                    if gdf[col].isin(["True", "False"]).all():
                        gdf[col] = gdf[col].map({"True": True, "False": False})
        return gdf

    def write_data(self, data: gpd.GeoDataFrame, data_id: str, **write_params) -> str:
        assert_instance(data, (gpd.GeoDataFrame, pd.DataFrame), "data")
        fs, root, write_params = self.load_fs(write_params)
        is_local = is_local_fs(fs)
        replace = write_params.pop("replace", False)
        if is_local:
            file_path = data_id
            if not replace and fs.exists(file_path):
                raise DataStoreError(f"Data '{data_id}' already exists.")
        else:
            _, file_path = new_temp_file()

        kml = simplekml.Kml()
        kml_schema = kml.newschema(name="kmlschema")
        append_cols = {}

        # Build one KML feature per row; unsupported geometry types are skipped.
        for _, row in data.iterrows():
            geom = row.geometry

            if geom.geom_type == "Point":
                entry = kml.newpoint(coords=[(geom.x, geom.y)])
            elif geom.geom_type == "LineString":
                entry = kml.newlinestring(coords=list(geom.coords))
            elif geom.geom_type == "Polygon":
                entry = kml.newpolygon(outerboundaryis=list(geom.exterior.coords))
            else:
                continue
            # Attach all non-geometry attributes as KML schema data; values
            # are stored as strings, dtypes are recorded once per column.
            schema = simplekml.SchemaData("kmlschema")
            for col in data.columns:
                if col != "geometry":
                    schema.newsimpledata(col, str(row[col]))
                    if col not in append_cols:
                        dtype_str = str(data[col].dtype)
                        if dtype_str == "object" or dtype_str == "bool":
                            dtype_str = "string"
                        elif dtype_str.startswith("int"):
                            dtype_str = "int"
                        elif dtype_str.startswith("float"):
                            dtype_str = "float"
                        append_cols[col] = dtype_str
            entry.extendeddata.schemadata = schema

        # Declare the collected attribute columns once in the shared KML schema.
        for col, typ in append_cols.items():
            kml_schema.newsimplefield(col, type=typ)

        kml.save(file_path)

        if not is_local:
            mode = "overwrite" if replace else "create"
            fs.put_file(file_path, data_id, mode=mode)
        return data_id
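
The read-side post-processing above exists because KML stores attribute values
as strings and the KML driver adds default columns such as `altitudeMode` on
read. A minimal round-trip sketch (not part of this change; the root directory
and column names are illustrative, and an existing, writable local directory is
assumed):

```python
import geopandas as gpd
from xcube.core.store import new_data_store

gdf = gpd.GeoDataFrame(
    {"label": ["A", "B"], "value": [0.5, 2.0], "active": [True, False]},
    geometry=gpd.points_from_xy([8.0, 8.1], [50.0, 50.1]),
    crs="EPSG:4326",
)

store = new_data_store("file", root="/tmp/xcube-kml-demo")
store.write_data(gdf, "places.kml", replace=True)
reloaded = store.open_data("places.kml")

# "value" should be numeric and "active" boolean again after open_data(),
# and all-empty KML default columns should have been dropped.
print(reloaded.dtypes)
print(list(reloaded.columns))
```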
4 changes: 3 additions & 1 deletion xcube/core/store/fs/registry.py
@@ -25,6 +25,7 @@
)
from .impl.geodataframe import (
    GeoDataFrameGeoJsonFsDataAccessor,
    GeoDataFrameKmlFsDataAccessor,
    GeoDataFrameShapefileFsDataAccessor,
)
from .impl.geotiff import MultiLevelDatasetGeoTiffFsDataAccessor
@@ -125,8 +126,9 @@ def register_fs_data_accessor_class(fs_data_accessor_class: type[FsDataAccessor]
    DatasetLevelsFsDataAccessor,
    MultiLevelDatasetGeoTiffFsDataAccessor,
    MultiLevelDatasetLevelsFsDataAccessor,
    GeoDataFrameShapefileFsDataAccessor,
    GeoDataFrameGeoJsonFsDataAccessor,
    GeoDataFrameKmlFsDataAccessor,
    GeoDataFrameShapefileFsDataAccessor,
):
    register_fs_data_accessor_class(cls)

4 changes: 3 additions & 1 deletion xcube/core/store/fs/store.py
@@ -70,8 +70,9 @@
".tif": "geotiff",
".tiff": "geotiff",
".geotiff": "geotiff",
".shp": "shapefile",
".geojson": "geojson",
".kml": "kml",
".shp": "shapefile",
}

_FORMAT_TO_DATA_TYPE_ALIASES = {
@@ -80,6 +81,7 @@
"levels": (MULTI_LEVEL_DATASET_TYPE.alias, DATASET_TYPE.alias),
"geotiff": (DATASET_TYPE.alias, MULTI_LEVEL_DATASET_TYPE.alias),
"geojson": (GEO_DATA_FRAME_TYPE.alias,),
"kml": (GEO_DATA_FRAME_TYPE.alias,),
"shapefile": (GEO_DATA_FRAME_TYPE.alias,),
}
