Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crewai_tools/tools/serper_dev_tool/serper_dev_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
from typing import Any, List, Optional, Type

import requests
from crewai.tools import BaseTool, EnvVar
from crewai.tools import BaseTool
from pydantic import BaseModel, Field

from tests.utils import EnvVar

logger = logging.getLogger(__name__)


Expand Down
53 changes: 35 additions & 18 deletions generate_tool_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Any, Dict, List, Optional, Type

from crewai_tools import tools
from crewai.tools.base_tool import EnvVar
from tests.utils import EnvVar


class ToolSpecExtractor:
Expand All @@ -33,8 +33,12 @@ def extract_tool_info(self, tool_class: Type) -> None:
fields = schema.get("schema", {}).get("fields", {})
tool_info = {
"name": tool_class.__name__,
"humanized_name": self._extract_field_default(fields.get("name"), fallback=tool_class.__name__),
"description": self._extract_field_default(fields.get("description")).strip(),
"humanized_name": self._extract_field_default(
fields.get("name"), fallback=tool_class.__name__
),
"description": self._extract_field_default(
fields.get("description")
).strip(),
"run_params": self._extract_params(fields.get("args_schema")),
"env_vars": self._extract_env_vars(fields.get("env_vars")),
}
Expand All @@ -45,24 +49,31 @@ def extract_tool_info(self, tool_class: Type) -> None:
print(f"Error extracting {tool_class.__name__}: {e}")

def _unwrap_schema(self, schema: Dict) -> Dict:
while schema.get("type") in {"function-after", "default"} and "schema" in schema:
while (
schema.get("type") in {"function-after", "default"} and "schema" in schema
):
schema = schema["schema"]
return schema

def _extract_field_default(self, field: Optional[Dict], fallback: str = "") -> str:
if not field:
return fallback

schema = field.get("schema", {})
default = schema.get("default")
return default if isinstance(default, str) else fallback

def _extract_params(self, args_schema_field: Optional[Dict]) -> List[Dict[str, str]]:
def _extract_params(
self, args_schema_field: Optional[Dict]
) -> List[Dict[str, str]]:
if not args_schema_field:
return []

args_schema_class = args_schema_field.get("schema", {}).get("default")
if not (inspect.isclass(args_schema_class) and hasattr(args_schema_class, "__pydantic_core_schema__")):
if not (
inspect.isclass(args_schema_class)
and hasattr(args_schema_class, "__pydantic_core_schema__")
):
return []

try:
Expand All @@ -78,7 +89,7 @@ def _extract_params(self, args_schema_field: Optional[Dict]) -> List[Dict[str, s
param = {
"name": name,
"description": self._extract_field_default(info)
or self._extract_field_description_from_metadata(info),
or self._extract_field_description_from_metadata(info),
"type": _type,
}
params.append(param)
Expand All @@ -96,14 +107,16 @@ def _extract_env_vars(self, env_vars_field: Optional[Dict]) -> List[Dict[str, st
env_vars = []
for env_var in env_vars_field.get("schema", {}).get("default", []):
if isinstance(env_var, EnvVar):
env_vars.append({
"name": env_var.name,
"description": env_var.description,
"required": env_var.required,
"default": env_var.default,
})
env_vars.append(
{
"name": env_var.name,
"description": env_var.description,
"required": env_var.required,
"default": env_var.default,
}
)
return env_vars

def _extract_field_description_from_metadata(self, field: Dict) -> str:
if metadata := field.get("metadata"):
return metadata.get("pydantic_js_updates", {}).get("description", "")
Expand All @@ -125,13 +138,17 @@ def _schema_type_to_str(self, schema: Dict) -> str:
if schema_type == "list" and "items_schema" in schema:
item_type = self._schema_type_to_str(schema["items_schema"])
return f"list[{item_type}]"

if schema_type == "union" and "choices" in schema:
choices = schema["choices"]
item_types = [self._schema_type_to_str(choice) for choice in choices]
return f"union[{', '.join(item_types)}]"

if schema_type == "dict" and "keys_schema" in schema and "values_schema" in schema:
if (
schema_type == "dict"
and "keys_schema" in schema
and "values_schema" in schema
):
key_type = self._schema_type_to_str(schema["keys_schema"])
value_type = self._schema_type_to_str(schema["values_schema"])
return f"dict[{key_type}, {value_type}]"
Expand Down Expand Up @@ -159,4 +176,4 @@ def save_to_json(self, output_path: str) -> None:
specs = extractor.extract_all_tools()
extractor.save_to_json(str(output_file))

print(f"Extracted {len(specs)} tool classes.")
print(f"Extracted {len(specs)} tool classes.")
Loading