diff --git a/python/semantic_kernel/connectors/serpex.py b/python/semantic_kernel/connectors/serpex.py
new file mode 100644
index 000000000000..bc3d8d62e0d1
--- /dev/null
+++ b/python/semantic_kernel/connectors/serpex.py
@@ -0,0 +1,398 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+import logging
+import sys
+from collections.abc import AsyncIterable, Callable
+from typing import Any, ClassVar, Final
+
+from httpx import AsyncClient, HTTPStatusError, RequestError
+from pydantic import Field, SecretStr, ValidationError
+
+from semantic_kernel.data.text_search import (
+    KernelSearchResults,
+    SearchOptions,
+    TextSearch,
+    TextSearchResult,
+    TSearchResult,
+)
+from semantic_kernel.exceptions import ServiceInitializationError, ServiceInvalidRequestError
+from semantic_kernel.kernel_pydantic import KernelBaseModel, KernelBaseSettings
+from semantic_kernel.utils.feature_stage_decorator import experimental
+from semantic_kernel.utils.telemetry.user_agent import SEMANTIC_KERNEL_USER_AGENT
+
+if sys.version_info >= (3, 12):
+    from typing import override
+else:
+    from typing_extensions import override
+
+logger: logging.Logger = logging.getLogger(__name__)
+
+# region Constants
+DEFAULT_URL: Final[str] = "https://api.serpex.dev/api/search"
+SUPPORTED_ENGINES: Final[list[str]] = ["auto", "google", "bing", "duckduckgo", "brave", "yahoo", "yandex"]
+QUERY_PARAMETERS: Final[list[str]] = [
+    "engine",
+    "category",
+    "time_range",
+]
+# endregion Constants
+
+
+# region SerpexSettings
+class SerpexSettings(KernelBaseSettings):
+    """Serpex Connector settings.
+
+    The settings are first loaded from environment variables with the prefix 'SERPEX_'. If the
+    environment variables are not found, the settings can be loaded from a .env file with the
+    encoding 'utf-8'. If the settings are not found in the .env file, they are ignored; however,
+    validation will fail, alerting you that the required settings are missing.
+
+    Required settings for prefix 'SERPEX_' are:
+    - api_key: SecretStr - The Serpex API key (Env var SERPEX_API_KEY)
+
+    Optional settings for prefix 'SERPEX_' are:
+    - engine: str - Search engine to use (default: "auto"; options: auto, google, bing, duckduckgo, brave, yahoo, yandex)
+    - category: str - Search category (default: "web")
+    - time_range: str | None - Time range filter (options: all, day, week, month, year)
+    """
+
+    env_prefix: ClassVar[str] = "SERPEX_"
+
+    api_key: SecretStr
+    engine: str = "auto"
+    category: str = "web"
+    time_range: str | None = None
+
+
+# endregion SerpexSettings
+
+
+# region SerpexResult
+@experimental
+class SerpexResult(KernelBaseModel):
+    """A Serpex search result."""
+
+    position: int | None = None
+    title: str = ""
+    url: str = ""
+    snippet: str = ""
+    published_date: str | None = None
+    source: str | None = None
+
+
+@experimental
+class SerpexAnswer(KernelBaseModel):
+    """A Serpex instant answer."""
+
+    answer: str = ""
+    source: str | None = None
+
+
+@experimental
+class SerpexInfobox(KernelBaseModel):
+    """A Serpex knowledge panel/infobox."""
+
+    title: str | None = None
+    description: str | None = None
+    url: str | None = None
+    image: str | None = None
+
+
+@experimental
+class SerpexMetadata(KernelBaseModel):
+    """Metadata about the search."""
+
+    number_of_results: int = 0
+    request_time: float = 0.0
+    engine: str = ""
+
+
+@experimental
+class SerpexResponse(KernelBaseModel):
+    """The response from a Serpex search."""
+
+    metadata: SerpexMetadata | None = None
+    results: list[SerpexResult] = Field(default_factory=list)
+    answers: list[SerpexAnswer] = Field(default_factory=list)
+    infoboxes: list[SerpexInfobox] = Field(default_factory=list)
+    suggestions: list[str] = Field(default_factory=list)
+    corrections: list[str] = Field(default_factory=list)
+
+
+# endregion SerpexResult
+
+
+# region SerpexSearch
+@experimental
+class SerpexSearch(KernelBaseModel, TextSearch):
+    """Serpex search connector for multi-engine web search.
+
+    Serpex provides unified access to multiple search engines including Google, Bing,
+    DuckDuckGo, Brave, Yahoo, and Yandex, with intelligent auto-routing and retry logic.
+
+    Args:
+        api_key: The Serpex API key
+        base_url: Optional custom API endpoint (default: https://api.serpex.dev/api/search)
+        engine: Search engine to use (default: "auto")
+        category: Search category (default: "web")
+        time_range: Optional time range filter
+
+    Example:
+        ```python
+        from semantic_kernel.connectors.serpex import SerpexSearch
+        from pydantic import SecretStr
+
+        # Initialize with an explicit API key
+        search = SerpexSearch(
+            api_key=SecretStr("your-api-key"),
+            engine="auto",
+        )
+
+        # Or use environment variables
+        search = SerpexSearch()  # Reads SERPEX_API_KEY from the environment
+
+        # Perform a search
+        results = await search.search("Python programming")
+        ```
+    """
+
+    api_key: SecretStr
+    base_url: str = DEFAULT_URL
+    engine: str = "auto"
+    category: str = "web"
+    time_range: str | None = None
+
+    def __init__(
+        self,
+        api_key: SecretStr | None = None,
+        base_url: str = DEFAULT_URL,
+        engine: str = "auto",
+        category: str = "web",
+        time_range: str | None = None,
+        **kwargs: Any,
+    ) -> None:
+        """Initialize the Serpex search connector.
+
+        Args:
+            api_key: The Serpex API key.
+                If not provided, will try to load from SERPEX_API_KEY env var
+            base_url: Custom API endpoint (default: https://api.serpex.dev/api/search)
+            engine: Search engine (default: "auto", options: auto, google, bing, duckduckgo, brave, yahoo, yandex)
+            category: Search category (default: "web")
+            time_range: Time range filter (options: all, day, week, month, year)
+            **kwargs: Additional keyword arguments
+        """
+        if api_key is None:
+            try:
+                settings = SerpexSettings()
+                api_key = settings.api_key
+                if not engine or engine == "auto":
+                    engine = settings.engine
+                if not category:
+                    category = settings.category
+                if time_range is None:
+                    time_range = settings.time_range
+            except ValidationError as e:
+                raise ServiceInitializationError(
+                    "Serpex API key is required. Please provide it via api_key parameter "
+                    "or set the SERPEX_API_KEY environment variable."
+                ) from e
+
+        if engine not in SUPPORTED_ENGINES:
+            raise ServiceInitializationError(
+                f"Invalid engine '{engine}'. Supported engines: {', '.join(SUPPORTED_ENGINES)}"
+            )
+
+        super().__init__(
+            api_key=api_key,
+            base_url=base_url,
+            engine=engine,
+            category=category,
+            time_range=time_range,
+            **kwargs,
+        )
+
+    @override
+    async def search(
+        self,
+        query: str,
+        top: int = 10,
+        skip: int = 0,
+        filter: Callable[[TSearchResult], bool] | None = None,
+        options: SearchOptions | None = None,
+        **kwargs: Any,
+    ) -> KernelSearchResults[TextSearchResult]:
+        """Execute a search query using Serpex.
+
+        Args:
+            query: The search query string
+            top: Maximum number of results to return (default: 10)
+            skip: Number of results to skip (default: 0)
+            filter: Optional filter function
+            options: Additional search options
+            **kwargs: Additional parameters (engine, category, time_range, etc.)
+
+        Returns:
+            KernelSearchResults containing TextSearchResult items
+        """
+        return await self._inner_search(
+            query=query, top=top, skip=skip, filter=filter, options=options, **kwargs
+        )
+
+    @override
+    async def get_text_search_results(
+        self,
+        query: str,
+        top: int = 10,
+        skip: int = 0,
+        filter: Callable[[TSearchResult], bool] | None = None,
+        options: SearchOptions | None = None,
+        **kwargs: Any,
+    ) -> AsyncIterable[TextSearchResult]:
+        """Get async iterable of search results.
+
+        Args:
+            query: The search query string
+            top: Maximum number of results to return
+            skip: Number of results to skip
+            filter: Optional filter function
+            options: Additional search options
+            **kwargs: Additional parameters
+
+        Yields:
+            TextSearchResult items
+        """
+        results = await self._inner_search(
+            query=query, top=top, skip=skip, filter=filter, options=options, **kwargs
+        )
+        for result in results.results:
+            yield result
+
+    @override
+    async def get_search_results(
+        self,
+        query: str,
+        top: int = 10,
+        skip: int = 0,
+        filter: Callable[[TSearchResult], bool] | None = None,
+        options: SearchOptions | None = None,
+        **kwargs: Any,
+    ) -> AsyncIterable[SerpexResult]:
+        """Get async iterable of SerpexResult objects.
+
+        Args:
+            query: The search query string
+            top: Maximum number of results to return
+            skip: Number of results to skip
+            filter: Optional filter function
+            options: Additional search options
+            **kwargs: Additional parameters
+
+        Yields:
+            SerpexResult items
+        """
+        response = await self._execute_search(query=query, **kwargs)
+
+        # Apply skip and top
+        results = response.results[skip : skip + top]
+
+        # Apply filter if provided
+        if filter:
+            for result in results:
+                if filter(result):
+                    yield result
+        else:
+            for result in results:
+                yield result
+
+    async def _inner_search(
+        self,
+        query: str,
+        top: int = 10,
+        skip: int = 0,
+        filter: Callable[[TSearchResult], bool] | None = None,
+        options: SearchOptions | None = None,
+        **kwargs: Any,
+    ) -> KernelSearchResults[TextSearchResult]:
+        """Internal search method.
+
+        Args:
+            query: The search query
+            top: Number of results
+            skip: Number to skip
+            filter: Filter function
+            options: Search options
+            **kwargs: Additional parameters
+
+        Returns:
+            KernelSearchResults
+        """
+        response = await self._execute_search(query=query, **kwargs)
+
+        # Convert SerpexResults to TextSearchResults
+        text_results: list[TextSearchResult] = []
+        for result in response.results[skip : skip + top]:
+            text_result = TextSearchResult(
+                name=result.title,
+                value=result.snippet,
+                link=result.url,
+            )
+            if filter is None or filter(text_result):
+                text_results.append(text_result)
+
+        return KernelSearchResults(
+            results=text_results,
+            # Report the engine-provided total when metadata is present; otherwise fall back to the page size.
+            total_count=response.metadata.number_of_results if response.metadata else len(response.results),
+        )
+
+    async def _execute_search(self, query: str, **kwargs: Any) -> SerpexResponse:
+        """Execute the actual API call to Serpex.
+
+        Args:
+            query: The search query
+            **kwargs: Additional parameters (engine, category, time_range)
+
+        Returns:
+            SerpexResponse object
+        """
+        params: dict[str, Any] = {
+            "q": query,
+            "engine": kwargs.get("engine", self.engine),
+            "category": kwargs.get("category", self.category),
+        }
+
+        # Add time_range if specified
+        time_range = kwargs.get("time_range", self.time_range)
+        if time_range:
+            params["time_range"] = time_range
+
+        headers = {
+            "Authorization": f"Bearer {self.api_key.get_secret_value()}",
+            "User-Agent": SEMANTIC_KERNEL_USER_AGENT,
+            "Content-Type": "application/json",
+        }
+
+        async with AsyncClient() as client:
+            try:
+                logger.debug(f"Executing Serpex search with query: {query}")
+                response = await client.get(self.base_url, params=params, headers=headers, timeout=30.0)
+                response.raise_for_status()
+
+                data = response.json()
+                return SerpexResponse(**data)
+
+            except HTTPStatusError as e:
+                logger.error(f"HTTP error during Serpex search: {e}")
+                raise ServiceInvalidRequestError(f"Serpex API request failed: {e}") from e
+            except RequestError as e:
+                logger.error(f"Request error during Serpex search: {e}")
+                raise ServiceInvalidRequestError(f"Failed to connect to Serpex API: {e}") from e
+            except Exception as e:
+                logger.error(f"Unexpected error during Serpex search: {e}")
+                raise ServiceInvalidRequestError(f"Serpex search failed: {e}") from e
+
+
+# endregion SerpexSearch
diff --git a/python/tests/unit/connectors/search/test_serpex_search.py b/python/tests/unit/connectors/search/test_serpex_search.py
new file mode 100644
index 000000000000..121fd01c4236
--- /dev/null
+++ b/python/tests/unit/connectors/search/test_serpex_search.py
@@ -0,0 +1,289 @@
+# Copyright (c) Microsoft. All rights reserved.
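+
+"""Unit tests for the Serpex search connector (semantic_kernel.connectors.serpex)."""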
+
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import httpx
+import pytest
+from pydantic import SecretStr
+
+from semantic_kernel.connectors.serpex import (
+    SerpexMetadata,
+    SerpexResponse,
+    SerpexResult,
+    SerpexSearch,
+)
+from semantic_kernel.data.text_search import KernelSearchResults, TextSearchResult
+from semantic_kernel.exceptions import ServiceInitializationError, ServiceInvalidRequestError
+
+
+@pytest.fixture
+def serpex_unit_test_env(monkeypatch):
+    """Set up environment variables for Serpex tests."""
+    monkeypatch.setenv("SERPEX_API_KEY", "test_api_key")
+    monkeypatch.setenv("SERPEX_ENGINE", "auto")
+    monkeypatch.setenv("SERPEX_CATEGORY", "web")
+
+
+@pytest.fixture
+def serpex_search(serpex_unit_test_env):
+    """Set up the fixture to configure the Serpex Search for these tests."""
+    return SerpexSearch()
+
+
+@pytest.fixture
+def async_client_mock():
+    """Set up the fixture to mock AsyncClient."""
+    async_client_mock = AsyncMock()
+    with patch("semantic_kernel.connectors.serpex.AsyncClient", return_value=async_client_mock):
+        yield async_client_mock
+
+
+@pytest.fixture
+def mock_serpex_response():
+    """Set up the fixture to mock SerpexResponse."""
+    mock_result = SerpexResult(
+        position=1,
+        title="Test Title",
+        url="https://example.com",
+        snippet="Test snippet content",
+    )
+    mock_metadata = SerpexMetadata(
+        number_of_results=10,
+        request_time=0.5,
+        engine="google",
+    )
+    return SerpexResponse(
+        metadata=mock_metadata,
+        results=[mock_result],
+        answers=[],
+        infoboxes=[],
+        suggestions=[],
+        corrections=[],
+    )
+
+
+def test_serpex_search_init_success(serpex_unit_test_env):
+    """Test that SerpexSearch initializes successfully with valid env."""
+    search = SerpexSearch()
+    assert search.api_key.get_secret_value() == "test_api_key"
+    assert search.engine == "auto"
+    assert search.category == "web"
+
+
+def test_serpex_search_init_with_params():
+    """Test that SerpexSearch initializes with provided parameters."""
+    search = SerpexSearch(
+        api_key=SecretStr("custom_key"),
+        engine="google",
+        category="web",
+        time_range="day",
+    )
+    assert search.api_key.get_secret_value() == "custom_key"
+    assert search.engine == "google"
+    assert search.time_range == "day"
+
+
+def test_serpex_search_init_invalid_engine():
+    """Test that SerpexSearch raises error for invalid engine."""
+    with pytest.raises(ServiceInitializationError) as exc_info:
+        SerpexSearch(api_key=SecretStr("test_key"), engine="invalid_engine")
+    assert "Invalid engine" in str(exc_info.value)
+
+
+def test_serpex_search_init_no_api_key(monkeypatch):
+    """Test that SerpexSearch raises ServiceInitializationError without API key."""
+    monkeypatch.delenv("SERPEX_API_KEY", raising=False)
+    with pytest.raises(ServiceInitializationError) as exc_info:
+        SerpexSearch()
+    assert "API key is required" in str(exc_info.value)
+
+
+@pytest.mark.asyncio
+async def test_search_success(serpex_unit_test_env, mock_serpex_response):
+    """Test that search method returns KernelSearchResults successfully."""
+    # Arrange
+    search = SerpexSearch()
+
+    mock_http_response = MagicMock()
+    mock_http_response.json.return_value = {
+        "metadata": {"number_of_results": 10, "request_time": 0.5, "engine": "google"},
+        "results": [
+            {
+                "position": 1,
+                "title": "Test Title",
+                "url": "https://example.com",
+                "snippet": "Test snippet content",
+            }
+        ],
+        "answers": [],
+        "infoboxes": [],
+        "suggestions": [],
+        "corrections": [],
+    }
+    mock_http_response.raise_for_status = MagicMock()
+
+    # Act
+    with patch("semantic_kernel.connectors.serpex.AsyncClient") as mock_client_class:
+        mock_client = AsyncMock()
+        mock_client.__aenter__.return_value = mock_client
+        mock_client.get.return_value = mock_http_response
+        mock_client_class.return_value = mock_client
+
+        results: KernelSearchResults[TextSearchResult] = await search.search("Python programming")
+
+    # Assert
+    assert len(results.results) == 1
+    assert results.results[0].name == "Test Title"
+    assert results.results[0].value == "Test snippet content"
+    assert results.results[0].link == "https://example.com"
+    assert results.total_count == 10
+
+
+@pytest.mark.asyncio
+async def test_search_http_status_error(serpex_unit_test_env):
+    """Test that search raises ServiceInvalidRequestError on HTTP error."""
+    # Arrange
+    search = SerpexSearch()
+
+    # Act & Assert
+    with patch("semantic_kernel.connectors.serpex.AsyncClient") as mock_client_class:
+        mock_client = AsyncMock()
+        mock_client.__aenter__.return_value = mock_client
+        mock_client.get.side_effect = httpx.HTTPStatusError(
+            "Error", request=MagicMock(), response=MagicMock()
+        )
+        mock_client_class.return_value = mock_client
+
+        with pytest.raises(ServiceInvalidRequestError) as exc_info:
+            await search.search("Test query")
+        assert "Serpex API request failed" in str(exc_info.value)
+
+
+@pytest.mark.asyncio
+async def test_search_request_error(serpex_unit_test_env):
+    """Test that search raises ServiceInvalidRequestError on request error."""
+    # Arrange
+    search = SerpexSearch()
+
+    # Act & Assert
+    with patch("semantic_kernel.connectors.serpex.AsyncClient") as mock_client_class:
+        mock_client = AsyncMock()
+        mock_client.__aenter__.return_value = mock_client
+        mock_client.get.side_effect = httpx.RequestError("Connection failed")
+        mock_client_class.return_value = mock_client
+
+        with pytest.raises(ServiceInvalidRequestError) as exc_info:
+            await search.search("Test query")
+        assert "Failed to connect to Serpex API" in str(exc_info.value)
+
+
+@pytest.mark.asyncio
+async def test_search_with_custom_engine(serpex_unit_test_env):
+    """Test search with custom engine parameter."""
+    # Arrange
+    search = SerpexSearch()
+
+    mock_http_response = MagicMock()
+    mock_http_response.json.return_value = {
+        "metadata": {"number_of_results": 5, "request_time": 0.3, "engine": "bing"},
+        "results": [],
+        "answers": [],
+        "infoboxes": [],
+        "suggestions": [],
+        "corrections": [],
+    }
+    mock_http_response.raise_for_status = MagicMock()
+
+    # Act
+    with patch("semantic_kernel.connectors.serpex.AsyncClient") as mock_client_class:
+        mock_client = AsyncMock()
+        mock_client.__aenter__.return_value = mock_client
+        mock_client.get.return_value = mock_http_response
+        mock_client_class.return_value = mock_client
+
+        await search.search("Test query", engine="bing")
+
+        # Verify the correct engine was passed
+        call_kwargs = mock_client.get.call_args[1]
+        assert call_kwargs["params"]["engine"] == "bing"
+
+
+@pytest.mark.asyncio
+async def test_get_text_search_results(serpex_unit_test_env):
+    """Test get_text_search_results method."""
+    # Arrange
+    search = SerpexSearch()
+
+    mock_http_response = MagicMock()
+    mock_http_response.json.return_value = {
+        "metadata": {"number_of_results": 2, "request_time": 0.4, "engine": "google"},
+        "results": [
+            {"position": 1, "title": "Result 1", "url": "https://test1.com", "snippet": "Snippet 1"},
+            {"position": 2, "title": "Result 2", "url": "https://test2.com", "snippet": "Snippet 2"},
+        ],
+        "answers": [],
+        "infoboxes": [],
+        "suggestions": [],
+        "corrections": [],
+    }
+    mock_http_response.raise_for_status = MagicMock()
+
+    # Act
+    with patch("semantic_kernel.connectors.serpex.AsyncClient") as mock_client_class:
+        mock_client = AsyncMock()
+        mock_client.__aenter__.return_value = mock_client
+        mock_client.get.return_value = mock_http_response
+        mock_client_class.return_value = mock_client
+
+        results = []
+        async for result in search.get_text_search_results("Test query"):
+            results.append(result)
+
+    # Assert
+    assert len(results) == 2
+    assert results[0].name == "Result 1"
+    assert results[1].name == "Result 2"
+
+
+@pytest.mark.asyncio
+async def test_get_search_results(serpex_unit_test_env):
+    """Test get_search_results method returns SerpexResult objects."""
+    # Arrange
+    search = SerpexSearch()
+
+    mock_http_response = MagicMock()
+    mock_http_response.json.return_value = {
+        "metadata": {"number_of_results": 1, "request_time": 0.2, "engine": "duckduckgo"},
+        "results": [
+            {
+                "position": 1,
+                "title": "DuckDuckGo Result",
+                "url": "https://duck.com",
+                "snippet": "Privacy search",
+            },
+        ],
+        "answers": [],
+        "infoboxes": [],
+        "suggestions": [],
+        "corrections": [],
+    }
+    mock_http_response.raise_for_status = MagicMock()
+
+    # Act
+    with patch("semantic_kernel.connectors.serpex.AsyncClient") as mock_client_class:
+        mock_client = AsyncMock()
+        mock_client.__aenter__.return_value = mock_client
+        mock_client.get.return_value = mock_http_response
+        mock_client_class.return_value = mock_client
+
+        results = []
+        async for result in search.get_search_results("Privacy test"):
+            results.append(result)
+
+    # Assert
+    assert len(results) == 1
+    assert isinstance(results[0], SerpexResult)
+    assert results[0].title == "DuckDuckGo Result"
+    assert results[0].url == "https://duck.com"