11use crate :: embeddings:: document_indexer:: split_text_into_chunks;
22use crate :: embeddings:: embedder:: { Embedder , OllamaEmbedder } ;
33use crate :: embeddings:: indexer:: { EmbeddingModel , IndexerProvider } ;
4+ use crate :: local_ai:: chat:: retriever:: MultipleSourceRetrieverStore ;
45use async_trait:: async_trait;
56use flowy_ai_pub:: cloud:: CollabType ;
67use flowy_ai_pub:: entities:: { RAG_IDS , SOURCE_ID } ;
@@ -9,10 +10,8 @@ use flowy_sqlite_vec::db::VectorSqliteDB;
910use flowy_sqlite_vec:: entities:: { EmbeddedContent , SqliteEmbeddedDocument } ;
1011use futures:: stream:: { self , StreamExt } ;
1112use langchain_rust:: llm:: client:: OllamaClient ;
12- use langchain_rust:: {
13- schemas:: Document ,
14- vectorstore:: { VecStoreOptions , VectorStore } ,
15- } ;
13+ use langchain_rust:: schemas:: Document ;
14+ use langchain_rust:: vectorstore:: { VecStoreOptions , VectorStore } ;
1615use ollama_rs:: generation:: embeddings:: request:: { EmbeddingsInput , GenerateEmbeddingsRequest } ;
1716use serde_json:: Value ;
1817use std:: collections:: HashMap ;
@@ -86,6 +85,80 @@ impl SqliteVectorStore {
8685 }
8786}
8887
88+ #[ async_trait]
89+ impl MultipleSourceRetrieverStore for SqliteVectorStore {
90+ fn retriever_name ( & self ) -> & ' static str {
91+ "Sqlite Multiple Source Retriever"
92+ }
93+
94+ async fn read_documents (
95+ & self ,
96+ workspace_id : & Uuid ,
97+ query : & str ,
98+ limit : usize ,
99+ rag_ids : & [ String ] ,
100+ score_threshold : f32 ,
101+ _full_search : bool ,
102+ ) -> FlowyResult < Vec < Document > > {
103+ let vector_db = match self . vector_db . upgrade ( ) {
104+ Some ( db) => db,
105+ None => return Err ( FlowyError :: internal ( ) . with_context ( "Vector database not initialized" ) ) ,
106+ } ;
107+
108+ // Create embedder and generate embedding for query
109+ let embedder = self . create_embedder ( ) ?;
110+ let request = GenerateEmbeddingsRequest :: new (
111+ embedder. model ( ) . name ( ) . to_string ( ) ,
112+ EmbeddingsInput :: Single ( query. to_string ( ) ) ,
113+ ) ;
114+
115+ let embedding = embedder. embed ( request) . await ?. embeddings ;
116+ if embedding. is_empty ( ) {
117+ return Ok ( Vec :: new ( ) ) ;
118+ }
119+
120+ debug_assert ! ( embedding. len( ) == 1 ) ;
121+ let query_embedding = embedding. first ( ) . unwrap ( ) ;
122+
123+ // Perform similarity search in the database
124+ let results = vector_db
125+ . search_with_score (
126+ & workspace_id. to_string ( ) ,
127+ rag_ids,
128+ query_embedding,
129+ limit as i32 ,
130+ score_threshold,
131+ )
132+ . await ?;
133+
134+ trace ! (
135+ "[VectorStore] Found {} results for query:{}, rag_ids: {:?}, score_threshold: {}" ,
136+ results. len( ) ,
137+ query,
138+ rag_ids,
139+ score_threshold
140+ ) ;
141+
142+ // Convert results to Documents
143+ let documents = results
144+ . into_iter ( )
145+ . map ( |result| {
146+ let mut metadata = HashMap :: new ( ) ;
147+
148+ if let Some ( map) = result. metadata . as_ref ( ) . and_then ( |v| v. as_object ( ) ) {
149+ for ( key, value) in map {
150+ metadata. insert ( key. clone ( ) , value. clone ( ) ) ;
151+ }
152+ }
153+
154+ Document :: new ( result. content ) . with_metadata ( metadata)
155+ } )
156+ . collect ( ) ;
157+
158+ Ok ( documents)
159+ }
160+ }
161+
89162#[ async_trait]
90163impl VectorStore for SqliteVectorStore {
91164 type Options = VecStoreOptions < Value > ;
@@ -215,74 +288,23 @@ impl VectorStore for SqliteVectorStore {
215288
216289 // Return empty result if workspace_id is missing
217290 let workspace_id = match workspace_id {
218- Some ( id) => id. to_string ( ) ,
291+ Some ( id) => id,
219292 None => {
220293 warn ! ( "[VectorStore] Missing workspace_id in filters. Returning empty result." ) ;
221294 return Ok ( Vec :: new ( ) ) ;
222295 } ,
223296 } ;
224297
225- // Get the vector database
226- let vector_db = match self . vector_db . upgrade ( ) {
227- Some ( db) => db,
228- None => return Err ( "Vector database not initialized" . into ( ) ) ,
229- } ;
230-
231- // Create embedder and generate embedding for query
232- let embedder = self . create_embedder ( ) ?;
233- let request = GenerateEmbeddingsRequest :: new (
234- embedder. model ( ) . name ( ) . to_string ( ) ,
235- EmbeddingsInput :: Single ( query. to_string ( ) ) ,
236- ) ;
237-
238- let embedding = match embedder. embed ( request) . await {
239- Ok ( result) => result. embeddings ,
240- Err ( e) => return Err ( Box :: new ( e) ) ,
241- } ;
242-
243- if embedding. is_empty ( ) {
244- return Ok ( Vec :: new ( ) ) ;
245- }
246-
247- let score_threshold = opt. score_threshold . unwrap_or ( 0.4 ) ;
248- debug_assert ! ( embedding. len( ) == 1 ) ;
249- let query_embedding = embedding. first ( ) . unwrap ( ) ;
250-
251- // Perform similarity search in the database
252- let results = vector_db
253- . search_with_score (
298+ self
299+ . read_documents (
254300 & workspace_id,
301+ query,
302+ limit,
255303 & rag_ids,
256- query_embedding,
257- limit as i32 ,
258- score_threshold,
304+ opt. score_threshold . unwrap_or ( 0.4 ) ,
305+ true ,
259306 )
260- . await ?;
261-
262- trace ! (
263- "[VectorStore] Found {} results for query:{}, rag_ids: {:?}, score_threshold: {}" ,
264- results. len( ) ,
265- query,
266- rag_ids,
267- score_threshold
268- ) ;
269-
270- // Convert results to Documents
271- let documents = results
272- . into_iter ( )
273- . map ( |result| {
274- let mut metadata = HashMap :: new ( ) ;
275-
276- if let Some ( map) = result. metadata . as_ref ( ) . and_then ( |v| v. as_object ( ) ) {
277- for ( key, value) in map {
278- metadata. insert ( key. clone ( ) , value. clone ( ) ) ;
279- }
280- }
281-
282- Document :: new ( result. content ) . with_metadata ( metadata)
283- } )
284- . collect ( ) ;
285-
286- Ok ( documents)
307+ . await
308+ . map_err ( |err| Box :: new ( err) as Box < dyn Error > )
287309 }
288310}
0 commit comments