4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -18,9 +18,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Updated dynamic mapping for items to map long values to double versus float. [#326](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/326)
- Extended Datetime Search to search on start_datetime and end_datetime as well as datetime fields. [#182](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/182)
- Changed item update operation to use Elasticsearch index API instead of delete and create for better efficiency and atomicity. [#75](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/75)
- Bulk insertion via `BulkTransactionsClient` now strictly validates all STAC Items using the Pydantic model before insertion. Any invalid item will immediately raise a `ValidationError`, ensuring consistent validation with single-item inserts and preventing invalid STAC Items from being stored. This validation is enforced regardless of the `RAISE_ON_BULK_ERROR` setting. [#368](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/368)


### Fixed

- Refactored `create_item` and `update_item` methods to share unified logic, ensuring consistent conflict detection, validation, and database operations. [#368](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/368)

## [v4.0.0] - 2025-04-23

### Added
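The strict validation described in the changelog entry above can be exercised roughly as follows (a minimal sketch, assuming `bulk_client` is an already-configured `BulkTransactionsClient` and `raw_items` is a dict of raw STAC Item dicts keyed by id):

```python
from pydantic import ValidationError

from stac_fastapi.extensions.third_party.bulk_transactions import Items

try:
    # Every item is validated against the Pydantic Item model before insertion.
    bulk_client.bulk_item_insert(Items(items=raw_items), refresh=True)
except ValidationError as exc:
    # Raised for the first invalid item, regardless of RAISE_ON_BULK_ERROR.
    print(f"Bulk insert rejected: {exc}")
```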
2 changes: 1 addition & 1 deletion README.md
@@ -114,7 +114,7 @@ You can customize additional settings in your `.env` file:
| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional |
| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional |
| `OPENSEARCH_VERSION` | Version of OpenSearch to use. | `2.11.1` | Optional |
| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. | `false` | Optional |
| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional |

> [!NOTE]
> The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch.
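For example, to make bulk inserts fail fast on indexing errors, you might set the following in your `.env` (illustrative; STAC Item validation errors raise either way):

```
RAISE_ON_BULK_ERROR=true
```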
68 changes: 46 additions & 22 deletions stac_fastapi/core/stac_fastapi/core/core.py
@@ -693,19 +693,26 @@ async def create_item(
NotFoundError: If the specified collection is not found in the database.
ConflictError: If an item with the same ID already exists in the collection.
"""
item = item.model_dump(mode="json")
base_url = str(kwargs["request"].base_url)
# Ensure request is present
request = kwargs.get("request")
if not request:
raise ValueError("Request must be provided in kwargs")
base_url = str(request.base_url)

# Convert Pydantic model to dict for uniform processing
item_dict = item.model_dump(mode="json")

# If a feature collection is posted
if item["type"] == "FeatureCollection":
# Handle FeatureCollection (bulk insert)
if item_dict["type"] == "FeatureCollection":
bulk_client = BulkTransactionsClient(
database=self.database, settings=self.settings
)
features = item_dict["features"]
processed_items = [
bulk_client.preprocess_item(
item, base_url, BulkTransactionMethod.INSERT
feature, base_url, BulkTransactionMethod.INSERT
)
for item in item["features"]
for feature in features
]
attempted = len(processed_items)
success, errors = await self.database.bulk_async(
@@ -714,17 +721,23 @@
refresh=kwargs.get("refresh", False),
)
if errors:
logger.error(f"Bulk async operation encountered errors: {errors}")
logger.error(
f"Bulk async operation encountered errors for collection {collection_id}: {errors} (attempted {attempted})"
)
else:
logger.info(f"Bulk async operation succeeded with {success} actions.")

logger.info(
f"Bulk async operation succeeded with {success} actions for collection {collection_id}."
)
return f"Successfully added {success} Items. {attempted - success} errors occurred."
else:
item = await self.database.async_prep_create_item(
item=item, base_url=base_url
)
await self.database.create_item(item, refresh=kwargs.get("refresh", False))
return ItemSerializer.db_to_stac(item, base_url)

# Handle single item
await self.database.create_item(
item_dict,
refresh=kwargs.get("refresh", False),
base_url=base_url,
exist_ok=False,
)
return ItemSerializer.db_to_stac(item_dict, base_url)

@overrides
async def update_item(
@@ -750,8 +763,9 @@
now = datetime_type.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
item["properties"]["updated"] = now

await self.database.check_collection_exists(collection_id)
await self.database.create_item(item, refresh=kwargs.get("refresh", False))
await self.database.create_item(
item, refresh=kwargs.get("refresh", False), base_url=base_url, exist_ok=True
)

return ItemSerializer.db_to_stac(item, base_url)

@@ -908,12 +922,22 @@ def bulk_item_insert(
else:
base_url = ""

processed_items = [
self.preprocess_item(item, base_url, items.method)
for item in items.items.values()
]
processed_items = []
for item in items.items.values():
try:
validated = Item(**item) if not isinstance(item, Item) else item
processed_items.append(
self.preprocess_item(
validated.model_dump(mode="json"), base_url, items.method
)
)
except ValidationError:
# Immediately raise on the first invalid item (strict mode)
raise

if not processed_items:
return "No valid items to insert."

# not a great way to get the collection_id-- should be part of the method signature
collection_id = processed_items[0]["collection"]
attempted = len(processed_items)
success, errors = self.database.bulk_sync(
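Taken together, the refactor funnels single-item creation, updates, and FeatureCollection posts through one code path (a rough sketch, assuming `txn_client` is an initialized transactions client, `request` a live Starlette request, and `item_model` / `feature_collection_model` validated Pydantic payloads):

```python
# Single Item: exist_ok=False under the hood, so a duplicate id raises ConflictError.
stac_item = await txn_client.create_item(
    collection_id="my-collection", item=item_model, request=request
)

# FeatureCollection: routed through BulkTransactionsClient and summarized.
summary = await txn_client.create_item(
    collection_id="my-collection", item=feature_collection_model, request=request
)
# e.g. "Successfully added 10 Items. 0 errors occurred."
```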
stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py
@@ -842,7 +842,13 @@ def bulk_sync_prep_create_item(
logger.debug(f"Item {item['id']} prepared successfully.")
return prepped_item

async def create_item(self, item: Item, refresh: bool = False):
async def create_item(
self,
item: Item,
refresh: bool = False,
base_url: str = "",
exist_ok: bool = False,
):
"""Database logic for creating one item.

Args:
@@ -858,18 +864,16 @@ async def create_item(self, item: Item, refresh: bool = False):
# todo: check if collection exists, but cache
item_id = item["id"]
collection_id = item["collection"]
es_resp = await self.client.index(
item = await self.async_prep_create_item(
item=item, base_url=base_url, exist_ok=exist_ok
)
await self.client.index(
index=index_alias_by_collection_id(collection_id),
id=mk_item_id(item_id, collection_id),
document=item,
refresh=refresh,
)

if (meta := es_resp.get("meta")) and meta.get("status") == 409:
raise ConflictError(
f"Item {item_id} in collection {collection_id} already exists"
)

async def delete_item(
self, item_id: str, collection_id: str, refresh: bool = False
):
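With conflict detection moved into `async_prep_create_item`, the `exist_ok` flag is what separates create from update at the database layer (sketch only; `db` stands for a `DatabaseLogic` instance and `item_dict` for a STAC Item dict):

```python
# Create: prep raises ConflictError if the id already exists in the collection.
await db.create_item(item_dict, refresh=True, base_url=base_url, exist_ok=False)

# Update: the same index API call simply overwrites the existing document.
await db.create_item(item_dict, refresh=True, base_url=base_url, exist_ok=True)
```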
18 changes: 11 additions & 7 deletions stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py
@@ -861,7 +861,13 @@ def bulk_sync_prep_create_item(
logger.debug(f"Item {item['id']} prepared successfully.")
return prepped_item

async def create_item(self, item: Item, refresh: bool = False):
async def create_item(
self,
item: Item,
refresh: bool = False,
base_url: str = "",
exist_ok: bool = False,
):
"""Database logic for creating one item.

Args:
@@ -877,18 +883,16 @@ async def create_item(self, item: Item, refresh: bool = False):
# todo: check if collection exists, but cache
item_id = item["id"]
collection_id = item["collection"]
es_resp = await self.client.index(
item = await self.async_prep_create_item(
item=item, base_url=base_url, exist_ok=exist_ok
)
await self.client.index(
index=index_alias_by_collection_id(collection_id),
id=mk_item_id(item_id, collection_id),
body=item,
refresh=refresh,
)

if (meta := es_resp.get("meta")) and meta.get("status") == 409:
raise ConflictError(
f"Item {item_id} in collection {collection_id} already exists"
)

async def delete_item(
self, item_id: str, collection_id: str, refresh: bool = False
):
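Both backends now share the same prep-then-index flow; the only remaining client-level difference is the keyword that carries the payload (illustrative, mirroring the two diffs above):

```python
# elasticsearch-py 8.x passes the payload as `document=`.
await es_client.index(index=alias, id=doc_id, document=doc, refresh=True)

# opensearch-py passes it as `body=`.
await os_client.index(index=alias, id=doc_id, body=doc, refresh=True)
```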
155 changes: 155 additions & 0 deletions stac_fastapi/tests/clients/test_bulk_transactions.py
@@ -0,0 +1,155 @@
import os
import uuid
from copy import deepcopy

import pytest
from pydantic import ValidationError

from stac_fastapi.extensions.third_party.bulk_transactions import Items
from stac_fastapi.types.errors import ConflictError

from ..conftest import MockRequest, create_item

if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch":
from stac_fastapi.opensearch.config import OpensearchSettings as SearchSettings
else:
from stac_fastapi.elasticsearch.config import (
ElasticsearchSettings as SearchSettings,
)


@pytest.mark.asyncio
async def test_bulk_item_insert(ctx, core_client, txn_client, bulk_txn_client):
items = {}
for _ in range(10):
_item = deepcopy(ctx.item)
_item["id"] = str(uuid.uuid4())
items[_item["id"]] = _item

# fc = es_core.item_collection(coll["id"], request=MockStarletteRequest)
# assert len(fc["features"]) == 0

bulk_txn_client.bulk_item_insert(Items(items=items), refresh=True)

fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
assert len(fc["features"]) >= 10

# for item in items:
# es_transactions.delete_item(
# item["id"], item["collection"], request=MockStarletteRequest
# )


@pytest.mark.asyncio
async def test_bulk_item_insert_with_raise_on_error(
ctx, core_client, txn_client, bulk_txn_client
):
"""
Test bulk_item_insert behavior with RAISE_ON_BULK_ERROR set to true and false.
This test verifies that when RAISE_ON_BULK_ERROR is set to true, a ConflictError
is raised for conflicting items. When set to false, the operation logs errors
and continues gracefully.
"""

# Insert an initial item to set up a conflict
initial_item = deepcopy(ctx.item)
initial_item["id"] = str(uuid.uuid4())
await create_item(txn_client, initial_item)

# Verify the initial item is inserted
fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
assert len(fc["features"]) >= 1

# Create conflicting items (same ID as the initial item)
conflicting_items = {initial_item["id"]: deepcopy(initial_item)}

# Test with RAISE_ON_BULK_ERROR set to true
os.environ["RAISE_ON_BULK_ERROR"] = "true"
bulk_txn_client.database.sync_settings = SearchSettings()

with pytest.raises(ConflictError):
bulk_txn_client.bulk_item_insert(Items(items=conflicting_items), refresh=True)

# Test with RAISE_ON_BULK_ERROR set to false
os.environ["RAISE_ON_BULK_ERROR"] = "false"
bulk_txn_client.database.sync_settings = SearchSettings() # Reinitialize settings
result = bulk_txn_client.bulk_item_insert(
Items(items=conflicting_items), refresh=True
)

# Validate the results
assert "Successfully added/updated 1 Items" in result

# Clean up the inserted item
await txn_client.delete_item(initial_item["id"], ctx.item["collection"])


@pytest.mark.asyncio
async def test_feature_collection_insert(
core_client,
txn_client,
ctx,
):
features = []
for _ in range(10):
_item = deepcopy(ctx.item)
_item["id"] = str(uuid.uuid4())
features.append(_item)

feature_collection = {"type": "FeatureCollection", "features": features}

await create_item(txn_client, feature_collection)

fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
assert len(fc["features"]) >= 10


@pytest.mark.asyncio
async def test_bulk_item_insert_validation_error(ctx, core_client, bulk_txn_client):
items = {}
# Add 9 valid items
for _ in range(9):
_item = deepcopy(ctx.item)
_item["id"] = str(uuid.uuid4())
items[_item["id"]] = _item

# Add 1 invalid item (e.g., missing "datetime")
invalid_item = deepcopy(ctx.item)
invalid_item["id"] = str(uuid.uuid4())
invalid_item["properties"].pop(
"datetime", None
) # Remove datetime to make it invalid
items[invalid_item["id"]] = invalid_item

# The bulk insert should raise a ValidationError due to the invalid item
with pytest.raises(ValidationError):
bulk_txn_client.bulk_item_insert(Items(items=items), refresh=True)


@pytest.mark.asyncio
async def test_feature_collection_insert_validation_error(
core_client,
txn_client,
ctx,
):
features = []
# Add 9 valid items
for _ in range(9):
_item = deepcopy(ctx.item)
_item["id"] = str(uuid.uuid4())
features.append(_item)

# Add 1 invalid item (e.g., missing "datetime")
invalid_item = deepcopy(ctx.item)
invalid_item["id"] = str(uuid.uuid4())
invalid_item["properties"].pop(
"datetime", None
) # Remove datetime to make it invalid
features.append(invalid_item)

feature_collection = {"type": "FeatureCollection", "features": features}

# Assert that a ValidationError is raised due to the invalid item
with pytest.raises(ValidationError):
await create_item(txn_client, feature_collection)