---
title: "Retrieval Augmented Generation (RAG)"
description: "Use vector stores, embedding and text-extraction services to build RAG pipelines"
---

Retrieval Augmented Generation (RAG) is one of the keystones of efficient data processing and search in the age of AI
agents. The goal is to surface information from a knowledge base that is relevant to a specific user query and provide
curated context to the LLM. This is a complex topic with many variants; here we focus on the fundamental building
blocks that any RAG pipeline needs.

The document processing pipeline:
1. **text extraction** - process complex document formats (PDF, CSV, etc.)
2. **text splitting** - create meaningful chunks out of long pages of text
3. **embedding** - vectorize chunks (extract semantic meaning)
4. **store** - insert chunks into a specialized database

Retrieval:
1. **embedding** - vectorize the user query
2. **search** - retrieve the document chunks most similar to the user query

## Building blocks
Let's break down how each step can be implemented with the Agent Stack API, but first, make sure you have the
Platform API extension enabled in your agent:

```python
from typing import Annotated

from a2a.types import Message

from agentstack_sdk import server
from agentstack_sdk.a2a.extensions import (
    PlatformApiExtensionServer,
    PlatformApiExtensionSpec,
    EmbeddingServiceExtensionServer,
    EmbeddingServiceExtensionSpec,
)
from agentstack_sdk.server.context import RunContext

# File formats supported by the text-extraction service (docling)
default_input_modes = [
    "text/plain",
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",  # DOCX
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",  # XLSX
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",  # PPTX
    "text/markdown",  # Markdown
    "text/asciidoc",  # AsciiDoc
    "text/html",  # HTML
    "application/xhtml+xml",  # XHTML
    "text/csv",  # CSV
    "image/png",  # PNG
    "image/jpeg",  # JPEG
    "image/tiff",  # TIFF
    "image/bmp",  # BMP
    "image/webp",  # WEBP
]


@server.agent(
    default_input_modes=default_input_modes, default_output_modes=["text/plain"]
)
async def rag_agent(
    input: Message,
    context: RunContext,
    embedding: Annotated[
        EmbeddingServiceExtensionServer, EmbeddingServiceExtensionSpec.single_demand()
    ],
    _: Annotated[PlatformApiExtensionServer, PlatformApiExtensionSpec()],
): ...  # Agent code
```

Agent Stack uses [docling](https://docling-project.github.io/docling/) to extract text from documents in various
[supported formats](https://docling-project.github.io/docling/usage/supported_formats/). To select which formats the
agent can accept, use the `default_input_modes` parameter in the agent decorator.

First, let's build a set of functions to process the documents, which we will then use in the agent.

### Text Extraction
To extract text from a `File` uploaded to the Platform API, simply use `file.create_extraction()` and wait for
the result. Once the extraction is completed, the `extraction` object will contain an `extracted_file_id`, the ID of
a new file containing the extracted text in Markdown.

```python
from agentstack_sdk.platform import File, Extraction
import asyncio

async def extract_file(file: File) -> Extraction:
    # Start the extraction and poll until the service has finished processing the file
    extraction = await file.create_extraction()
    while extraction.status in {"pending", "in_progress"}:
        await asyncio.sleep(1)
        extraction = await file.get_extraction()
    if extraction.status != "completed" or not extraction.extracted_file_id:
        raise ValueError(f"Extraction failed with status: {extraction.status}")
    return extraction
```
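
Once the extraction has finished, the extracted Markdown can be read back through the file object, as the full agent
below does. A minimal usage sketch, assuming you already have the ID of a file uploaded to the platform:

```python
async def extract_and_read(file_id: str) -> str:
    # file_id is a placeholder; in the agent we obtain files from the message parts
    file = await File.get(file_id)
    await extract_file(file)
    # load_text_content() yields the extracted Markdown once extraction has completed
    async with file.load_text_content() as loaded_file:
        return loaded_file.text
```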

### Text Splitting
In this example we will use the `MarkdownTextSplitter` from the
[langchain-text-splitters](https://reference.langchain.com/python/langchain_text_splitters/) package.
It splits a long document into reasonably sized chunks along its Markdown structure (headings, paragraphs, sentences).

```python
from langchain_text_splitters import MarkdownTextSplitter

def chunk_markdown(markdown_text: str) -> list[str]:
    return MarkdownTextSplitter().split_text(markdown_text)
```
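
The defaults are a reasonable first pass. If the chunks come out too large or too small for your embedding model,
`MarkdownTextSplitter` also accepts the usual `chunk_size` and `chunk_overlap` parameters (measured in characters);
the values below are illustrative, not a recommendation:

```python
from langchain_text_splitters import MarkdownTextSplitter

def chunk_markdown_tuned(markdown_text: str) -> list[str]:
    # chunk_size / chunk_overlap are character counts; tune them for your embedding model
    splitter = MarkdownTextSplitter(chunk_size=1000, chunk_overlap=100)
    return splitter.split_text(markdown_text)
```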

### Embedding
Now we need to embed each chunk using the embedding service. Similarly to LLMs, Agent Stack implements an
OpenAI-compatible embedding API. You can use any client you prefer; in this example we use the embedding extension
to create an `AsyncOpenAI` client:

```python
from openai import AsyncOpenAI
from agentstack_sdk.a2a.extensions import EmbeddingServiceExtensionServer

def get_embedding_client(
    embedding: EmbeddingServiceExtensionServer,
) -> tuple[AsyncOpenAI, str]:
    if not embedding:
        raise ValueError("Embedding extension not provided")

    embedding_config = embedding.data.embedding_fulfillments.get("default")
    embedding_client = AsyncOpenAI(
        api_key=embedding_config.api_key, base_url=embedding_config.api_base
    )
    embedding_model = embedding_config.api_model
    return embedding_client, embedding_model
```

Now we can use this client to embed our chunks and create vector store items:

```python
from openai import AsyncOpenAI
from agentstack_sdk.platform import VectorStoreItem, File


async def embed_chunks(
    file: File, chunks: list[str], embedding_client: AsyncOpenAI, embedding_model: str
) -> list[VectorStoreItem]:
    vector_store_items = []
    embedding_result = await embedding_client.embeddings.create(
        input=chunks,
        model=embedding_model,
        encoding_format="float",
    )
    for i, embedding_data in enumerate(embedding_result.data):
        item = VectorStoreItem(
            document_id=file.id,
            document_type="platform_file",
            model_id=embedding_model,
            text=chunks[i],
            embedding=embedding_data.embedding,
            metadata={"chunk_index": str(i)},  # add arbitrary string metadata
        )
        vector_store_items.append(item)
    return vector_store_items
```
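
Embedding providers usually limit how many inputs a single request may carry, so for large documents you may want to
send the chunks in batches. A sketch of a batched variant of the same function (the batch size of 64 is an arbitrary
example):

```python
async def embed_chunks_batched(
    file: File,
    chunks: list[str],
    embedding_client: AsyncOpenAI,
    embedding_model: str,
    batch_size: int = 64,  # arbitrary example value; tune for your provider's limits
) -> list[VectorStoreItem]:
    vector_store_items: list[VectorStoreItem] = []
    for start in range(0, len(chunks), batch_size):
        batch = chunks[start : start + batch_size]
        embedding_result = await embedding_client.embeddings.create(
            input=batch,
            model=embedding_model,
            encoding_format="float",
        )
        for offset, embedding_data in enumerate(embedding_result.data):
            vector_store_items.append(
                VectorStoreItem(
                    document_id=file.id,
                    document_type="platform_file",
                    model_id=embedding_model,
                    text=batch[offset],
                    embedding=embedding_data.embedding,
                    metadata={"chunk_index": str(start + offset)},
                )
            )
    return vector_store_items
```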

### Store
Finally, to insert the prepared items, we need a function that creates a vector store. For this we need to know the
embedding dimension and the model ID. Because the model is chosen by the embedding extension and we don't know it in
advance, we make a test embedding request to determine the dimension:

```python
from openai import AsyncOpenAI
from agentstack_sdk.platform import VectorStore


async def create_vector_store(embedding_client: AsyncOpenAI, embedding_model: str) -> VectorStore:
    # Probe the embedding model once to find out the vector dimension
    embedding_response = await embedding_client.embeddings.create(
        input="test", model=embedding_model
    )
    dimension = len(embedding_response.data[0].embedding)
    return await VectorStore.create(
        name="rag-example",
        dimension=dimension,
        model_id=embedding_model,
    )
```
We can then add the prepared items using `vector_store.add_documents`; the sketch below and the final example show
this in context.

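As a minimal sketch of the ingestion path, the helpers above compose like this (the full agent below does the same
inside the request handler):

```python
async def ingest_file(
    file: File, embedding_client: AsyncOpenAI, embedding_model: str
) -> VectorStore:
    # Create a store sized for the chosen embedding model, then extract,
    # chunk, embed and insert a single file
    vector_store = await create_vector_store(embedding_client, embedding_model)
    await extract_file(file)
    async with file.load_text_content() as loaded_file:
        chunks = chunk_markdown(loaded_file.text)
    items = await embed_chunks(file, chunks, embedding_client, embedding_model)
    await vector_store.add_documents(items=items)
    return vector_store
```
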
### Query vector store
Assuming we have our knowledge base of documents prepared, we can now easily search the store according to the user
query. The following function retrieves the five document chunks most similar to the query embedding:

```python
from openai import AsyncOpenAI
from agentstack_sdk.platform import VectorStore, VectorStoreSearchResult

async def search_vector_store(
    vector_store: VectorStore,
    query: str,
    embedding_client: AsyncOpenAI,
    embedding_model: str,
) -> list[VectorStoreSearchResult]:
    embedding_response = await embedding_client.embeddings.create(
        input=query, model=embedding_model
    )
    query_vector = embedding_response.data[0].embedding
    return await vector_store.search(query_vector=query_vector, limit=5)
```

## Putting it all together

With all the pieces in place, we can now build the agent.

### Simple agent

This is a simplified agent that expects a message with one or more files attached as `FilePart` and a
user query as `TextPart`. A new vector store is created for each message.

```python
@server.agent(
    default_input_modes=default_input_modes,
    default_output_modes=["text/plain"],
)
async def rag_agent(
    input: Message,
    context: RunContext,
    embedding: Annotated[
        EmbeddingServiceExtensionServer, EmbeddingServiceExtensionSpec.single_demand()
    ],
    _: Annotated[PlatformApiExtensionServer, PlatformApiExtensionSpec()],
) -> AsyncGenerator[RunYield, None]:
    # Create embedding client
    embedding_client, embedding_model = get_embedding_client(embedding)

    # Extract files and query from input
    files = []
    query = ""
    for part in input.parts:
        match part.root:
            case FilePart(file=FileWithUri(uri=uri)):
                files.append(await File.get(PlatformFileUrl(uri).file_id))
            case TextPart(text=text):
                query = text
            case _:
                raise NotImplementedError(f"Unsupported part: {type(part.root)}")

    if not files or not query:
        raise ValueError("No files or query provided")

    # Create vector store
    vector_store = await create_vector_store(embedding_client, embedding_model)

    # Process files, add to vector store
    for file in files:
        await extract_file(file)
        async with file.load_text_content() as loaded_file:
            chunks = chunk_markdown(loaded_file.text)
        items = await embed_chunks(file, chunks, embedding_client, embedding_model)
        await vector_store.add_documents(items=items)

    # Search vector store
    results = await search_vector_store(
        vector_store, query, embedding_client, embedding_model
    )

    # TODO: You can add LLM result processing here

    snippet = [res.model_dump() for res in results]
    yield f"# Results:\n```\n{json.dumps(snippet, indent=2)}\n```"
```

Instead of simply returning the raw output of the vector store, you would typically plug this retrieval step in as a
tool in your favorite agentic framework, or fold the retrieved chunks into the LLM prompt as sketched below.
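
A rough sketch of such prompt assembly (the exact fields available on each search result depend on the SDK version,
so this falls back to `model_dump()` just like the agent above):

```python
def build_context(results: list[VectorStoreSearchResult]) -> str:
    # Concatenate the retrieved chunks into one context block for the LLM prompt.
    # Pick out just the chunk text instead if you prefer a leaner prompt.
    sections = [json.dumps(res.model_dump(), indent=2) for res in results]
    return "Use the following retrieved context to answer:\n\n" + "\n\n---\n\n".join(sections)
```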

### Conversational agent
Creating a new vector store for each message is not really good practice. Typically, you would want to search across
all documents uploaded in the conversation. Below is a version of the agent that reuses the vector store across
messages, so you can ask multiple queries or add more documents later on.

```python
@server.agent(
    default_input_modes=default_input_modes,
    default_output_modes=["text/plain"],
)
async def rag_agent(
    input: Message,
    context: RunContext,
    embedding: Annotated[
        EmbeddingServiceExtensionServer, EmbeddingServiceExtensionSpec.single_demand()
    ],
    _: Annotated[PlatformApiExtensionServer, PlatformApiExtensionSpec()],
) -> AsyncGenerator[RunYield, None]:
    # Create embedding client
    embedding_client, embedding_model = get_embedding_client(embedding)

    # Extract files and query from input
    files = []
    query = ""
    for part in input.parts:
        match part.root:
            case FilePart(file=FileWithUri(uri=uri)):
                files.append(await File.get(PlatformFileUrl(uri).file_id))
            case TextPart(text=text):
                query = text
            case _:
                raise NotImplementedError(f"Unsupported part: {type(part.root)}")

    # Check if vector store exists
    vector_store = None
    async for message in context.load_history():
        match message:
            case Message(parts=[Part(root=DataPart(data=data))]):
                vector_store = await VectorStore.get(data["vector_store_id"])

    # Create vector store if it does not exist
    if not vector_store:
        vector_store = await create_vector_store(embedding_client, embedding_model)
        # store vector store id in context for future messages
        data_part = DataPart(data={"vector_store_id": vector_store.id})
        await context.store(AgentMessage(parts=[data_part]))

    # Process files, add to vector store
    for file in files:
        await extract_file(file)
        async with file.load_text_content() as loaded_file:
            chunks = chunk_markdown(loaded_file.text)
        items = await embed_chunks(file, chunks, embedding_client, embedding_model)
        await vector_store.add_documents(items=items)

    # Search vector store
    if query:
        results = await search_vector_store(
            vector_store, query, embedding_client, embedding_model
        )
        snippet = [res.model_dump() for res in results]

        # TODO: You can add LLM result processing here

        yield f"# Results:\n```\n{json.dumps(snippet, indent=2)}\n```"
    elif files:
        yield f"{len(files)} file(s) processed"
    else:
        yield "Nothing to do"
```

### Next steps
To further improve the agent, learn how to use other parts of the platform such as LLMs,
file uploads and conversations:
- [LLM extension](./llm-configuration)
- [Multi-turn conversations](./multi-turn)
- [File handling](./file-handling)