@@ -19,9 +19,10 @@
 # https://www.timescale.com/blog/postgresql-as-a-vector-database-create-store-and-query-openai-embeddings-with-pgvector/
 # for providing the base implementation for this indexing functionality
 
+import json
 import logging
 import math
-from typing import Annotated, List
+from typing import Annotated
 
 from constants import (
     CHUNK_OVERLAP,
@@ -41,16 +42,16 @@
 
 @step
 def preprocess_documents(
-    documents: List[Document],
-) -> Annotated[List[Document], ArtifactConfig(name="split_chunks")]:
+    documents: str,
+) -> Annotated[str, ArtifactConfig(name="split_chunks")]:
     """
-    Preprocesses a list of documents by splitting them into chunks.
+    Preprocesses a JSON string of documents by splitting them into chunks.
 
     Args:
-        documents (List[Document]): A list of documents to be preprocessed.
+        documents (str): A JSON string containing a list of documents to be preprocessed.
 
     Returns:
-        Annotated[List[Document], ArtifactConfig(name="split_chunks")]: A list of preprocessed documents annotated with an ArtifactConfig.
+        Annotated[str, ArtifactConfig(name="split_chunks")]: A JSON string containing a list of preprocessed documents annotated with an ArtifactConfig.
 
     Raises:
         Exception: If an error occurs during preprocessing.
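
For readers new to ZenML, the `Annotated[str, ArtifactConfig(name=...)]` return type above is how a step names its output artifact. A minimal sketch of that pattern, assuming a recent ZenML release; the step and its return value here are hypothetical, only the pattern is taken from the diff:

```python
from typing import Annotated

from zenml import ArtifactConfig, step


@step
def emit_chunks() -> Annotated[str, ArtifactConfig(name="split_chunks")]:
    # The returned string is recorded as an artifact named "split_chunks",
    # so downstream steps and the dashboard can reference it by name.
    return "[]"  # an empty JSON list of documents
```
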
@@ -64,29 +65,34 @@ def preprocess_documents(
             },
         )
 
+        # Parse the JSON string into a list of Document objects
+        document_list = [Document(**doc) for doc in json.loads(documents)]
+
         split_docs = split_documents(
-            documents, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP
+            document_list, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP
         )
-        return split_docs
+
+        # Convert the list of Document objects back to a JSON string
+        split_docs_json = json.dumps([doc.__dict__ for doc in split_docs])
+
+        return split_docs_json
     except Exception as e:
         logger.error(f"Error in preprocess_documents: {e}")
         raise
 
 
 @step
 def generate_embeddings(
-    split_documents: List[Document],
-) -> Annotated[
-    List[Document], ArtifactConfig(name="documents_with_embeddings")
-]:
+    split_documents: str,
+) -> Annotated[str, ArtifactConfig(name="documents_with_embeddings")]:
     """
     Generates embeddings for a list of split documents using a SentenceTransformer model.
 
     Args:
-        split_documents (List[Document]): A list of Document objects that have been split into chunks.
+        split_documents (str): A JSON string containing the Document objects that have been split into chunks.
 
     Returns:
-        Annotated[List[Document], ArtifactConfig(name="embeddings")]: The list of Document objects with generated embeddings, annotated with an ArtifactConfig.
+        Annotated[str, ArtifactConfig(name="documents_with_embeddings")]: A JSON string containing the Document objects with generated embeddings, annotated with an ArtifactConfig.
 
     Raises:
         Exception: If an error occurs during the generation of embeddings.
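
Both steps above now share a simple serialization contract: dump each `Document`'s `__dict__` into a single JSON array on the way out, and rebuild with `Document(**doc)` on the way in. A self-contained sketch of that round trip, assuming `Document` is a plain attribute bag with the fields this diff touches (the real class lives elsewhere in the repo):

```python
import json
from dataclasses import dataclass
from typing import Optional


# Stand-in for the repo's Document class; the field names are taken from
# the attributes this diff reads, everything else is guesswork.
@dataclass
class Document:
    page_content: str
    filename: str = ""
    parent_section: str = ""
    url: str = ""
    token_count: int = 0
    embedding: Optional[list] = None


docs = [Document(page_content="pgvector stores embeddings in Postgres.")]

# Producer side, as in preprocess_documents: one JSON array of plain dicts.
payload = json.dumps([doc.__dict__ for doc in docs])

# Consumer side, as in generate_embeddings: rebuild the objects.
restored = [Document(**doc) for doc in json.loads(payload)]
assert restored[0].page_content == docs[0].page_content
```
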
@@ -95,28 +101,36 @@ def generate_embeddings(
         model = SentenceTransformer(EMBEDDINGS_MODEL)
 
         log_artifact_metadata(
-            artifact_name="embeddings",
+            artifact_name="documents_with_embeddings",
             metadata={
                 "embedding_type": EMBEDDINGS_MODEL,
                 "embedding_dimensionality": EMBEDDING_DIMENSIONALITY,
             },
         )
 
-        document_texts = [doc.page_content for doc in split_documents]
+        # Parse the JSON string into a list of Document objects
+        document_list = [
+            Document(**doc) for doc in json.loads(split_documents)
+        ]
+
+        document_texts = [doc.page_content for doc in document_list]
         embeddings = model.encode(document_texts)
 
-        for doc, embedding in zip(split_documents, embeddings):
-            doc.embedding = embedding
+        for doc, embedding in zip(document_list, embeddings):
+            doc.embedding = embedding.tolist()
+
+        # Convert the list of Document objects to a JSON string
+        documents_json = json.dumps([doc.__dict__ for doc in document_list])
 
-        return split_documents
+        return documents_json
     except Exception as e:
         logger.error(f"Error in generate_embeddings: {e}")
         raise
 
 
 @step
 def index_generator(
-    documents: List[Document],
+    documents: str,
 ) -> None:
     """Generates an index for the given documents.
 
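
The `.tolist()` call added to the embedding loop is what makes the JSON conversion possible: `SentenceTransformer.encode` returns NumPy arrays, which `json.dumps` cannot serialize. A quick illustration:

```python
import json

import numpy as np

embedding = np.array([0.12, 0.34, 0.56], dtype=np.float32)

# json.dumps(embedding) would raise:
# TypeError: Object of type ndarray is not JSON serializable
payload = json.dumps(embedding.tolist())  # tolist() yields plain Python floats

restored = json.loads(payload)  # back to a regular list
assert len(restored) == 3
```
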
@@ -126,7 +140,7 @@ def index_generator(
     using the cosine distance measure.
 
     Args:
-        documents (List[Document]): The list of Document objects with generated embeddings.
+        documents (str): A JSON string containing the Document objects with generated embeddings.
 
     Raises:
         Exception: If an error occurs during the index generation.
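
The "cosine distance measure" mentioned in the docstring corresponds to pgvector's `vector_cosine_ops` operator class. The setup below is a representative sketch rather than the repo's actual DDL; the table name, the ivfflat index type, and the 384-dimension placeholder are all assumptions:

```python
import psycopg2
from pgvector.psycopg2 import register_vector

# Hypothetical connection string; the real settings come from the
# pipeline's configuration.
conn = psycopg2.connect("dbname=rag user=postgres")
cur = conn.cursor()

cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
cur.execute(
    """
    CREATE TABLE IF NOT EXISTS embeddings (
        id bigserial PRIMARY KEY,
        content text,
        token_count int,
        filename text,
        parent_section text,
        url text,
        embedding vector(384)  -- must match EMBEDDING_DIMENSIONALITY
    );
    """
)
# vector_cosine_ops builds the index for cosine distance (the <=> operator).
cur.execute(
    "CREATE INDEX IF NOT EXISTS embeddings_cosine_idx"
    " ON embeddings USING ivfflat (embedding vector_cosine_ops);"
)
conn.commit()
register_vector(conn)
```
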
@@ -155,11 +169,14 @@ def index_generator(
 
         register_vector(conn)
 
+        # Parse the JSON string into a list of Document objects
+        document_list = [Document(**doc) for doc in json.loads(documents)]
+
         # Insert data only if it doesn't already exist
-        for doc in documents:
+        for doc in document_list:
             content = doc.page_content
             token_count = doc.token_count
-            embedding = doc.embedding.tolist()
+            embedding = doc.embedding
             filename = doc.filename
             parent_section = doc.parent_section
             url = doc.url
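
Beyond the end of this hunk, each parsed document is presumably written with a parameterized INSERT; since `generate_embeddings` already converted each embedding to a plain list, no `.tolist()` is needed at this point. A hedged sketch of that write plus a cosine-distance lookup, reusing the hypothetical table and connection from the previous sketch:

```python
import numpy as np

# Continues the sketch above: the fields come straight from the parsed
# Documents; np.array() lets register_vector adapt each embedding to the
# vector column.
for doc in document_list:
    cur.execute(
        "INSERT INTO embeddings (content, token_count, embedding,"
        " filename, parent_section, url)"
        " VALUES (%s, %s, %s, %s, %s, %s)",
        (
            doc.page_content,
            doc.token_count,
            np.array(doc.embedding),
            doc.filename,
            doc.parent_section,
            doc.url,
        ),
    )
conn.commit()

# Retrieval side: <=> is pgvector's cosine-distance operator, served by
# the vector_cosine_ops index created earlier.
query_vector = np.array([0.0] * 384)  # placeholder query embedding
cur.execute(
    "SELECT content FROM embeddings ORDER BY embedding <=> %s LIMIT 5;",
    (query_vector,),
)
top_matches = cur.fetchall()
```
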