Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,6 @@ db.sqlite3

# Docker compose override
compose.override.yml

# LLM configuration
src/backend/conversations/configuration/llm/default_dev.json
15 changes: 12 additions & 3 deletions src/backend/chat/agent_rag/document_converter/markitdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def convert_raw( # pylint: disable=unused-argument
*,
name: str,
content_type: str,
content: bytes,
content: bytes | BytesIO,
) -> str:
"""
Convert a document to Markdown format.
Expand All @@ -28,9 +28,18 @@ def convert_raw( # pylint: disable=unused-argument
Args:
name (str): The name of the document.
content_type (str): The MIME type of the document (e.g., "application/pdf").
content (bytes): The content of the document as bytes.
content (bytes | BytesIO): The content of the document as bytes or BytesIO.
"""
return self._convert(BytesIO(content), file_extension=os.path.splitext(name)[1])
# Handle both bytes and BytesIO
if isinstance(content, BytesIO):
# Read the BytesIO to bytes, then create a new BytesIO for the converter
content_bytes = content.read()
content_io = BytesIO(content_bytes)
else:
# content is already bytes
content_io = BytesIO(content)

return self._convert(content_io, file_extension=os.path.splitext(name)[1])

def _convert(self, document: BytesIO, file_extension: str) -> str:
"""
Expand Down
102 changes: 57 additions & 45 deletions src/backend/chat/clients/pydantic_ai.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import time
import uuid
from contextlib import AsyncExitStack, ExitStack
from io import BytesIO
from typing import Dict, List, Optional, Tuple

from django.conf import settings
Expand All @@ -22,7 +21,6 @@
from django.core.exceptions import ImproperlyConfigured
from django.core.files.storage import default_storage
from django.db.models import Q
from django.utils.module_loading import import_string

from asgiref.sync import sync_to_async
from langfuse import get_client
Expand Down Expand Up @@ -63,6 +61,7 @@
from chat.ai_sdk_types import (
LanguageModelV1Source,
SourceUIPart,
TextUIPart,
UIMessage,
)
from chat.clients.async_to_sync import convert_async_generator_to_sync
Expand All @@ -71,10 +70,12 @@
model_message_to_ui_message,
ui_message_to_user_content,
)
from chat.document_storage import store_document_with_attachment
from chat.mcp_servers import get_mcp_servers
from chat.tools.document_generic_search_rag import add_document_rag_search_tool_from_setting
from chat.tools.document_search_rag import add_document_rag_search_tool
from chat.tools.document_summarize import document_summarize
from chat.tools.fetch_url import detect_url_in_conversation, fetch_url
from chat.vercel_ai_sdk.core import events_v4, events_v5
from chat.vercel_ai_sdk.encoder import EventEncoder

Expand Down Expand Up @@ -249,19 +250,10 @@ async def parse_input_documents(self, documents: List[BinaryContent | DocumentUr
):
raise ValueError("Document URL does not belong to the conversation.")

document_store_backend = import_string(settings.RAG_DOCUMENT_SEARCH_BACKEND)

document_store = document_store_backend(self.conversation.collection_id)
if not document_store.collection_id:
# Create a new collection for the conversation
collection_id = document_store.create_collection(
name=f"conversation-{self.conversation.pk}",
)
self.conversation.collection_id = str(collection_id)
await self.conversation.asave(update_fields=["collection_id", "updated_at"])

for document in documents:
key = None
document_data = None

if isinstance(document, DocumentUrl):
if document.url.startswith("/media-key/"):
# Local file, retrieve from object storage
Expand All @@ -272,33 +264,31 @@ async def parse_input_documents(self, documents: List[BinaryContent | DocumentUr
# Retrieve the document data
with default_storage.open(key, "rb") as file:
document_data = file.read()
parsed_content = document_store.parse_and_store_document(
name=document.identifier,
content_type=document.media_type,
content=document_data,
)
else:
# Remote URL
raise ValueError("External document URL are not accepted yet.")
else:
parsed_content = document_store.parse_and_store_document(
name=document.identifier,
content_type=document.media_type,
content=document.data,
)

if not document.media_type.startswith("text/"):
md_attachment = await models.ChatConversationAttachment.objects.acreate(
conversation=self.conversation,
uploaded_by=self.user,
key=key or f"{self.conversation.pk}/attachments/{document.identifier}.md",
file_name=f"{document.identifier}.md",
content_type="text/markdown",
conversion_from=key, # might be None
)
default_storage.save(md_attachment.key, BytesIO(parsed_content.encode("utf8")))
md_attachment.upload_state = models.AttachmentStatus.READY
await md_attachment.asave(update_fields=["upload_state", "updated_at"])
document_data = document.data
# Convert BytesIO to bytes if needed
if hasattr(document_data, 'read'):
document_data = document_data.read()

# Use the shared document storage utility
create_attachment = not document.media_type.startswith("text/")
attachment_key = None
if create_attachment:
attachment_key = key or f"{self.conversation.pk}/attachments/{document.identifier}.md"

await store_document_with_attachment(
conversation=self.conversation,
user=self.user,
name=document.identifier,
content_type=document.media_type,
content=document_data,
create_attachment=create_attachment,
conversion_from=key, # might be None
attachment_key=attachment_key,
)

def prepare_prompt( # noqa: PLR0912 # pylint: disable=too-many-branches
self, message: UIMessage
Expand Down Expand Up @@ -389,6 +379,19 @@ async def _run_agent( # noqa: PLR0912, PLR0915 # pylint: disable=too-many-branc
langfuse.update_current_trace(
input=user_prompt if self._store_analytics else "REDACTED"
)

# Look for URLs in the conversation history or the provided messages
urls_in_conversation = detect_url_in_conversation(messages)
has_url_in_conversation = bool(urls_in_conversation)

if has_url_in_conversation:
# Register the fetch_url tool dynamically when a URL is detected (NOTE(review): no "already registered" check is visible here — confirm)
@self.conversation_agent.tool(name="fetch_url", retries=2)
@functools.wraps(fetch_url)
async def fetch_url_tool(ctx: RunContext, url: str) -> ToolReturn:
"""Wrap the fetch_url tool to provide context and add the tool."""
ctx.deps.messages = messages
return await fetch_url(ctx, url)

usage = {"promptTokens": 0, "completionTokens": 0}

Expand Down Expand Up @@ -484,11 +487,13 @@ def force_web_search_prompt() -> str:
.aexists()
)

should_enable_rag = has_not_pdf_docs or has_url_in_conversation

document_urls = []
if not conversation_has_documents and not has_not_pdf_docs:
if not conversation_has_documents and not should_enable_rag:
# No documents to process
pass
elif has_not_pdf_docs:
elif should_enable_rag:
add_document_rag_search_tool(self.conversation_agent)

@self.conversation_agent.instructions
Expand All @@ -505,13 +510,15 @@ def summarization_system_prompt() -> str:
)

# Inform the model (system-level) that documents are attached and available
@self.conversation_agent.system_prompt
def attached_documents_note() -> str:
return (
"[Internal context] User documents are attached to this conversation. "
"Do not request re-upload of documents; consider them already available "
"via the internal store."
)
# Add this note only when documents are actually present (not just a URL), to avoid hallucinated attachments
if has_not_pdf_docs:
@self.conversation_agent.system_prompt
def attached_documents_note() -> str:
return (
"[Internal context] User documents are attached to this conversation. "
"Do not request re-upload of documents; consider them already available "
"via the internal store."
)

@self.conversation_agent.tool(name="summarize", retries=2)
@functools.wraps(document_summarize)
Expand Down Expand Up @@ -549,6 +556,7 @@ async def summarize(ctx: RunContext, *args, **kwargs) -> ToolReturn:

_final_output_from_tool = None
_ui_sources = []
_added_source_urls = set()

# Help Mistral to prevent `Unexpected role 'user' after role 'tool'` error.
if history and history[-1].kind == "request":
Expand Down Expand Up @@ -661,6 +669,10 @@ async def summarize(ctx: RunContext, *args, **kwargs) -> ToolReturn:
sources := event.result.metadata.get("sources")
):
for source_url in sources:
# Skip if we've already added this URL to avoid duplicates
if source_url in _added_source_urls:
continue
_added_source_urls.add(source_url)
url_source = LanguageModelV1Source(
sourceType="url",
id=str(uuid.uuid4()),
Expand Down
Loading
Loading