Commit 917ddab

cache refresh during index removal
1 parent 03ab2e8 commit 917ddab

4 files changed: 45 additions, 22 deletions

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py

Lines changed: 1 addition & 0 deletions
@@ -1596,6 +1596,7 @@ async def delete_collection(self, collection_id: str, **kwargs: Any):
             index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh
         )
         await delete_item_index(collection_id)
+        await self.async_index_inserter.refresh_cache()

     async def bulk_async(
         self,
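
Context for the change above (the OpenSearch backend below receives the same one-line addition): the inserter's index selector caches collection-to-item-index information, so once delete_item_index removes a collection's index the cache is refreshed, otherwise stale entries for the removed collection could linger. A minimal, self-contained sketch of that staleness problem; IndexCache and delete_collection below are illustrative stand-ins, not the actual SFEOS classes:

# Minimal sketch (not the real SFEOS classes) of the staleness the added
# refresh_cache() call guards against: a cached collection -> item-index
# mapping would otherwise keep an entry for an index that delete_collection
# has just removed.
import asyncio
from typing import Dict, List


class IndexCache:
    """Hypothetical cache of collection_id -> item index names."""

    def __init__(self) -> None:
        self._indexes: Dict[str, List[str]] = {"demo": ["items_demo-2024"]}

    async def get_collection_indexes(self, collection_id: str) -> List[str]:
        return self._indexes.get(collection_id, [])

    async def refresh_cache(self) -> None:
        # The real selector re-reads index metadata from the cluster; clearing
        # the dict is enough to show the effect here.
        self._indexes.clear()


async def delete_collection(collection_id: str, cache: IndexCache) -> None:
    # ... delete the collection document and its item index here ...
    await cache.refresh_cache()  # mirrors the line added in this commit


async def main() -> None:
    cache = IndexCache()
    await delete_collection("demo", cache)
    assert await cache.get_collection_indexes("demo") == []  # no stale entry


asyncio.run(main())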

stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py

Lines changed: 1 addition & 0 deletions
@@ -1574,6 +1574,7 @@ async def delete_collection(self, collection_id: str, **kwargs: Any):
         )
         # Delete the item index for the collection
         await delete_item_index(collection_id)
+        await self.async_index_inserter.refresh_cache()

     async def bulk_async(
         self,

stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/base.py

Lines changed: 18 additions & 0 deletions
@@ -49,3 +49,21 @@ async def create_simple_index(self, client: Any, collection_id: str) -> str:
             str: Created index name.
         """
         pass
+
+    @staticmethod
+    @abstractmethod
+    def should_create_collection_index() -> bool:
+        """Whether this strategy requires collection index creation.
+
+        Returns:
+            bool: True if strategy creates collection indexes, False otherwise.
+        """
+        pass
+
+    async def refresh_cache(self) -> None:
+        """Refresh internal cache if applicable.
+
+        Default implementation does nothing. Subclasses that maintain
+        internal caches should override this method.
+        """
+        pass
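
The base class now requires strategies to declare whether they create collection indexes and gives them an optional cache-refresh hook. A short sketch of a subclass under this contract; the trimmed base class and DatetimePartitionedInserter below are assumptions for the example, not the shipped inserters:

# Illustrative only: a reduced stand-in for the abstract base shown above and a
# hypothetical cache-backed strategy that overrides both new hooks.
from abc import ABC, abstractmethod
from typing import Dict, List


class BaseIndexInserter(ABC):
    """Reduced stand-in for the abstract base in base.py."""

    @staticmethod
    @abstractmethod
    def should_create_collection_index() -> bool:
        """Whether this strategy requires collection index creation."""

    async def refresh_cache(self) -> None:
        """Default is a no-op; cache-backed strategies override it."""
        return None


class DatetimePartitionedInserter(BaseIndexInserter):
    """Hypothetical strategy that caches per-collection index names."""

    def __init__(self) -> None:
        self._cache: Dict[str, List[str]] = {}

    @staticmethod
    def should_create_collection_index() -> bool:
        # A datetime-partitioned strategy manages item indexes itself.
        return False

    async def refresh_cache(self) -> None:
        # Drop cached index names; they are re-read lazily on next use.
        self._cache.clear()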

stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/search_engine/inserters.py

Lines changed: 25 additions & 22 deletions
@@ -34,6 +34,7 @@ def __init__(self, client: Any, index_operations: IndexOperations):
         self.client = client
         self.index_operations = index_operations
         self.datetime_manager = DatetimeIndexManager(client, index_operations)
+        self.index_selector = DatetimeBasedIndexSelector(client)

     @property
     def use_datetime(self) -> bool:
@@ -74,6 +75,14 @@ async def create_simple_index(self, client: Any, collection_id: str) -> str:
         """
         return await self.index_operations.create_simple_index(client, collection_id)

+    async def refresh_cache(self) -> None:
+        """Refresh the index selector cache.
+
+        This method refreshes the cached index information used for
+        datetime-based index selection.
+        """
+        await self.index_selector.refresh_cache()
+
     async def get_target_index(
         self, collection_id: str, product: Dict[str, Any]
     ) -> str:
@@ -86,9 +95,8 @@ async def get_target_index(
         Returns:
             str: Target index name for the product.
         """
-        index_selector = DatetimeBasedIndexSelector(self.client)
         return await self._get_target_index_internal(
-            index_selector, collection_id, product, check_size=True
+            collection_id, product, check_size=True
         )

     async def prepare_bulk_actions(
@@ -109,17 +117,16 @@ async def prepare_bulk_actions(
             raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=msg)

         items.sort(key=lambda item: item["properties"][self.primary_datetime_name])
-        index_selector = DatetimeBasedIndexSelector(self.client)

-        await self._ensure_indexes_exist(index_selector, collection_id, items)
+        await self._ensure_indexes_exist(collection_id, items)
         await self._check_and_handle_oversized_index(
-            index_selector, collection_id, items
+            collection_id, items
         )

         actions = []
         for item in items:
             target_index = await self._get_target_index_internal(
-                index_selector, collection_id, item, check_size=False
+                collection_id, item, check_size=False
             )
             actions.append(
                 {
@@ -133,15 +140,13 @@

     async def _get_target_index_internal(
         self,
-        index_selector,
         collection_id: str,
         product: Dict[str, Any],
         check_size: bool = True,
     ) -> Optional[str]:
         """Get target index with size checking internally.

         Args:
-            index_selector: Index selector instance.
             collection_id (str): Collection identifier.
             product (Dict[str, Any]): Product data.
             check_size (bool): Whether to check index size limits.
@@ -159,16 +164,16 @@ async def _get_target_index_internal(
             else product_datetimes.start_datetime
         )

-        target_index = await index_selector.select_indexes(
+        target_index = await self.index_selector.select_indexes(
             [collection_id], primary_datetime_value, for_insertion=True
         )
-        all_indexes = await index_selector.get_collection_indexes(collection_id)
+        all_indexes = await self.index_selector.get_collection_indexes(collection_id)

         if not all_indexes:
             target_index = await self.datetime_manager.handle_new_collection(
                 collection_id, self.primary_datetime_name, product_datetimes
             )
-            await index_selector.refresh_cache()
+            await self.refresh_cache()
             return target_index

         all_indexes = sorted(
@@ -186,7 +191,7 @@
                 product_datetimes,
                 all_indexes[0][0],
             )
-            await index_selector.refresh_cache()
+            await self.refresh_cache()
             return alias

         if target_index != all_indexes[-1][0][self.primary_datetime_name]:
@@ -213,7 +218,7 @@
                 product_datetimes,
                 aliases_dict,
             )
-            await index_selector.refresh_cache()
+            await self.refresh_cache()
             return target_index

         for item in all_indexes:
@@ -229,16 +234,15 @@
         return None

     async def _ensure_indexes_exist(
-        self, index_selector, collection_id: str, items: List[Dict[str, Any]]
+        self, collection_id: str, items: List[Dict[str, Any]]
     ):
         """Ensure necessary indexes exist for the items.

         Args:
-            index_selector: Index selector instance.
             collection_id (str): Collection identifier.
             items (List[Dict[str, Any]]): List of items to process.
         """
-        all_indexes = await index_selector.get_collection_indexes(collection_id)
+        all_indexes = await self.index_selector.get_collection_indexes(collection_id)

         if not all_indexes:
             first_item = items[0]
@@ -260,18 +264,17 @@
                 collection_id,
                 **index_params,
             )
-            await index_selector.refresh_cache()
+            await self.refresh_cache()

     async def _check_and_handle_oversized_index(
-        self, index_selector, collection_id: str, items: List[Dict[str, Any]]
+        self, collection_id: str, items: List[Dict[str, Any]]
     ) -> None:
         """Check if index is oversized and create new index if needed.

         Checks if the index where the first item would be inserted is oversized.
         If so, creates a new index starting from the next day.

         Args:
-            index_selector: Index selector instance.
             collection_id (str): Collection identifier.
             items (List[Dict[str, Any]]): List of items to process.

@@ -280,10 +283,10 @@
         """
         first_item = items[0]
         first_item_index = await self._get_target_index_internal(
-            index_selector, collection_id, first_item, check_size=False
+            collection_id, first_item, check_size=False
         )

-        all_indexes = await index_selector.get_collection_indexes(collection_id)
+        all_indexes = await self.index_selector.get_collection_indexes(collection_id)
         all_indexes = sorted(
             all_indexes, key=lambda x: x[0][self.primary_datetime_name]
         )
@@ -313,7 +316,7 @@
                 product_datetimes,
                 all_indexes[-1][0],
             )
-            await index_selector.refresh_cache()
+            await self.refresh_cache()


 class SimpleIndexInserter(BaseIndexInserter):
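
Taken together, the inserters.py changes replace the per-call DatetimeBasedIndexSelector(self.client) instances with a single selector created in __init__, and route every cache refresh through the inserter's own refresh_cache(), which delete_collection can now also trigger. A condensed sketch of the resulting shape, with a stub standing in for DatetimeBasedIndexSelector and simplified method signatures:

# Condensed sketch of the refactored shape; _SelectorStub is an assumption that
# stands in for DatetimeBasedIndexSelector, and the methods are simplified.
from typing import Any, Dict, List


class _SelectorStub:
    """Stand-in for DatetimeBasedIndexSelector: cached index lookups."""

    def __init__(self, client: Any) -> None:
        self.client = client
        self._cache: Dict[str, List[str]] = {}

    async def get_collection_indexes(self, collection_id: str) -> List[str]:
        return self._cache.get(collection_id, [])

    async def refresh_cache(self) -> None:
        self._cache.clear()


class DatetimeIndexInserterSketch:
    def __init__(self, client: Any) -> None:
        # One shared selector, built once here instead of being re-created
        # inside get_target_index, prepare_bulk_actions, and the helpers.
        self.index_selector = _SelectorStub(client)

    async def refresh_cache(self) -> None:
        # Single refresh entry point, usable internally and by the database
        # logic's delete_collection via async_index_inserter.refresh_cache().
        await self.index_selector.refresh_cache()

    async def _ensure_indexes_exist(self, collection_id: str) -> List[str]:
        # Helpers read self.index_selector rather than taking a selector arg.
        return await self.index_selector.get_collection_indexes(collection_id)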
