Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
945f882
update multimodal part
sdbds Aug 3, 2025
834d1a8
Update core/agent_core/events/ingestors.py
sdbds Aug 6, 2025
241d234
small update
sdbds Aug 6, 2025
6f3fd21
fix node:crypto
sdbds Aug 6, 2025
fc32a01
update for files upload
sdbds Aug 11, 2025
810164a
update image part
sdbds Aug 13, 2025
f4b6bf2
Update core/agent_core/events/ingestors.py
sdbds Aug 14, 2025
6c40474
Update frontend/app/chat/components/ChatInput.tsx
sdbds Aug 14, 2025
b940371
Update frontend/app/chat/components/ChatInput.tsx
sdbds Aug 14, 2025
a8e0f5f
Update frontend/app/chat/components/ChatInput.tsx
sdbds Aug 14, 2025
c10b95f
Update frontend/app/chat/components/ChatInput.tsx
sdbds Aug 14, 2025
aaafd89
Update core/agent_core/events/ingestors.py
sdbds Aug 14, 2025
c6e4b8a
Update core/agent_core/events/ingestors.py
sdbds Aug 14, 2025
1789d93
Update core/agent_core/events/ingestors.py
sdbds Aug 14, 2025
de3d887
Update frontend/app/chat/components/ChatInput.tsx
sdbds Aug 14, 2025
5cef08d
Update core/agent_core/events/ingestors.py
sdbds Aug 14, 2025
b6dfbaa
Update frontend/app/chat/components/ChatInput.tsx
sdbds Aug 14, 2025
27033b7
Update frontend/app/chat/components/ChatInput.tsx
sdbds Aug 14, 2025
28cc20f
Update frontend/app/chat/components/ChatInput.tsx
sdbds Aug 14, 2025
b51e792
Update frontend/app/chat/components/ChatInput.tsx
sdbds Aug 14, 2025
c537f09
Update frontend/app/chat/components/ChatInput.tsx
sdbds Aug 14, 2025
a05befd
Update frontend/app/chat/components/ChatInput.tsx
sdbds Aug 14, 2025
d903c23
Update frontend/app/chat/components/ChatInput.tsx
sdbds Aug 14, 2025
0b18720
Update frontend/app/chat/components/ChatInput.tsx
sdbds Aug 14, 2025
cf742a2
Update core/api/message_handlers.py
sdbds Aug 14, 2025
394aab4
Update core/api/message_handlers.py
sdbds Aug 14, 2025
6e3c67b
Update core/agent_core/framework/inbox_processor.py
sdbds Aug 14, 2025
ca41fc9
Update core/agent_core/framework/inbox_processor.py
sdbds Aug 14, 2025
d9fb426
Update core/agent_core/framework/inbox_processor.py
sdbds Aug 14, 2025
b616b9f
Update core/agent_core/events/ingestors.py
sdbds Aug 14, 2025
8fa6b0d
Update core/agent_core/events/ingestors.py
sdbds Aug 14, 2025
445c7b9
Update core/agent_core/events/ingestors.py
sdbds Aug 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions core/agent_core/events/ingestors.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,39 @@ def user_prompt_ingestor(payload: Any, params: Dict, context: Dict) -> str:
return payload.get("prompt", "")
return str(payload)

@register_ingestor("multimodal_user_prompt_ingestor")
def multimodal_user_prompt_ingestor(payload: Any, params: Dict, context: Dict) -> str:
    """Summarize a multimodal user payload as a short, LLM-friendly text note.

    The actual image/file bytes are handled later during message construction;
    this ingestor only produces a human-readable description such as
    "[User uploaded 2 image(s), and attached 4 files (e.g., a.pdf, b.txt, c.csv etc.)]".

    Args:
        payload: Expected to be a dict with optional "prompt" (str),
            "images" (list), and "files" (list of dicts with "name"/"filename").
            Any non-dict payload is stringified as-is.
        params: Ingestor parameters (unused here, kept for registry signature).
        context: Ingestor context (unused here, kept for registry signature).

    Returns:
        The prompt text, optionally followed by a bracketed attachment note.
    """
    if not isinstance(payload, dict):
        return str(payload)

    prompt = payload.get("prompt", "")
    images = payload.get("images", [])
    files = payload.get("files", [])

    # If there are no images or files, return the text directly
    if not images and not files:
        return prompt

    # Construct a brief attachment description
    parts = []
    if images:
        parts.append(f"User uploaded {len(images)} image(s)")
    if files:
        # Optional: list up to the first 3 file names.
        # Guard against non-dict entries (e.g., a bare filename string).
        names = []
        for f in files[:3]:
            name = f.get("name") or f.get("filename") if isinstance(f, dict) else None
            if name:
                names.append(name)
        # BUG FIX: the files clause previously always began with "and ", which
        # produced "[and attached N files]" when no images were present.
        label = "and attached" if images else "User attached"
        if names:
            suffix = "" if len(files) <= 3 else " etc."
            parts.append(f"{label} {len(files)} files (e.g., {', '.join(names)}{suffix})")
        else:
            parts.append(f"{label} {len(files)} files")

    note = "[" + ", ".join(parts) + "]"
    return f"{prompt}\n\n{note}" if prompt else note

