Skip to content

Commit 73a77ab

Browse files
authored
Merge pull request open-webui#11245 from tupe2009/kleqon.fix-es-bugs
fix: Elasticsearch indexes delete and upsert bugs (post 0.5.19 release)
2 parents 1639fbb + a8f2052 commit 73a77ab

File tree

2 files changed

+135
-93
lines changed

2 files changed

+135
-93
lines changed

backend/open_webui/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1553,7 +1553,7 @@ class BannerModel(BaseModel):
15531553
ELASTICSEARCH_PASSWORD = os.environ.get("ELASTICSEARCH_PASSWORD", None)
15541554
ELASTICSEARCH_CLOUD_ID = os.environ.get("ELASTICSEARCH_CLOUD_ID", None)
15551555
SSL_ASSERT_FINGERPRINT = os.environ.get("SSL_ASSERT_FINGERPRINT", None)
1556-
1556+
ELASTICSEARCH_INDEX_PREFIX = os.environ.get("ELASTICSEARCH_INDEX_PREFIX", "open_webui_collections")
15571557
# Pgvector
15581558
PGVECTOR_DB_URL = os.environ.get("PGVECTOR_DB_URL", DATABASE_URL)
15591559
if VECTOR_DB == "pgvector" and not PGVECTOR_DB_URL.startswith("postgres"):

backend/open_webui/retrieval/vector/dbs/elasticsearch.py

Lines changed: 134 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,46 @@
11
from elasticsearch import Elasticsearch, BadRequestError
22
from typing import Optional
33
import ssl
4-
from elasticsearch.helpers import bulk, scan
4+
from elasticsearch.helpers import bulk,scan
55
from open_webui.retrieval.vector.main import VectorItem, SearchResult, GetResult
66
from open_webui.config import (
77
ELASTICSEARCH_URL,
8-
ELASTICSEARCH_CA_CERTS,
8+
ELASTICSEARCH_CA_CERTS,
99
ELASTICSEARCH_API_KEY,
1010
ELASTICSEARCH_USERNAME,
11-
ELASTICSEARCH_PASSWORD,
11+
ELASTICSEARCH_PASSWORD,
1212
ELASTICSEARCH_CLOUD_ID,
13+
ELASTICSEARCH_INDEX_PREFIX,
1314
SSL_ASSERT_FINGERPRINT,
15+
1416
)
1517

1618

19+
20+
1721
class ElasticsearchClient:
1822
"""
1923
Important:
20-
in order to reduce the number of indexes and since the embedding vector length is fixed, we avoid creating
21-
an index for each file but store it as a text field, while seperating to different index
24+
in order to reduce the number of indexes and since the embedding vector length is fixed, we avoid creating
25+
an index for each file but store it as a text field, while seperating to different index
2226
baesd on the embedding length.
2327
"""
24-
2528
def __init__(self):
26-
self.index_prefix = "open_webui_collections"
29+
self.index_prefix = ELASTICSEARCH_INDEX_PREFIX
2730
self.client = Elasticsearch(
2831
hosts=[ELASTICSEARCH_URL],
2932
ca_certs=ELASTICSEARCH_CA_CERTS,
3033
api_key=ELASTICSEARCH_API_KEY,
3134
cloud_id=ELASTICSEARCH_CLOUD_ID,
32-
basic_auth=(
33-
(ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD)
34-
if ELASTICSEARCH_USERNAME and ELASTICSEARCH_PASSWORD
35-
else None
36-
),
37-
ssl_assert_fingerprint=SSL_ASSERT_FINGERPRINT,
35+
basic_auth=(ELASTICSEARCH_USERNAME,ELASTICSEARCH_PASSWORD) if ELASTICSEARCH_USERNAME and ELASTICSEARCH_PASSWORD else None,
36+
ssl_assert_fingerprint=SSL_ASSERT_FINGERPRINT
37+
3838
)
39-
40-
# Status: works
41-
def _get_index_name(self, dimension: int) -> str:
39+
#Status: works
40+
def _get_index_name(self,dimension:int)->str:
4241
return f"{self.index_prefix}_d{str(dimension)}"
43-
44-
# Status: works
42+
43+
#Status: works
4544
def _scan_result_to_get_result(self, result) -> GetResult:
4645
if not result:
4746
return None
@@ -56,7 +55,7 @@ def _scan_result_to_get_result(self, result) -> GetResult:
5655

5756
return GetResult(ids=[ids], documents=[documents], metadatas=[metadatas])
5857

59-
# Status: works
58+
#Status: works
6059
def _result_to_get_result(self, result) -> GetResult:
6160
if not result["hits"]["hits"]:
6261
return None
@@ -71,7 +70,7 @@ def _result_to_get_result(self, result) -> GetResult:
7170

7271
return GetResult(ids=[ids], documents=[documents], metadatas=[metadatas])
7372

