@@ -186,7 +186,7 @@ def query(self, query: str, **kwargs: Any) -> list[str]:
186186 assert collection_name is not None , "Collection name is required."
187187 if not self ._opensearch_client .indices .exists (index = collection_name ):
188188 logger .warning (
189- f"querying { query } , but collection { collection_name } does not exist. retun a empty list."
189+ f"querying { query } , but collection { collection_name } does not exist. return a empty list."
190190 )
191191 return []
192192 query_vector = self ._embedding_client .embed_query (query )
@@ -196,10 +196,58 @@ def query(self, query: str, **kwargs: Any) -> list[str]:
196196
197197 @override
198198 def delete (self , collection_name : str , ** kwargs : Any ):
199+ """drop index"""
199200 if not self ._opensearch_client .indices .exists (index = collection_name ):
200201 raise ValueError (f"Collection { collection_name } does not exist." )
201202 self ._opensearch_client .indices .delete (index = collection_name )
202203
203- def is_empty (self , collection_name : str ):
204+ def is_empty (self , collection_name : str ) -> bool :
204205 response = self ._opensearch_client .count (index = collection_name )
205206 return response ["count" ] == 0
207+
208+ def collection_exists (self , collection_name : str ) -> bool :
209+ return self ._opensearch_client .indices .exists (index = collection_name )
210+
211+ def list_all_collection (self ) -> list :
212+ """List all index name of OpenSearch."""
213+ response = self ._opensearch_client .indices .get_alias ()
214+ return list (response .keys ())
215+
216+ def get_all_docs (self , collection_name : str , size : int = 10000 ) -> list [dict ]:
217+ """Match all docs in one index of OpenSearch"""
218+ if not self .collection_exists (collection_name ):
219+ logger .warning (
220+ f"Get all docs, but collection { collection_name } does not exist. return a empty list."
221+ )
222+ return []
223+
224+ query = {"size" : size , "query" : {"match_all" : {}}}
225+ response = self ._opensearch_client .search (index = collection_name , body = query )
226+ return [
227+ {
228+ "id" : hit ["_id" ],
229+ "page_content" : hit ["_source" ]["page_content" ],
230+ }
231+ for hit in response ["hits" ]["hits" ]
232+ ]
233+
234+ def delete_by_query (self , collection_name : str , query : str ):
235+ """Delete docs by query in one index of OpenSearch"""
236+ if not self .collection_exists (collection_name ):
237+ raise ValueError (f"Collection { collection_name } does not exist." )
238+
239+ query = {"query" : {"match" : {"page_content" : query }}}
240+ response = self ._opensearch_client .delete_by_query (
241+ index = collection_name , body = query
242+ )
243+ self ._opensearch_client .indices .refresh (index = collection_name )
244+ return response
245+
246+ def delete_by_id (self , collection_name : str , id : str ):
247+ """Delete docs by id in index of OpenSearch"""
248+ if not self .collection_exists (collection_name ):
249+ raise ValueError (f"Collection { collection_name } does not exist." )
250+
251+ response = self ._opensearch_client .delete (index = collection_name , id = id )
252+ self ._opensearch_client .indices .refresh (index = collection_name )
253+ return response
0 commit comments