
Commit c81fa6b

bountx, Andrzej Pijanowski, and jonhealy1 authored
Fix Item Duplicate Detection Across Datetime Indexes (#575)
**Description:** When `ENABLE_DATETIME_INDEX_FILTERING=true`, items with different datetime values are stored in different datetime-partitioned indexes. The bulk insert methods (`bulk_async_prep_create_item`, `bulk_sync_prep_create_item`) were using `client.exists(index=alias)` directly, which could fail to detect existing items stored in other datetime indexes.

Scenario that caused duplicates:

- Item `product-123` with datetime `2024-01-15` is inserted into `stac_items_collection_2024-01`
- POST the same `product-123` with datetime `2024-06-20`; the alias check misses it
- The item is inserted into `stac_items_collection_2024-06`, creating a duplicate

**Solution:** Extracted the duplicate-check logic from `async_prep_create_item` into reusable helper methods that properly iterate through all indexes in the collection alias (a sketch of this check follows the commit metadata below):

- `_check_item_exists_in_collection` (async)
- `_check_item_exists_in_collection_sync` (sync)

Updated all three prep methods to use these helpers:

- `async_prep_create_item`
- `bulk_async_prep_create_item`
- `bulk_sync_prep_create_item`

**PR Checklist:**

- [x] Code is formatted and linted (run `pre-commit run --all-files`)
- [x] Tests pass (run `make test`)
- [x] Documentation has been updated to reflect changes, if applicable
- [x] Changes are added to the changelog

---------

Co-authored-by: Andrzej Pijanowski <apijanowski@cloudferro.com>
Co-authored-by: Jonathan Healy <jonathan.d.healy@gmail.com>
1 parent 66cffcb commit c81fa6b
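To make the failure mode concrete, here is a minimal sketch of the alias-wide existence check, modeled on the inline logic that `async_prep_create_item` used before this commit (visible in the database_logic.py diff below). The standalone function name and client wiring are illustrative, not part of the codebase:

```python
# Illustrative sketch only: a standalone version of the cross-index
# duplicate check. API usage follows the elasticsearch-py async client;
# the function name and index names are hypothetical.
from elasticsearch import AsyncElasticsearch


async def item_exists_anywhere(
    client: AsyncElasticsearch, alias: str, doc_id: str
) -> bool:
    """Return True if doc_id exists in any concrete index behind the alias.

    With ENABLE_DATETIME_INDEX_FILTERING=true, one collection alias fans
    out to several datetime-partitioned indexes (e.g.
    stac_items_collection_2024-01, stac_items_collection_2024-06), so a
    single exists() call against the alias can miss items stored in
    another partition.
    """
    if not await client.indices.exists_alias(name=alias):
        return False
    # get_alias returns a mapping keyed by concrete index name
    alias_info = await client.indices.get_alias(name=alias)
    for index in alias_info:
        if await client.exists(index=index, id=doc_id):
            return True
    return False
```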

File tree: 8 files changed, +628 -105 lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
 
 ### Fixed
 
+- Fixed bulk_sync_prep_create_item to properly detect duplicates across indexes. [#575](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/575)
+
 ### Removed
 
 ### Updated
```

stac_fastapi/core/stac_fastapi/core/core.py

Lines changed: 41 additions & 7 deletions
```diff
@@ -1006,14 +1006,31 @@ async def create_item(
                 database=self.database, settings=self.settings
             )
             features = item_dict["features"]
-            processed_items = [
+            all_prepped = [
                 bulk_client.preprocess_item(
                     feature, base_url, BulkTransactionMethod.INSERT
                 )
                 for feature in features
             ]
+            # Filter out None values (skipped duplicates from DB check)
+            processed_items = [item for item in all_prepped if item is not None]
+            skipped_db_duplicates = len(all_prepped) - len(processed_items)
+
+            # Deduplicate items within the batch by ID (keep last occurrence)
+            # This matches ES behavior where later items overwrite earlier ones
+            seen_ids: dict = {}
+            for item in processed_items:
+                seen_ids[item["id"]] = item
+            unique_items = list(seen_ids.values())
+            skipped_batch_duplicates = len(processed_items) - len(unique_items)
+            processed_items = unique_items
+
+            skipped = skipped_db_duplicates + skipped_batch_duplicates
             attempted = len(processed_items)
 
+            if not processed_items:
+                return f"No items to insert. {skipped} items were skipped (duplicates)."
+
             success, errors = await self.database.bulk_async(
                 collection_id=collection_id,
                 processed_items=processed_items,
@@ -1027,7 +1044,7 @@ async def create_item(
                 logger.info(
                     f"Bulk async operation succeeded with {success} actions for collection {collection_id}."
                 )
-            return f"Successfully added {success} Items. {attempted - success} errors occurred."
+            return f"Successfully added {success} Items. {skipped} skipped (duplicates). {attempted - success} errors occurred."
 
         # Handle single item
         await self.database.create_item(
@@ -1340,18 +1357,35 @@ def bulk_item_insert(
             base_url = ""
 
         processed_items = []
+        skipped_db_duplicates = 0
         for item in items.items.values():
             try:
                 validated = Item(**item) if not isinstance(item, Item) else item
-                processed_items.append(
-                    self.preprocess_item(
-                        validated.model_dump(mode="json"), base_url, items.method
-                    )
+                prepped = self.preprocess_item(
+                    validated.model_dump(mode="json"), base_url, items.method
                 )
+                if prepped is not None:
+                    processed_items.append(prepped)
+                else:
+                    skipped_db_duplicates += 1
             except ValidationError:
                 # Immediately raise on the first invalid item (strict mode)
                 raise
 
+        # Deduplicate items within the batch by ID (keep last occurrence)
+        # This matches ES behavior where later items overwrite earlier ones
+        seen_ids: dict = {}
+        for item in processed_items:
+            seen_ids[item["id"]] = item
+        unique_items = list(seen_ids.values())
+        skipped_batch_duplicates = len(processed_items) - len(unique_items)
+        processed_items = unique_items
+
+        skipped = skipped_db_duplicates + skipped_batch_duplicates
+
+        if not processed_items:
+            return f"No items to insert. {skipped} items were skipped (duplicates)."
+
         collection_id = processed_items[0]["collection"]
         attempted = len(processed_items)
         success, errors = self.database.bulk_sync(
@@ -1364,4 +1398,4 @@ def bulk_item_insert(
         else:
            logger.info(f"Bulk sync operation succeeded with {success} actions.")
 
-        return f"Successfully added/updated {success} Items. {attempted - success} errors occurred."
+        return f"Successfully added/updated {success} Items. {skipped} skipped (duplicates). {attempted - success} errors occurred."
```

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py

Lines changed: 67 additions & 46 deletions
```diff
@@ -33,11 +33,14 @@
     PatchOperation,
 )
 from stac_fastapi.sfeos_helpers.database import (
+    ItemAlreadyExistsError,
     add_bbox_shape_to_collection,
     apply_collections_bbox_filter_shared,
     apply_collections_datetime_filter_shared,
     apply_free_text_filter_shared,
     apply_intersects_filter_shared,
+    check_item_exists_in_alias,
+    check_item_exists_in_alias_sync,
     create_index_templates_shared,
     delete_item_index_shared,
     get_queryables_mapping_shared,
@@ -996,6 +999,44 @@ async def check_collection_exists(self, collection_id: str):
         if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
             raise NotFoundError(f"Collection {collection_id} does not exist")
 
+    async def _check_item_exists_in_collection(
+        self, collection_id: str, item_id: str
+    ) -> bool:
+        """Check if an item exists across all indexes for a collection.
+
+        Args:
+            collection_id (str): The collection identifier.
+            item_id (str): The item identifier.
+
+        Returns:
+            bool: True if the item exists in any index, False otherwise.
+        """
+        alias = index_alias_by_collection_id(collection_id)
+        doc_id = mk_item_id(item_id, collection_id)
+        try:
+            return await check_item_exists_in_alias(self.client, alias, doc_id)
+        except Exception:
+            return False
+
+    def _check_item_exists_in_collection_sync(
+        self, collection_id: str, item_id: str
+    ) -> bool:
+        """Check if an item exists across all indexes for a collection (sync version).
+
+        Args:
+            collection_id (str): The collection identifier.
+            item_id (str): The item identifier.
+
+        Returns:
+            bool: True if the item exists in any index, False otherwise.
+        """
+        alias = index_alias_by_collection_id(collection_id)
+        doc_id = mk_item_id(item_id, collection_id)
+        try:
+            return check_item_exists_in_alias_sync(self.sync_client, alias, doc_id)
+        except Exception:
+            return False
+
     async def async_prep_create_item(
         self, item: Item, base_url: str, exist_ok: bool = False
     ) -> Item:
@@ -1011,31 +1052,21 @@ async def async_prep_create_item(
             Item: The prepped item.
 
         Raises:
-            ConflictError: If the item already exists in the database.
+            ItemAlreadyExistsError: If the item already exists in the database.
 
         """
         await self.check_collection_exists(collection_id=item["collection"])
-        alias = index_alias_by_collection_id(item["collection"])
-        doc_id = mk_item_id(item["id"], item["collection"])
-
-        if not exist_ok:
-            alias_exists = await self.client.indices.exists_alias(name=alias)
-
-            if alias_exists:
-                alias_info = await self.client.indices.get_alias(name=alias)
-                indices = list(alias_info.keys())
 
-                for index in indices:
-                    if await self.client.exists(index=index, id=doc_id):
-                        raise ConflictError(
-                            f"Item {item['id']} in collection {item['collection']} already exists"
-                        )
+        if not exist_ok and await self._check_item_exists_in_collection(
+            item["collection"], item["id"]
+        ):
+            raise ItemAlreadyExistsError(item["id"], item["collection"])
 
         return self.item_serializer.stac_to_db(item, base_url)
 
     async def bulk_async_prep_create_item(
         self, item: Item, base_url: str, exist_ok: bool = False
-    ) -> Item:
+    ) -> Optional[Item]:
         """
         Prepare an item for insertion into the database.
 
@@ -1063,20 +1094,18 @@ async def bulk_async_prep_create_item(
         # Check if the collection exists
         await self.check_collection_exists(collection_id=item["collection"])
 
-        # Check if the item already exists in the database
-        if not exist_ok and await self.client.exists(
-            index=index_alias_by_collection_id(item["collection"]),
-            id=mk_item_id(item["id"], item["collection"]),
+        # Check if the item already exists in the database (across all datetime indexes)
+        if not exist_ok and await self._check_item_exists_in_collection(
+            item["collection"], item["id"]
         ):
-            error_message = (
-                f"Item {item['id']} in collection {item['collection']} already exists."
-            )
             if self.async_settings.raise_on_bulk_error:
-                raise ConflictError(error_message)
+                raise ItemAlreadyExistsError(item["id"], item["collection"])
             else:
                 logger.warning(
-                    f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false."
+                    f"Item {item['id']} in collection {item['collection']} already exists. "
+                    "Skipping as `RAISE_ON_BULK_ERROR` is set to false."
                 )
+                return None
 
         # Serialize the item into a database-compatible format
         prepped_item = self.item_serializer.stac_to_db(item, base_url)
@@ -1085,7 +1114,7 @@ async def bulk_async_prep_create_item(
 
     def bulk_sync_prep_create_item(
         self, item: Item, base_url: str, exist_ok: bool = False
-    ) -> Item:
+    ) -> Optional[Item]:
         """
         Prepare an item for insertion into the database.
 
@@ -1114,26 +1143,18 @@ def bulk_sync_prep_create_item(
         if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=item["collection"]):
             raise NotFoundError(f"Collection {item['collection']} does not exist")
 
-        # Check if the item already exists in the database
-        alias = index_alias_by_collection_id(item["collection"])
-        doc_id = mk_item_id(item["id"], item["collection"])
-
-        if not exist_ok:
-            alias_exists = self.sync_client.indices.exists_alias(name=alias)
-
-            if alias_exists:
-                alias_info = self.sync_client.indices.get_alias(name=alias)
-                indices = list(alias_info.keys())
-
-                for index in indices:
-                    if self.sync_client.exists(index=index, id=doc_id):
-                        error_message = f"Item {item['id']} in collection {item['collection']} already exists."
-                        if self.sync_settings.raise_on_bulk_error:
-                            raise ConflictError(error_message)
-                        else:
-                            logger.warning(
-                                f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false."
-                            )
+        # Check if the item already exists in the database (across all datetime indexes)
+        if not exist_ok and self._check_item_exists_in_collection_sync(
+            item["collection"], item["id"]
+        ):
+            if self.sync_settings.raise_on_bulk_error:
+                raise ItemAlreadyExistsError(item["id"], item["collection"])
+            else:
+                logger.warning(
+                    f"Item {item['id']} in collection {item['collection']} already exists. "
+                    "Skipping as `RAISE_ON_BULK_ERROR` is set to false."
                )
+                return None
 
         # Serialize the item into a database-compatible format
         prepped_item = self.item_serializer.stac_to_db(item, base_url)
```
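The helpers `check_item_exists_in_alias` and `check_item_exists_in_alias_sync` are imported from `stac_fastapi.sfeos_helpers.database`, so their bodies are not part of this diff. A plausible sketch of the sync variant, assuming it mirrors the inline loop removed from `async_prep_create_item` above (the real implementation may differ):

```python
# Assumed shape of the shared helper; the actual code lives in
# stac_fastapi.sfeos_helpers.database. It probes every concrete index
# behind the collection alias for the given document ID.
from elasticsearch import Elasticsearch


def check_item_exists_in_alias_sync(
    sync_client: Elasticsearch, alias: str, doc_id: str
) -> bool:
    """Return True if doc_id exists in any index the alias points to."""
    if not sync_client.indices.exists_alias(name=alias):
        return False
    alias_info = sync_client.indices.get_alias(name=alias)
    for index in alias_info:  # keys are concrete index names
        if sync_client.exists(index=index, id=doc_id):
            return True
    return False
```

Note the design choice in the `_check_item_exists_in_collection*` wrappers shown in the diff: they catch `Exception` and return `False`, so a failed lookup is treated as "item not found" and bulk inserts degrade to attempting the write rather than failing the whole batch.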
