211 changes: 209 additions & 2 deletions camel/agents/chat_agent.py
@@ -915,6 +915,202 @@ def _summarize_tool_result(self, text: str, limit: int = 160) -> str:
return normalized
return normalized[: max(0, limit - 3)].rstrip() + "..."

def _clean_snapshot_line(self, line: str) -> str:
r"""Clean a single snapshot line by removing prefixes and references.

This method handles snapshot lines in the format:
- [prefix] "quoted text" [attributes] [ref=...]: description

It preserves:
- Quoted text content (including brackets inside quotes)
- Description text after the colon

It removes:
- Line prefixes (e.g., "- button", "- tooltip", "generic:")
- Attribute markers (e.g., [disabled], [ref=e47])
- Lines with only element types
- All indentation

Args:
line: The original line content.

Returns:
The cleaned line content, or an empty string if the line should be removed.
"""
# Remove all leading whitespace (indentation)
original = line.strip()
if not original:
return ''

# Check if line is just a line prefix (element type) with optional colon
# Examples: "- generic:", "- img", "generic:", "img", "button:"
# Matches "- word" or "word:" or "- word:" or just "word"
if re.match(r'^(?:-\s+)?\w+\s*:?\s*$', original):
return '' # Remove lines with only element types

# Step 1: Remove line prefix using regex
# Matches: "- button ", "- generic: ", "tooltip: ", "- img ", etc.
line = re.sub(r'^(?:-\s+)?\w+[\s:]+', '', original)

# Step 2: Remove bracket markers outside of quotes
# Strategy: protect quoted content, remove brackets, restore quotes

# Save all quoted content temporarily
quoted_parts = []

def save_quoted(match):
quoted_parts.append(match.group(0))
return f'__QUOTED_{len(quoted_parts)-1}__'

# Protect quoted content (double quotes)
line = re.sub(r'"[^"]*"', save_quoted, line)

# Remove all bracket markers (quotes are protected)
line = re.sub(r'\s*\[[^\]]+\]\s*', ' ', line)

# Restore quoted content
for i, quoted in enumerate(quoted_parts):
line = line.replace(f'__QUOTED_{i}__', quoted)

# Step 3: Clean up whitespace and formatting
line = re.sub(r'\s+', ' ', line).strip()
line = re.sub(r'\s*:\s*', ': ', line)

# Remove leading colon if that's all that remains
if line == ':' or line.startswith(': '):
line = line.lstrip(': ').strip()

# Return empty string if nothing meaningful remains
if not line:
return ''

return line
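
To make the intent of this helper concrete, here are a few hypothetical snapshot lines and the outputs the steps above should produce for them (assumed examples, not captured tool output):

```python
# Hypothetical snapshot lines -> expected cleaned output of _clean_snapshot_line.
cases = [
    ('- button "Submit" [disabled] [ref=e47]: sends the form',
     '"Submit": sends the form'),
    ('  - generic:', ''),                               # element-type-only line is dropped
    ('    - img [ref=e12]', ''),                        # nothing meaningful survives cleaning
    ('- tooltip "Search [beta]"', '"Search [beta]"'),   # brackets inside quotes are kept
]
for raw, expected in cases:
    print(f'{raw!r} -> {expected!r}')
```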

def _clean_snapshot_content(self, content: str) -> str:
r"""Clean snapshot content by removing prefixes, references, and
deduplicating lines.

This method identifies snapshot lines (containing element keywords or
references) and cleans them while preserving non-snapshot content.
It also handles JSON-formatted tool outputs with snapshot fields.

Args:
content: The original snapshot content.

Returns:
The cleaned content with deduplicated lines.
"""
# Try to parse as JSON first
try:
import json

data = json.loads(content)
modified = False

# Recursively clean snapshot fields in JSON
def clean_json_value(obj):
nonlocal modified
if isinstance(obj, dict):
result = {}
for key, value in obj.items():
if key == 'snapshot' and isinstance(value, str):
# Found a snapshot field, clean it
# Decode escape sequences (e.g., \n -> actual newline)
try:
decoded_value = value.encode().decode('unicode_escape')
except Exception:
decoded_value = value

# Check if cleaning is needed
needs_cleaning = (
'- ' in decoded_value or
'[ref=' in decoded_value or
any(elem + ':' in decoded_value for elem in [
'generic', 'img', 'banner', 'list',
'listitem', 'search', 'navigation'
])
)

if needs_cleaning:
cleaned_snapshot = self._clean_text_snapshot(
decoded_value
)
result[key] = cleaned_snapshot
modified = True
else:
result[key] = value
else:
result[key] = clean_json_value(value)
return result
elif isinstance(obj, list):
return [clean_json_value(item) for item in obj]
else:
return obj

cleaned_data = clean_json_value(data)

if modified:
return json.dumps(cleaned_data, ensure_ascii=False, indent=4)
else:
return content

except (json.JSONDecodeError, TypeError):
# Not JSON, process as plain text
return self._clean_text_snapshot(content)
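
For example, a JSON tool result carrying a `snapshot` field (hypothetical payload, not real tool output) would take the JSON branch above, while anything that fails `json.loads` falls through to `_clean_text_snapshot`:

```python
import json

# Hypothetical browser-tool result with a snapshot field; only the "snapshot"
# value is rewritten, the rest of the payload is preserved.
tool_output = json.dumps({
    "status": "ok",
    "snapshot": '- banner:\n  - link "Home" [ref=e1]: top navigation',
})
# agent._clean_snapshot_content(tool_output) should return the same JSON with
# the snapshot value reduced to something like '"Home": top navigation'.
```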

def _clean_text_snapshot(self, content: str) -> str:
r"""Clean plain text snapshot content.

This method:
- Removes all indentation
- Deletes empty lines
- Deduplicates all lines
- Cleans snapshot-specific markers

Args:
content: The original snapshot text.

Returns:
The cleaned content with deduplicated lines, no indentation, and no empty lines.
"""
lines = content.split('\n')
cleaned_lines = []
seen = set() # For deduplication across ALL lines

for line in lines:
# Strip indentation from every line
stripped_line = line.strip()

# Skip empty lines
if not stripped_line:
continue

# Skip metadata lines (like "- /url:", "- /ref:", etc.)
if re.match(r'^-?\s*/\w+\s*:', stripped_line):
continue

# Check if this is a snapshot line using regex
# Matches lines with: [ref=...], "- element", "element:", "- element:", etc.
is_snapshot_line = (
'[ref=' in stripped_line or
re.match(r'^(?:-\s+)?\w+(?:[\s:]|$)', stripped_line)
)

if is_snapshot_line:
# Clean snapshot line
cleaned = self._clean_snapshot_line(stripped_line)
# Only add if not empty and not duplicate
if cleaned and cleaned not in seen:
cleaned_lines.append(cleaned)
seen.add(cleaned)
else:
# Non-snapshot line: remove indentation, deduplicate
if stripped_line not in seen:
cleaned_lines.append(stripped_line)
seen.add(stripped_line)

return '\n'.join(cleaned_lines)
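
A small sketch of the plain-text path, using a made-up snapshot (the expected output is inferred from the rules above, not captured from a real run):

```python
# Hypothetical accessibility-style snapshot before cleaning.
raw_snapshot = """\
- banner:
  - link "Home" [ref=e1]: top navigation
  - link "Home" [ref=e2]: top navigation
  - /url: https://example.com
- search:
  - textbox "Search" [ref=e7]
"""
# _clean_text_snapshot(raw_snapshot) should drop the element-only and /url
# metadata lines, strip indentation, deduplicate the repeated link, and yield:
#
#   "Home": top navigation
#   "Search"
```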

def _register_tool_output_for_cache(
self,
func_name: str,
@@ -956,11 +1152,22 @@ def _cache_tool_output_entry(self, entry: _ToolOutputHistoryEntry) -> None:
if self._tool_output_cache_manager is None or not entry.record_uuids:
return

# Check if result contains snapshot markers and clean if necessary
result_to_cache = entry.result_text
if '- ' in result_to_cache and '[ref=' in result_to_cache:
# Likely contains snapshot with references, clean it
result_to_cache = self._clean_snapshot_content(result_to_cache)
logger.debug(
"Cleaned snapshot references from tool output '%s' (%s)",
entry.tool_name,
entry.tool_call_id,
)

try:
cache_id, cache_path = self._tool_output_cache_manager.save(
entry.tool_name,
entry.tool_call_id,
entry.result_text,
result_to_cache,
)
except Exception as exc: # pragma: no cover - defensive
logger.warning(
@@ -986,7 +1193,7 @@ def _cache_tool_output_entry(self, entry: _ToolOutputHistoryEntry) -> None:
},
content="",
func_name=entry.tool_name,
result=self._build_cache_reference_text(entry, cache_id),
result=result_to_cache, # Use cleaned content directly
tool_call_id=entry.tool_call_id,
)

3 changes: 1 addition & 2 deletions camel/models/base_model.py
@@ -117,8 +117,7 @@ def __init__(
self._max_retries = max_retries
# Initialize logging configuration
self._log_enabled = (
os.environ.get("CAMEL_MODEL_LOG_ENABLED", "False").lower()
== "true"
os.environ.get("CAMEL_MODEL_LOG_ENABLED", "True").lower() == "true"
)
self._log_dir = os.environ.get("CAMEL_LOG_DIR", "camel_logs")

90 changes: 87 additions & 3 deletions camel/models/moonshot_model.py
@@ -84,7 +84,7 @@ def __init__(
model_type: Union[ModelType, str],
model_config_dict: Optional[Dict[str, Any]] = None,
api_key: Optional[str] = None,
url: Optional[str] = "https://api.moonshot.ai/v1",
url: Optional[str] = None,
token_counter: Optional[BaseTokenCounter] = None,
timeout: Optional[float] = None,
max_retries: int = 3,
@@ -93,7 +93,12 @@
if model_config_dict is None:
model_config_dict = MoonshotConfig().as_dict()
api_key = api_key or os.environ.get("MOONSHOT_API_KEY")
url = url or os.environ.get("MOONSHOT_API_BASE_URL")
# Preserve default URL if not provided
if url is None:
url = (
os.environ.get("MOONSHOT_API_BASE_URL")
or "https://api.moonshot.ai/v1"
)
timeout = timeout or float(os.environ.get("MODEL_TIMEOUT", 180))
super().__init__(
model_type=model_type,
@@ -130,14 +135,93 @@ def _prepare_request(
request_config = copy.deepcopy(self.model_config_dict)

if tools:
request_config["tools"] = tools
# Clean tools to remove null types (Moonshot API incompatibility)
cleaned_tools = self._clean_tool_schemas(tools)
request_config["tools"] = cleaned_tools
elif response_format:
# Use the same approach as DeepSeek for structured output
try_modify_message_with_format(messages[-1], response_format)
request_config["response_format"] = {"type": "json_object"}

return request_config

def _clean_tool_schemas(
self, tools: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
r"""Clean tool schemas to remove null types for Moonshot compatibility.

Moonshot API doesn't accept {"type": "null"} in anyOf schemas.
This method removes null type definitions from parameters.

Args:
tools (List[Dict[str, Any]]): Original tool schemas.

Returns:
List[Dict[str, Any]]: Cleaned tool schemas.
"""
import copy

def remove_null_from_schema(schema: Any) -> Any:
"""Recursively remove null types from schema."""
if isinstance(schema, dict):
# Create a copy to avoid modifying the original
result = {}

for key, value in schema.items():
if key == 'type' and isinstance(value, list):
# Handle type arrays like ["string", "null"]
filtered_types = [t for t in value if t != 'null']
if len(filtered_types) == 1:
# Single type remains, convert to string
result[key] = filtered_types[0]
elif len(filtered_types) > 1:
# Multiple types remain, keep as array
result[key] = filtered_types
else:
# All were null, use string as fallback
result[key] = 'string'
elif key == 'anyOf':
# Handle anyOf with null types
filtered = [
item
for item in value
if not (
isinstance(item, dict)
and item.get('type') == 'null'
)
]
if len(filtered) == 1:
# If only one type remains, flatten it
return remove_null_from_schema(filtered[0])
elif len(filtered) > 1:
result[key] = [
remove_null_from_schema(item)
for item in filtered
]
else:
# All were null, return string type as fallback
return {"type": "string"}
else:
# Recursively process other values
result[key] = remove_null_from_schema(value)

return result
elif isinstance(schema, list):
return [remove_null_from_schema(item) for item in schema]
else:
return schema

cleaned_tools = copy.deepcopy(tools)
for tool in cleaned_tools:
if 'function' in tool and 'parameters' in tool['function']:
params = tool['function']['parameters']
if 'properties' in params:
params['properties'] = remove_null_from_schema(
params['properties']
)

return cleaned_tools
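
To illustrate the effect, here is a hypothetical OpenAI-style tool schema with an optional parameter expressed via `anyOf` (assumed example; the expected result follows from the flattening rules above):

```python
# Hypothetical tool schema passed to _clean_tool_schemas.
tool = {
    "type": "function",
    "function": {
        "name": "search_docs",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "limit": {"anyOf": [{"type": "integer"}, {"type": "null"}]},
            },
            "required": ["query"],
        },
    },
}
# _clean_tool_schemas([tool]) should collapse the "limit" property to
# {"type": "integer"}, since the single non-null anyOf branch is flattened.
```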

@observe()
async def _arun(
self,