def _recursive_markdown_formatter(data: Any, schema: Dict, level: int = 0) -> List[str]:
"""
Intelligently formats data recursively into LLM-friendly Markdown.
Expand Down
207 changes: 203 additions & 4 deletions core/agent_core/framework/inbox_processor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import logging
import uuid
import os
import base64
from typing import Any, Dict, List, Optional
from datetime import datetime, timezone
import asyncio
import time
import httpx
from litellm import create_file

from ..events.event_strategies import EVENT_STRATEGY_REGISTRY
from ..events.ingestors import INGESTOR_REGISTRY, markdown_formatter_ingestor
Expand Down Expand Up @@ -34,7 +40,8 @@ def _create_user_turn_from_inbox_item(self, item: Dict) -> Optional[str]:
team_state = self.team_state

prompt_content = item.get("payload", {}).get("prompt")
if not prompt_content:
files_content = item.get("payload", {}).get("files", [])
if not prompt_content and not files_content:
return None

user_turn_id = f"turn_user_{uuid.uuid4().hex[:8]}"
Expand Down Expand Up @@ -62,7 +69,10 @@ def _create_user_turn_from_inbox_item(self, item: Dict) -> Optional[str]:
"end_time": item.get("metadata", {}).get("created_at", datetime.now(timezone.utc).isoformat()),
"source_turn_ids": [last_agent_turn_id] if last_agent_turn_id else [],
"source_tool_call_id": None,
"inputs": {"prompt": prompt_content},
"inputs": (
{"prompt": prompt_content, "files": files_content}
if files_content else {"prompt": prompt_content}
),
"outputs": {},
"llm_interaction": None,
"tool_interactions": [],
Expand Down Expand Up @@ -207,7 +217,7 @@ async def process(self) -> Dict[str, Any]:
try:
payload = item["payload"]

if item.get("source") == "USER_PROMPT":
if item.get("source") in ["USER_PROMPT", "USER_PROMPT_WITH_FILES"]:
new_user_turn_id = self._create_user_turn_from_inbox_item(item)
if new_user_turn_id:
# Pass the "baton" so the next agent_turn can correctly link to this user_turn.
Expand Down Expand Up @@ -249,7 +259,196 @@ async def process(self) -> Dict[str, Any]:
role = params.get("role", "user")
is_persistent = params.get("is_persistent_in_memory", False)

new_message = {"role": role, "content": injected_content}
# Handle multimodal content (files only)
has_multimodal_content = False
content_parts = []

# Check if there is file content
if source in ["USER_PROMPT", "USER_PROMPT_WITH_FILES"] and isinstance(dehydrated_payload, dict):
# Process file content: upload attachments to Gemini and construct as file references
files = dehydrated_payload.get("files", [])
if files:
has_multimodal_content = True
# Add the text content (if not already added)
if injected_content and not any(part.get("type") == "text" for part in content_parts):
content_parts.append({
"type": "text",
"text": injected_content
})

for f in files:
try:
filename = f.get("name") or f.get("filename") or f"file_{uuid.uuid4().hex[:6]}"
mime_type = f.get("mimeType") or f.get("mime_type") or "application/octet-stream"

if f.get("file_id"):
# Already uploaded, use file reference
file_id = f["file_id"]
logger.info("gemini_file_upload_skipped_existing", extra={
"agent_id": self.agent_id,
"file_name": filename,
"mime_type": mime_type,
"file_id": file_id,
})
# Use file reference
content_parts.append({
"type": "file",
"file": {
"file_id": file_id,
"filename": filename,
"format": mime_type
}
})
else:
# Check if we have direct base64 data from frontend
if f.get("data"):
# Frontend sent base64 data - use directly without file upload
data_str = f["data"]

if isinstance(mime_type, str) and mime_type.startswith("image/"):
# Ensure proper data URL format for images
if not data_str.startswith("data:"):
image_url = f"data:{mime_type};base64,{data_str}"
else:
image_url = data_str

content_parts.append({
"type": "image_url",
"image_url": {
"url": image_url,
"detail": "high"
}
})

logger.info("image_processed_as_base64", extra={
"agent_id": self.agent_id,
"file_name": filename,
"mime_type": mime_type,
"method": "direct_base64"
})
else:
# Non-image files with base64 data
logger.info("non_image_file_with_base64", extra={
"agent_id": self.agent_id,
"file_name": filename,
"mime_type": mime_type,
"note": "Non-image files may not be fully supported"
})

elif f.get("url"):
# URL-based file - need to fetch and potentially upload
file_bytes = None

# Async fetch
async with httpx.AsyncClient(timeout=20) as client:
resp = await client.get(f["url"])
resp.raise_for_status()
file_bytes = resp.content

size_bytes = len(file_bytes) if file_bytes is not None else None
max_base64_size = 20 * 1024 * 1024 # 20MB

if size_bytes and size_bytes < max_base64_size:
# Small file from URL - convert to base64
if isinstance(mime_type, str) and mime_type.startswith("image/"):
base64_data = base64.b64encode(file_bytes).decode()
image_url = f"data:{mime_type};base64,{base64_data}"

content_parts.append({
"type": "image_url",
"image_url": {
"url": image_url,
"detail": "high"
}
})

logger.info("url_file_converted_to_base64", extra={
"agent_id": self.agent_id,
"file_name": filename,
"mime_type": mime_type,
"size_bytes": size_bytes
})
else:
logger.info("non_image_url_file_skipped", extra={
"agent_id": self.agent_id,
"file_name": filename,
"mime_type": mime_type
})
else:
# Large file from URL - use Gemini file upload
# Prefer API key from project LLM config; fallback to env var
try:
resolver = LLMConfigResolver(shared_llm_configs=self.run_context.get("config", {}).get("shared_llm_configs_ref", {}))
llm_config = resolver.resolve(self.profile)
except Exception:
llm_config = {}
gemini_key = (
(llm_config.get("api_key") if isinstance(llm_config, dict) else None)
or os.getenv("GEMINI_API_KEY")
)
if not gemini_key:
logger.error(
"gemini_api_key_missing",
extra={
"agent_id": self.agent_id,
"hint": "Provide api_key in active LLM config or set GEMINI_API_KEY env var"
}
)
continue

# Structured start log
logger.info("gemini_file_upload_start", extra={
"agent_id": self.agent_id,
"file_name": filename,
"mime_type": mime_type,
"size_bytes": size_bytes,
"reason": "file_too_large_for_base64"
})
t0 = time.perf_counter()

# Offload blocking create_file to a thread
created = await asyncio.to_thread(
create_file,
file=file_bytes,
purpose="user_data",
custom_llm_provider="gemini",
api_key=gemini_key,
)
file_id = getattr(created, "id", None) if created is not None else None
if not file_id:
logger.error("gemini_file_upload_failed", extra={
"file_name": filename,
"mime_type": mime_type,
"size_bytes": size_bytes,
"duration_ms": int((time.perf_counter() - t0) * 1000),
})
continue
else:
logger.info("gemini_file_upload_success", extra={
"agent_id": self.agent_id,
"file_name": filename,
"mime_type": mime_type,
"size_bytes": size_bytes,
"file_id": file_id,
"duration_ms": int((time.perf_counter() - t0) * 1000),
})

# Append file reference content part for large files
content_parts.append({
"type": "file",
"file": {
"file_id": file_id,
"filename": filename,
"format": mime_type
}
})
except Exception as ex:
logger.error("file_processing_failed", extra={"error": str(ex)}, exc_info=True)

if has_multimodal_content:
new_message = {"role": role, "content": content_parts}
else:
new_message = {"role": role, "content": injected_content}

# If this message comes from the startup briefing, add an internal flag
# to prevent it from being handed over again in the future.
Expand Down
Loading