Skip to content

Commit a6b6e10

Browse files
committed
test, validate items in bulk transaction client
1 parent 0f85dcf commit a6b6e10

File tree

3 files changed

+199
-119
lines changed

3 files changed

+199
-119
lines changed

stac_fastapi/core/stac_fastapi/core/core.py

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -693,19 +693,26 @@ async def create_item(
693693
NotFoundError: If the specified collection is not found in the database.
694694
ConflictError: If an item with the same ID already exists in the collection.
695695
"""
696-
item = item.model_dump(mode="json")
697-
base_url = str(kwargs["request"].base_url)
696+
# Ensure request is present
697+
request = kwargs.get("request")
698+
if not request:
699+
raise ValueError("Request must be provided in kwargs")
700+
base_url = str(request.base_url)
701+
702+
# Convert Pydantic model to dict for uniform processing
703+
item_dict = item.model_dump(mode="json")
698704

699-
# If a feature collection is posted
700-
if item["type"] == "FeatureCollection":
705+
# Handle FeatureCollection (bulk insert)
706+
if item_dict["type"] == "FeatureCollection":
701707
bulk_client = BulkTransactionsClient(
702708
database=self.database, settings=self.settings
703709
)
710+
features = item_dict["features"]
704711
processed_items = [
705712
bulk_client.preprocess_item(
706-
item, base_url, BulkTransactionMethod.INSERT
713+
feature, base_url, BulkTransactionMethod.INSERT
707714
)
708-
for item in item["features"]
715+
for feature in features
709716
]
710717
attempted = len(processed_items)
711718
success, errors = await self.database.bulk_async(
@@ -714,19 +721,23 @@ async def create_item(
714721
refresh=kwargs.get("refresh", False),
715722
)
716723
if errors:
717-
logger.error(f"Bulk async operation encountered errors: {errors}")
724+
logger.error(
725+
f"Bulk async operation encountered errors for collection {collection_id}: {errors} (attempted {attempted})"
726+
)
718727
else:
719-
logger.info(f"Bulk async operation succeeded with {success} actions.")
720-
728+
logger.info(
729+
f"Bulk async operation succeeded with {success} actions for collection {collection_id}."
730+
)
721731
return f"Successfully added {success} Items. {attempted - success} errors occurred."
722-
else:
723-
await self.database.create_item(
724-
item,
725-
refresh=kwargs.get("refresh", False),
726-
base_url=base_url,
727-
exist_ok=False,
728-
)
729-
return ItemSerializer.db_to_stac(item, base_url)
732+
733+
# Handle single item
734+
await self.database.create_item(
735+
item_dict,
736+
refresh=kwargs.get("refresh", False),
737+
base_url=base_url,
738+
exist_ok=False,
739+
)
740+
return ItemSerializer.db_to_stac(item_dict, base_url)
730741

731742
@overrides
732743
async def update_item(
@@ -911,12 +922,22 @@ def bulk_item_insert(
911922
else:
912923
base_url = ""
913924

914-
processed_items = [
915-
self.preprocess_item(item, base_url, items.method)
916-
for item in items.items.values()
917-
]
925+
processed_items = []
926+
for item in items.items.values():
927+
try:
928+
validated = Item(**item) if not isinstance(item, Item) else item
929+
processed_items.append(
930+
self.preprocess_item(
931+
validated.model_dump(mode="json"), base_url, items.method
932+
)
933+
)
934+
except ValidationError:
935+
# Immediately raise on the first invalid item (strict mode)
936+
raise
937+
938+
if not processed_items:
939+
return "No valid items to insert."
918940

919-
# not a great way to get the collection_id-- should be part of the method signature
920941
collection_id = processed_items[0]["collection"]
921942
attempted = len(processed_items)
922943
success, errors = self.database.bulk_sync(
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
import os
2+
import uuid
3+
from copy import deepcopy
4+
5+
import pytest
6+
from pydantic import ValidationError
7+
8+
from stac_fastapi.extensions.third_party.bulk_transactions import Items
9+
from stac_fastapi.types.errors import ConflictError
10+
11+
from ..conftest import MockRequest, create_item
12+
13+
if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch":
14+
from stac_fastapi.opensearch.config import OpensearchSettings as SearchSettings
15+
else:
16+
from stac_fastapi.elasticsearch.config import (
17+
ElasticsearchSettings as SearchSettings,
18+
)
19+
20+
21+
@pytest.mark.asyncio
22+
async def test_bulk_item_insert(ctx, core_client, txn_client, bulk_txn_client):
23+
items = {}
24+
for _ in range(10):
25+
_item = deepcopy(ctx.item)
26+
_item["id"] = str(uuid.uuid4())
27+
items[_item["id"]] = _item
28+
29+
# fc = es_core.item_collection(coll["id"], request=MockStarletteRequest)
30+
# assert len(fc["features"]) == 0
31+
32+
bulk_txn_client.bulk_item_insert(Items(items=items), refresh=True)
33+
34+
fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
35+
assert len(fc["features"]) >= 10
36+
37+
# for item in items:
38+
# es_transactions.delete_item(
39+
# item["id"], item["collection"], request=MockStarletteRequest
40+
# )
41+
42+
43+
@pytest.mark.asyncio
44+
async def test_bulk_item_insert_with_raise_on_error(
45+
ctx, core_client, txn_client, bulk_txn_client
46+
):
47+
"""
48+
Test bulk_item_insert behavior with RAISE_ON_BULK_ERROR set to true and false.
49+
50+
This test verifies that when RAISE_ON_BULK_ERROR is set to true, a ConflictError
51+
is raised for conflicting items. When set to false, the operation logs errors
52+
and continues gracefully.
53+
"""
54+
55+
# Insert an initial item to set up a conflict
56+
initial_item = deepcopy(ctx.item)
57+
initial_item["id"] = str(uuid.uuid4())
58+
await create_item(txn_client, initial_item)
59+
60+
# Verify the initial item is inserted
61+
fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
62+
assert len(fc["features"]) >= 1
63+
64+
# Create conflicting items (same ID as the initial item)
65+
conflicting_items = {initial_item["id"]: deepcopy(initial_item)}
66+
67+
# Test with RAISE_ON_BULK_ERROR set to true
68+
os.environ["RAISE_ON_BULK_ERROR"] = "true"
69+
bulk_txn_client.database.sync_settings = SearchSettings()
70+
71+
with pytest.raises(ConflictError):
72+
bulk_txn_client.bulk_item_insert(Items(items=conflicting_items), refresh=True)
73+
74+
# Test with RAISE_ON_BULK_ERROR set to false
75+
os.environ["RAISE_ON_BULK_ERROR"] = "false"
76+
bulk_txn_client.database.sync_settings = SearchSettings() # Reinitialize settings
77+
result = bulk_txn_client.bulk_item_insert(
78+
Items(items=conflicting_items), refresh=True
79+
)
80+
81+
# Validate the results
82+
assert "Successfully added/updated 1 Items" in result
83+
84+
# Clean up the inserted item
85+
await txn_client.delete_item(initial_item["id"], ctx.item["collection"])
86+
87+
88+
@pytest.mark.asyncio
89+
async def test_feature_collection_insert(
90+
core_client,
91+
txn_client,
92+
ctx,
93+
):
94+
features = []
95+
for _ in range(10):
96+
_item = deepcopy(ctx.item)
97+
_item["id"] = str(uuid.uuid4())
98+
features.append(_item)
99+
100+
feature_collection = {"type": "FeatureCollection", "features": features}
101+
102+
await create_item(txn_client, feature_collection)
103+
104+
fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
105+
assert len(fc["features"]) >= 10
106+
107+
108+
@pytest.mark.asyncio
109+
async def test_bulk_item_insert_validation_error(ctx, core_client, bulk_txn_client):
110+
items = {}
111+
# Add 9 valid items
112+
for _ in range(9):
113+
_item = deepcopy(ctx.item)
114+
_item["id"] = str(uuid.uuid4())
115+
items[_item["id"]] = _item
116+
117+
# Add 1 invalid item (e.g., missing "datetime")
118+
invalid_item = deepcopy(ctx.item)
119+
invalid_item["id"] = str(uuid.uuid4())
120+
invalid_item["properties"].pop(
121+
"datetime", None
122+
) # Remove datetime to make it invalid
123+
items[invalid_item["id"]] = invalid_item
124+
125+
# The bulk insert should raise a ValidationError due to the invalid item
126+
with pytest.raises(ValidationError):
127+
bulk_txn_client.bulk_item_insert(Items(items=items), refresh=True)
128+
129+
130+
@pytest.mark.asyncio
131+
async def test_feature_collection_insert_validation_error(
132+
core_client,
133+
txn_client,
134+
ctx,
135+
):
136+
features = []
137+
# Add 9 valid items
138+
for _ in range(9):
139+
_item = deepcopy(ctx.item)
140+
_item["id"] = str(uuid.uuid4())
141+
features.append(_item)
142+
143+
# Add 1 invalid item (e.g., missing "datetime")
144+
invalid_item = deepcopy(ctx.item)
145+
invalid_item["id"] = str(uuid.uuid4())
146+
invalid_item["properties"].pop(
147+
"datetime", None
148+
) # Remove datetime to make it invalid
149+
features.append(invalid_item)
150+
151+
feature_collection = {"type": "FeatureCollection", "features": features}
152+
153+
# Assert that a ValidationError is raised due to the invalid item
154+
with pytest.raises(ValidationError):
155+
await create_item(txn_client, feature_collection)

stac_fastapi/tests/clients/test_es_os.py

Lines changed: 1 addition & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,13 @@
1-
import os
21
import uuid
32
from copy import deepcopy
43
from typing import Callable
54

65
import pytest
76
from stac_pydantic import Item, api
87

9-
from stac_fastapi.extensions.third_party.bulk_transactions import Items
108
from stac_fastapi.types.errors import ConflictError, NotFoundError
119

12-
from ..conftest import MockRequest, create_item
13-
14-
if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch":
15-
from stac_fastapi.opensearch.config import OpensearchSettings as SearchSettings
16-
else:
17-
from stac_fastapi.elasticsearch.config import (
18-
ElasticsearchSettings as SearchSettings,
19-
)
10+
from ..conftest import MockRequest
2011

2112

2213
@pytest.mark.asyncio
@@ -283,93 +274,6 @@ async def test_delete_item(ctx, core_client, txn_client):
283274
)
284275

285276

286-
@pytest.mark.asyncio
287-
async def test_bulk_item_insert(ctx, core_client, txn_client, bulk_txn_client):
288-
items = {}
289-
for _ in range(10):
290-
_item = deepcopy(ctx.item)
291-
_item["id"] = str(uuid.uuid4())
292-
items[_item["id"]] = _item
293-
294-
# fc = es_core.item_collection(coll["id"], request=MockStarletteRequest)
295-
# assert len(fc["features"]) == 0
296-
297-
bulk_txn_client.bulk_item_insert(Items(items=items), refresh=True)
298-
299-
fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
300-
assert len(fc["features"]) >= 10
301-
302-
# for item in items:
303-
# es_transactions.delete_item(
304-
# item["id"], item["collection"], request=MockStarletteRequest
305-
# )
306-
307-
308-
@pytest.mark.asyncio
309-
async def test_bulk_item_insert_with_raise_on_error(
310-
ctx, core_client, txn_client, bulk_txn_client
311-
):
312-
"""
313-
Test bulk_item_insert behavior with RAISE_ON_BULK_ERROR set to true and false.
314-
315-
This test verifies that when RAISE_ON_BULK_ERROR is set to true, a ConflictError
316-
is raised for conflicting items. When set to false, the operation logs errors
317-
and continues gracefully.
318-
"""
319-
320-
# Insert an initial item to set up a conflict
321-
initial_item = deepcopy(ctx.item)
322-
initial_item["id"] = str(uuid.uuid4())
323-
await create_item(txn_client, initial_item)
324-
325-
# Verify the initial item is inserted
326-
fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
327-
assert len(fc["features"]) >= 1
328-
329-
# Create conflicting items (same ID as the initial item)
330-
conflicting_items = {initial_item["id"]: deepcopy(initial_item)}
331-
332-
# Test with RAISE_ON_BULK_ERROR set to true
333-
os.environ["RAISE_ON_BULK_ERROR"] = "true"
334-
bulk_txn_client.database.sync_settings = SearchSettings()
335-
336-
with pytest.raises(ConflictError):
337-
bulk_txn_client.bulk_item_insert(Items(items=conflicting_items), refresh=True)
338-
339-
# Test with RAISE_ON_BULK_ERROR set to false
340-
os.environ["RAISE_ON_BULK_ERROR"] = "false"
341-
bulk_txn_client.database.sync_settings = SearchSettings() # Reinitialize settings
342-
result = bulk_txn_client.bulk_item_insert(
343-
Items(items=conflicting_items), refresh=True
344-
)
345-
346-
# Validate the results
347-
assert "Successfully added/updated 1 Items" in result
348-
349-
# Clean up the inserted item
350-
await txn_client.delete_item(initial_item["id"], ctx.item["collection"])
351-
352-
353-
@pytest.mark.asyncio
354-
async def test_feature_collection_insert(
355-
core_client,
356-
txn_client,
357-
ctx,
358-
):
359-
features = []
360-
for _ in range(10):
361-
_item = deepcopy(ctx.item)
362-
_item["id"] = str(uuid.uuid4())
363-
features.append(_item)
364-
365-
feature_collection = {"type": "FeatureCollection", "features": features}
366-
367-
await create_item(txn_client, feature_collection)
368-
369-
fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
370-
assert len(fc["features"]) >= 10
371-
372-
373277
@pytest.mark.asyncio
374278
async def test_landing_page_no_collection_title(ctx, core_client, txn_client, app):
375279
ctx.collection["id"] = "new_id"

0 commit comments

Comments
 (0)