1919# https://www.timescale.com/blog/postgresql-as-a-vector-database-create-store-and-query-openai-embeddings-with-pgvector/
2020# for providing the base implementation for this indexing functionality
2121
22+ import json
2223import logging
2324import math
24- from typing import Annotated , List
25+ from typing import Annotated
2526
2627from constants import (
2728 CHUNK_OVERLAP ,
4142
4243@step
4344def preprocess_documents (
44- documents : List [ Document ] ,
45- ) -> Annotated [List [ Document ] , ArtifactConfig (name = "split_chunks" )]:
45+ documents : str ,
46+ ) -> Annotated [str , ArtifactConfig (name = "split_chunks" )]:
4647 """
47- Preprocesses a list of documents by splitting them into chunks.
48+ Preprocesses a JSON string of documents by splitting them into chunks.
4849
4950 Args:
50- documents (List[Document] ): A list of documents to be preprocessed.
51+ documents (str ): A JSON string containing a list of documents to be preprocessed.
5152
5253 Returns:
53- Annotated[List[Document] , ArtifactConfig(name="split_chunks")]: A list of preprocessed documents annotated with an ArtifactConfig.
54+ Annotated[str , ArtifactConfig(name="split_chunks")]: A JSON string containing a list of preprocessed documents annotated with an ArtifactConfig.
5455
5556 Raises:
5657 Exception: If an error occurs during preprocessing.
@@ -64,29 +65,34 @@ def preprocess_documents(
6465 },
6566 )
6667
68+ # Parse the JSON string into a list of Document objects
69+ document_list = [Document (** doc ) for doc in json .loads (documents )]
70+
6771 split_docs = split_documents (
68- documents , chunk_size = CHUNK_SIZE , chunk_overlap = CHUNK_OVERLAP
72+ document_list , chunk_size = CHUNK_SIZE , chunk_overlap = CHUNK_OVERLAP
6973 )
70- return split_docs
74+
75+ # Convert the list of Document objects back to a JSON string
76+ split_docs_json = json .dumps ([doc .__dict__ for doc in split_docs ])
77+
78+ return split_docs_json
7179 except Exception as e :
7280 logger .error (f"Error in preprocess_documents: { e } " )
7381 raise
7482
7583
7684@step
7785def generate_embeddings (
78- split_documents : List [Document ],
79- ) -> Annotated [
80- List [Document ], ArtifactConfig (name = "documents_with_embeddings" )
81- ]:
86+ split_documents : str ,
87+ ) -> Annotated [str , ArtifactConfig (name = "documents_with_embeddings" )]:
8288 """
8389 Generates embeddings for a list of split documents using a SentenceTransformer model.
8490
8591 Args:
86- split_documents (List[Document]): A list of Document objects that have been split into chunks.
93+ split_documents (str ): A JSON string containing a list of Document objects that have been split into chunks.
8793
8894 Returns:
89- Annotated[List[Document] , ArtifactConfig(name="embeddings ")]: The list of Document objects with generated embeddings, annotated with an ArtifactConfig.
95+ Annotated[str , ArtifactConfig(name="documents_with_embeddings ")]: A JSON string containing the Document objects with generated embeddings, annotated with an ArtifactConfig.
9096
9197 Raises:
9298 Exception: If an error occurs during the generation of embeddings.
@@ -95,7 +101,7 @@ def generate_embeddings(
95101 model = SentenceTransformer (EMBEDDINGS_MODEL )
96102
103+ # Parse the JSON string back into a list of Document objects before use;
104+ # without this, document_texts / zip below would iterate string characters
105+ split_documents = [Document (** doc ) for doc in json .loads (split_documents )]
106+
97107 log_artifact_metadata (
98- artifact_name = "embeddings " ,
108+ artifact_name = "documents_with_embeddings " ,
99105 metadata = {
100106 "embedding_type" : EMBEDDINGS_MODEL ,
101107 "embedding_dimensionality" : EMBEDDING_DIMENSIONALITY ,
@@ -106,17 +112,22 @@ def generate_embeddings(
106112 embeddings = model .encode (document_texts )
107113
108114 for doc , embedding in zip (split_documents , embeddings ):
109- doc .embedding = embedding
115+ doc .embedding = (
116+ embedding .tolist ()
117+ ) # Convert numpy array to list for JSON serialization
110118
111- return split_documents
119+ # Convert the list of Document objects to a JSON string
120+ documents_json = json .dumps ([doc .__dict__ for doc in split_documents ])
121+
122+ return documents_json
112123 except Exception as e :
113124 logger .error (f"Error in generate_embeddings: { e } " )
114125 raise
115126
116127
117128@step
118129def index_generator (
119- documents : List [ Document ] ,
130+ documents : str ,
120131) -> None :
121132 """Generates an index for the given documents.
122133
@@ -126,7 +137,7 @@ def index_generator(
126137 using the cosine distance measure.
127138
128139 Args:
129- documents (List[Document] ): The list of Document objects with generated embeddings.
140+ documents (str ): A JSON string containing the Document objects with generated embeddings.
130141
131142 Raises:
132143 Exception: If an error occurs during the index generation.
@@ -155,6 +166,9 @@ def index_generator(
155166
156167 register_vector (conn )
157168
169+ # load the documents from the JSON string back into Document objects
170+ # (plain json.loads yields dicts, which would break doc.page_content below)
171+ documents = [Document (** doc ) for doc in json .loads (documents )]
171+
158172 # Insert data only if it doesn't already exist
159173 for doc in documents :
160174 content = doc .page_content
0 commit comments