Skip to content

Commit 46ab758

Browse files
support append for STACAPIJobDatabase
1 parent 7ae446e commit 46ab758

File tree

1 file changed

+30
-16
lines changed

1 file changed

+30
-16
lines changed

openeo/extra/stac_job_db.py

Lines changed: 30 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44
from typing import Iterable, List, Union
55

66
import geopandas as gpd
7+
import numpy as np
78
import pandas as pd
89
import pystac
910
import requests
@@ -27,21 +28,28 @@ class STACAPIJobDatabase(JobDatabaseInterface):
2728
"""
2829

2930
def __init__(
30-
self, collection_id: str, stac_root_url: str, auth: requests.auth.AuthBase, has_geometry: bool = False
31+
self,
32+
collection_id: str,
33+
stac_root_url: str,
34+
auth: requests.auth.AuthBase,
35+
has_geometry: bool = False,
36+
geometry_column: str = "geometry",
3137
):
3238
"""
3339
Initialize the STACAPIJobDatabase.
3440
3541
:param collection_id: The ID of the STAC collection.
3642
:param stac_root_url: The root URL of the STAC API.
3743
:param auth: requests AuthBase that will be used to authenticate, e.g. OAuth2ResourceOwnerPasswordCredentials
44+
:param has_geometry: Whether the job metadata supports any geometry that implements __geo_interface__.
45+
:param geometry_column: The name of the geometry column in the job metadata that implements __geo_interface__.
3846
"""
3947
self.collection_id = collection_id
4048
self.client = Client.open(stac_root_url)
4149

4250
self._auth = auth
4351
self.has_geometry = has_geometry
44-
self.geometry_column = "geometry"
52+
self.geometry_column = geometry_column
4553
self.base_url = stac_root_url
4654
self.bulk_size = 500
4755

@@ -60,25 +68,33 @@ def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
6068
:param on_exists: what to do when the job database already exists (persisted on disk):
6169
- "error": (default) raise an exception
6270
- "skip": work with existing database, ignore given dataframe and skip any initialization
71+
- "append": add given dataframe to existing database
6372
6473
:return: initialized job database.
6574
"""
75+
if isinstance(df, gpd.GeoDataFrame):
76+
df = df.copy()
77+
_log.warning("Job Database is initialized from GeoDataFrame. Converting geometries to GeoJSON.")
78+
self.geometry_column = df.geometry.name
79+
df[self.geometry_column] = df[self.geometry_column].apply(lambda x: mapping(x))
80+
df = pd.DataFrame(df)
81+
self.has_geometry = True
82+
6683
if self.exists():
6784
if on_exists == "skip":
6885
return self
6986
elif on_exists == "error":
7087
raise FileExistsError(f"Job database {self!r} already exists.")
88+
elif on_exists == "append":
89+
existing_df = self.get_by_status([])
90+
df = MultiBackendJobManager._normalize_df(df)
91+
df = pd.concat([existing_df, df], ignore_index=True).replace({np.nan: None})
92+
self.persist(df)
93+
return self
94+
7195
else:
72-
# TODO handle other on_exists modes: e.g. overwrite, merge, ...
7396
raise ValueError(f"Invalid on_exists={on_exists!r}")
7497

75-
if isinstance(df, gpd.GeoDataFrame):
76-
_log.warning("Job Database is initialized from GeoDataFrame. Converting geometries to GeoJSON.")
77-
self.geometry_column = df.geometry.name
78-
df["geometry"] = df["geometry"].apply(lambda x: mapping(x))
79-
df = pd.DataFrame(df)
80-
self.has_geometry = True
81-
8298
df = MultiBackendJobManager._normalize_df(df)
8399
self.persist(df)
84100
# Return self to allow chaining with constructor.
@@ -98,7 +114,7 @@ def series_from(self, item: pystac.Item) -> pd.Series:
98114

99115
return pd.Series(item_dict["properties"], name=item_id)
100116

101-
def item_from(self, series: pd.Series, geometry_name: str = "geometry") -> pystac.Item:
117+
def item_from(self, series: pd.Series) -> pystac.Item:
102118
"""
103119
Convert a pandas.Series to a STAC Item.
104120
@@ -123,15 +139,15 @@ def item_from(self, series: pd.Series, geometry_name: str = "geometry") -> pysta
123139
item_dict["properties"]["datetime"] = pystac.utils.datetime_to_str(datetime.now())
124140

125141
if self.has_geometry:
126-
item_dict["geometry"] = series[geometry_name]
142+
item_dict["geometry"] = series[self.geometry_column]
127143
else:
128144
item_dict["geometry"] = None
129145

130146
# from_dict handles associating any Links and Assets with the Item
131147
item_dict["id"] = series.name
132148
item = pystac.Item.from_dict(item_dict)
133149
if self.has_geometry:
134-
item.bbox = shape(series[geometry_name]).bounds
150+
item.bbox = shape(series[self.geometry_column]).bounds
135151
else:
136152
item.bbox = None
137153
return item
@@ -167,8 +183,6 @@ def get_by_status(self, statuses: Iterable[str], max=None) -> pd.DataFrame:
167183
) # Even for an empty dataframe the default columns are required
168184
return df
169185

170-
171-
172186
def persist(self, df: pd.DataFrame):
173187
if not self.exists():
174188
spatial_extent = pystac.SpatialExtent([[-180, -90, 180, 90]])
@@ -181,7 +195,7 @@ def persist(self, df: pd.DataFrame):
181195
if not df.empty:
182196

183197
def handle_row(series):
184-
item = self.item_from(series, self.geometry_column)
198+
item = self.item_from(series)
185199
all_items.append(item)
186200

187201

0 commit comments

Comments
 (0)