74-
# Status: works
73+
#Status: works
7574
def _result_to_search_result(self, result) -> SearchResult:
7675
ids = []
7776
distances = []
@@ -85,16 +84,22 @@ def _result_to_search_result(self, result) -> SearchResult:
8584
metadatas.append(hit["_source"].get("metadata"))
8685

8786
return SearchResult(
88-
ids=[ids],
89-
distances=[distances],
90-
documents=[documents],
91-
metadatas=[metadatas],
87+
ids=[ids], distances=[distances], documents=[documents], metadatas=[metadatas]
9288
)
93-
94-
# Status: works
89+
#Status: works
9590
def _create_index(self, dimension: int):
9691
body = {
9792
"mappings": {
93+
"dynamic_templates": [
94+
{
95+
"strings": {
96+
"match_mapping_type": "string",
97+
"mapping": {
98+
"type": "keyword"
99+
}
100+
}
101+
}
102+
],
98103
"properties": {
99104
"collection": {"type": "keyword"},
100105
"id": {"type": "keyword"},
@@ -110,51 +115,64 @@ def _create_index(self, dimension: int):
110115
}
111116
}
112117
self.client.indices.create(index=self._get_index_name(dimension), body=body)
113-
114-
# Status: works
118+
#Status: works
115119

116120
def _create_batches(self, items: list[VectorItem], batch_size=100):
117121
for i in range(0, len(items), batch_size):
118-
yield items[i : min(i + batch_size, len(items))]
122+
yield items[i : min(i + batch_size,len(items))]
119123

120-
# Status: works
121-
def has_collection(self, collection_name) -> bool:
124+
#Status: works
125+
def has_collection(self,collection_name) -> bool:
122126
query_body = {"query": {"bool": {"filter": []}}}
123-
query_body["query"]["bool"]["filter"].append(
124-
{"term": {"collection": collection_name}}
125-
)
127+
query_body["query"]["bool"]["filter"].append({"term": {"collection": collection_name}})
126128

127129
try:
128-
result = self.client.count(index=f"{self.index_prefix}*", body=query_body)
129-
130-
return result.body["count"] > 0
130+
result = self.client.count(
131+
index=f"{self.index_prefix}*",
132+
body=query_body
133+
)
134+
135+
return result.body["count"]>0
131136
except Exception as e:
132137
return None
138+
133139

134-
# @TODO: Make this delete a collection and not an index
135-
def delete_colleciton(self, collection_name: str):
136-
# TODO: fix this to include the dimension or a * prefix
137-
# delete_collection here means delete a bunch of documents for an index.
138-
# We are simply adapting to the norms of the other DBs.
139-
self.client.indices.delete(index=self._get_collection_name(collection_name))
140-
141-
# Status: works
140+
141+
def delete_collection(self, collection_name: str):
142+
query = {
143+
"query": {
144+
"term": {"collection": collection_name}
145+
}
146+
}
147+
self.client.delete_by_query(index=f"{self.index_prefix}*", body=query)
148+
#Status: works
142149
def search(
143150
self, collection_name: str, vectors: list[list[float]], limit: int
144151
) -> Optional[SearchResult]:
145152
query = {
146153
"size": limit,
147-
"_source": ["text", "metadata"],
154+
"_source": [
155+
"text",
156+
"metadata"
157+
],
148158
"query": {
149159
"script_score": {
150160
"query": {
151-
"bool": {"filter": [{"term": {"collection": collection_name}}]}
161+
"bool": {
162+
"filter": [
163+
{
164+
"term": {
165+
"collection": collection_name
166+
}
167+
}
168+
]
169+
}
152170
},
153171
"script": {
154172
"source": "cosineSimilarity(params.vector, 'vector') + 1.0",
155173
"params": {
156174
"vector": vectors[0]
157-
}, # Assuming single query vector
175+
}, # Assuming single query vector
158176
},
159177
}
160178
},
@@ -165,8 +183,7 @@ def search(
165183
)
166184

167185
return self._result_to_search_result(result)
168-
169-
# Status: only tested halfwat
186+
#Status: only tested halfwat
170187
def query(
171188
self, collection_name: str, filter: dict, limit: Optional[int] = None
172189
) -> Optional[GetResult]:
@@ -180,9 +197,7 @@ def query(
180197

181198
for field, value in filter.items():
182199
query_body["query"]["bool"]["filter"].append({"term": {field: value}})
183-
query_body["query"]["bool"]["filter"].append(
184-
{"term": {"collection": collection_name}}
185-
)
200+
query_body["query"]["bool"]["filter"].append({"term": {"collection": collection_name}})
186201
size = limit if limit else 10
187202

188203
try:
@@ -191,82 +206,109 @@ def query(
191206
body=query_body,
192207
size=size,
193208
)
194-
209+
195210
return self._result_to_get_result(result)
196211

197212
except Exception as e:
198213
return None
214+
#Status: works
215+
def _has_index(self,dimension:int):
216+
return self.client.indices.exists(index=self._get_index_name(dimension=dimension))
199217

200-
# Status: works
201-
def _has_index(self, dimension: int):
202-
return self.client.indices.exists(
203-
index=self._get_index_name(dimension=dimension)
204-
)
205218

206219
def get_or_create_index(self, dimension: int):
207220
if not self._has_index(dimension=dimension):
208221
self._create_index(dimension=dimension)
209-
210-
# Status: works
222+
#Status: works
211223
def get(self, collection_name: str) -> Optional[GetResult]:
212224
# Get all the items in the collection.
213225
query = {
214-
"query": {"bool": {"filter": [{"term": {"collection": collection_name}}]}},
215-
"_source": ["text", "metadata"],
216-
}
226+
"query": {
227+
"bool": {
228+
"filter": [
229+
{
230+
"term": {
231+
"collection": collection_name
232+
}
233+
}
234+
]
235+
}
236+
}, "_source": ["text", "metadata"]}
217237
results = list(scan(self.client, index=f"{self.index_prefix}*", query=query))
218-
238+
219239
return self._scan_result_to_get_result(results)
220240

221-
# Status: works
241+
#Status: works
222242
def insert(self, collection_name: str, items: list[VectorItem]):
223243
if not self._has_index(dimension=len(items[0]["vector"])):
224244
self._create_index(dimension=len(items[0]["vector"]))
225245

246+
226247
for batch in self._create_batches(items):
227248
actions = [
228-
{
229-
"_index": self._get_index_name(dimension=len(items[0]["vector"])),
230-
"_id": item["id"],
231-
"_source": {
232-
"collection": collection_name,
233-
"vector": item["vector"],
234-
"text": item["text"],
235-
"metadata": item["metadata"],
236-
},
237-
}
249+
{
250+
"_index":self._get_index_name(dimension=len(items[0]["vector"])),
251+
"_id": item["id"],
252+
"_source": {
253+
"collection": collection_name,
254+
"vector": item["vector"],
255+
"text": item["text"],
256+
"metadata": item["metadata"],
257+
},
258+
}
238259
for item in batch
239260
]
240-
bulk(self.client, actions)
261+
bulk(self.client,actions)
241262

242-
# Status: should work
263+
# Upsert documents using the update API with doc_as_upsert=True.
243264
def upsert(self, collection_name: str, items: list[VectorItem]):
244265
if not self._has_index(dimension=len(items[0]["vector"])):
245-
self._create_index(collection_name, dimension=len(items[0]["vector"]))
246-
266+
self._create_index(dimension=len(items[0]["vector"]))
247267
for batch in self._create_batches(items):
248268
actions = [
249269
{
250-
"_index": self._get_index_name(dimension=len(items[0]["vector"])),
270+
"_op_type": "update",
271+
"_index": self._get_index_name(dimension=len(item["vector"])),
251272
"_id": item["id"],
252-
"_source": {
273+
"doc": {
274+
"collection": collection_name,
253275
"vector": item["vector"],
254276
"text": item["text"],
255277
"metadata": item["metadata"],
256278
},
279+
"doc_as_upsert": True,
257280
}
258281
for item in batch
259282
]
260-
self.client.bulk(actions)
261-
262-
# TODO: This currently deletes by * which is not always supported in ElasticSearch.
263-
# Need to read a bit before changing. Also, need to delete from a specific collection
264-
def delete(self, collection_name: str, ids: list[str]):
265-
# Assuming ID is unique across collections and indexes
266-
actions = [
267-
{"delete": {"_index": f"{self.index_prefix}*", "_id": id}} for id in ids
268-
]
269-
self.client.bulk(body=actions)
283+
bulk(self.client,actions)
284+
285+
286+
# Delete specific documents from a collection by filtering on both collection and document IDs.
287+
def delete(
288+
self,
289+
collection_name: str,
290+
ids: Optional[list[str]] = None,
291+
filter: Optional[dict] = None,
292+
):
293+
294+
query = {
295+
"query": {
296+
"bool": {
297+
"filter": [
298+
{"term": {"collection": collection_name}}
299+
]
300+
}
301+
}
302+
}
303+
#logic based on chromaDB
304+
if ids:
305+
query["query"]["bool"]["filter"].append({"terms": {"_id": ids}})
306+
elif filter:
307+
for field, value in filter.items():
308+
query["query"]["bool"]["filter"].append({"term": {f"metadata.{field}": value}})
309+
310+
311+
self.client.delete_by_query(index=f"{self.index_prefix}*", body=query)
270312

271313
def reset(self):
272314
indices = self.client.indices.get(index=f"{self.index_prefix}*")

0 commit comments

Comments
 (0)