55from llama_index .core .schema import Document as LlamaDocument
66
77from llmstack .common .blocks .data .store .vectorstore import Document
8- from llmstack .data .models import DataSource
9- from llmstack .data .schemas import DataDocument
8+ from llmstack .data .sources .base import DataDocument
109
1110logger = logging .getLogger (__name__ )
1211
@@ -18,17 +17,15 @@ class LlamaDocumentShim(LlamaDocument):
1817
1918
2019class DataIngestionPipeline :
21- def __init__ (self , datasource : DataSource ):
20+ def __init__ (self , datasource ):
2221 self .datasource = datasource
2322 self ._source_cls = self .datasource .pipeline_obj .source_cls
2423 self ._destination_cls = self .datasource .pipeline_obj .destination_cls
25- logger .debug ("Initializing DataIngestionPipeline" )
2624
2725 self ._destination = None
2826 self ._transformations = self .datasource .pipeline_obj .transformation_objs
2927 embedding_cls = self .datasource .pipeline_obj .embedding_cls
3028 if embedding_cls :
31- logger .debug ("Initializing DataIngestionPipeline Transformation" )
3229 embedding_additional_kwargs = {
3330 ** self .datasource .pipeline_obj .embedding .data .get ("additional_kwargs" , {}),
3431 ** {"datasource" : datasource },
@@ -41,29 +38,21 @@ def __init__(self, datasource: DataSource):
4138 }
4239 )
4340 )
44- logger .debug ("Finished Initializing DataIngestionPipeline Transformation" )
4541
4642 if self ._destination_cls :
47- logger .debug ("Initializing DataIngestionPipeline Destination" )
4843 self ._destination = self ._destination_cls (** self .datasource .pipeline_obj .destination_data )
4944 self ._destination .initialize_client (datasource = self .datasource , create_collection = True )
50- logger .debug ("Finished Initializing DataIngestionPipeline Destination" )
5145
def process(self, document: DataDocument) -> DataDocument:
    """Run one document through the source, the transformations and the destination.

    The document is first handed to the source class's ``process_document``,
    then wrapped in a ``LlamaDocumentShim`` and pushed through an
    ``IngestionPipeline`` built from the configured transformations. The
    resulting nodes and their ids are stored on the document, and the
    document is added to the destination when one is configured.

    Args:
        document: The document to ingest.

    Returns:
        The processed document with ``nodes`` and ``node_ids`` populated.
    """
    document = self._source_cls.process_document(document)

    pipeline = IngestionPipeline(transformations=self._transformations)

    # Wrap the document's fields in the llama-index shim, then overlay the
    # document's own metadata on top of whatever the shim already carries.
    llama_doc = LlamaDocumentShim(**document.model_dump())
    llama_doc.metadata = {**llama_doc.metadata, **document.metadata}

    document.nodes = pipeline.run(documents=[llama_doc])
    document.node_ids = [node.id_ for node in document.nodes]

    if self._destination:
        self._destination.add(document=document)

    return document
6958
@@ -80,55 +69,44 @@ def delete_all_entries(self) -> None:
8069
8170
8271class DataQueryPipeline :
def __init__(self, datasource):
    """Set up the destination client and embedding generator for querying.

    Args:
        datasource: Object exposing ``pipeline_obj`` with ``destination_cls``,
            ``destination_data``, ``embedding`` and ``embedding_cls``
            attributes, as read below.
    """
    self.datasource = datasource
    self._destination_cls = self.datasource.pipeline_obj.destination_cls
    self._destination = None
    self._embedding_generator = None

    if self._destination_cls:
        self._destination = self._destination_cls(**self.datasource.pipeline_obj.destination_data)
        # Query side only reads, so the collection is not created here.
        self._destination.initialize_client(datasource=self.datasource, create_collection=False)

    if self.datasource.pipeline_obj.embedding:
        # Work on a shallow copy: assigning "additional_kwargs" directly on
        # the dict returned by `embedding.data` would inject the datasource
        # object into the stored embedding configuration whenever that dict
        # is shared rather than freshly built.
        embedding_data = dict(self.datasource.pipeline_obj.embedding.data)
        embedding_data["additional_kwargs"] = {
            **embedding_data.get("additional_kwargs", {}),
            "datasource": self.datasource,
        }
        self._embedding_generator = self.datasource.pipeline_obj.embedding_cls(**embedding_data)
10589
10690 def search (self , query : str , use_hybrid_search = True , ** kwargs ) -> List [dict ]:
10791 content_key = self .datasource .destination_text_content_key
10892 query_embedding = None
10993
110- logger .debug (f"Initializing Search for query: { query } " )
111-
11294 if kwargs .get ("search_filters" , None ):
11395 raise NotImplementedError ("Search filters are not supported for this data source." )
11496
11597 documents = []
11698
11799 if self ._embedding_generator :
118- logger .debug ("Generating embedding for query" )
119100 query_embedding = self ._embedding_generator .get_embedding (query )
120- logger .debug ("Finished generating embedding for query" )
121101
122102 if self ._destination :
123- logger .debug (f"Searching for query: { query } in destination" )
124103 query_result = self ._destination .search (
125104 query = query ,
126105 use_hybrid_search = use_hybrid_search ,
127106 query_embedding = query_embedding ,
128107 datasource_uuid = str (self .datasource .uuid ),
129108 ** kwargs ,
130109 )
131- logger .debug (f"Received results for query: { query } from destination" )
132110 documents = list (
133111 map (
134112 lambda x : Document (page_content_key = content_key , page_content = x .text , metadata = x .metadata ),
0 commit comments