Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ StackOne AI provides a unified interface for accessing various SaaS tools throug
- Glob pattern filtering with patterns like `"hris_*"` and exclusions `"!hris_delete_*"`
- Provider and action filtering with `fetch_tools()`
- Multi-account support
- Dynamic MCP-backed discovery via `fetch_tools()` so you can pull the latest tools at runtime (accounts, providers, or globbed actions)
- **Meta Tools** (Beta): Dynamic tool discovery and execution based on natural language queries using hybrid BM25 + TF-IDF search
- Integration with popular AI frameworks:
- OpenAI Functions
Expand Down Expand Up @@ -105,6 +106,8 @@ tools = toolset.get_tools(["hris_*", "!hris_delete_*"])

The `fetch_tools()` method provides advanced filtering by providers, actions, and account IDs:

> `fetch_tools()` uses the StackOne MCP server under the hood. Install the optional extra (`pip install 'stackone-ai[mcp]'`) on Python 3.10+ to enable dynamic discovery.

```python
from stackone_ai import StackOneToolSet

Expand Down
281 changes: 262 additions & 19 deletions stackone_ai/toolset.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,52 @@
# TODO: Remove when Python 3.9 support is dropped
from __future__ import annotations

import asyncio
import base64
import fnmatch
import json
import os
import threading
import warnings
from typing import Any
from collections.abc import Coroutine
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never heard of these they good?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from dataclasses import dataclass
from importlib import metadata
from typing import Any, TypeVar

from stackone_ai.constants import OAS_DIR
from stackone_ai.models import (
ExecuteConfig,
ParameterLocation,
StackOneTool,
ToolParameters,
Tools,
)
from stackone_ai.specs.parser import OpenAPIParser

try:
_SDK_VERSION = metadata.version("stackone-ai")
except metadata.PackageNotFoundError: # pragma: no cover - best-effort fallback when running from source
_SDK_VERSION = "dev"

DEFAULT_BASE_URL = "https://api.stackone.com"
_RPC_PARAMETER_LOCATIONS = {
"action": ParameterLocation.BODY,
"body": ParameterLocation.BODY,
"headers": ParameterLocation.BODY,
"path": ParameterLocation.BODY,
"query": ParameterLocation.BODY,
}
_USER_AGENT = f"stackone-ai-python/{_SDK_VERSION}"

T = TypeVar("T")


@dataclass
class _McpToolDefinition:
name: str
description: str | None
input_schema: dict[str, Any]


class ToolsetError(Exception):
"""Base exception for toolset errors"""
Expand All @@ -32,6 +66,166 @@ class ToolsetLoadError(ToolsetError):
pass


def _run_async(awaitable: Coroutine[Any, Any, T]) -> T:
"""Run a coroutine, even when called from an existing event loop."""

try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(awaitable)

result: dict[str, T] = {}
error: dict[str, BaseException] = {}

def runner() -> None:
try:
result["value"] = asyncio.run(awaitable)
except BaseException as exc: # pragma: no cover - surfaced in caller context
Copy link

Copilot AI Nov 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except block directly handles BaseException.

Suggested change
except BaseException as exc: # pragma: no cover - surfaced in caller context
except Exception as exc: # pragma: no cover - surfaced in caller context

Copilot uses AI. Check for mistakes.
error["error"] = exc

thread = threading.Thread(target=runner, daemon=True)
thread.start()
thread.join()

if "error" in error:
raise error["error"]

return result["value"]


def _build_auth_header(api_key: str) -> str:
token = base64.b64encode(f"{api_key}:".encode()).decode()
return f"Basic {token}"


def _fetch_mcp_tools(endpoint: str, headers: dict[str, str]) -> list[_McpToolDefinition]:
try:
from mcp import types as mcp_types
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamablehttp_client
Copy link

Copilot AI Nov 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The import path mcp.client.streamable_http appears to be incorrect. Based on the MCP Python SDK package structure, this should be mcp.client.streamablehttp (without the underscore). The correct import should be from mcp.client.streamablehttp import streamablehttp_client.

Suggested change
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.streamablehttp import streamablehttp_client

Copilot uses AI. Check for mistakes.
except ImportError as exc: # pragma: no cover - depends on optional extra
raise ToolsetConfigError(
"MCP dependencies are required for fetch_tools. Install with 'pip install \"stackone-ai[mcp]\"'."
) from exc

async def _list() -> list[_McpToolDefinition]:
async with streamablehttp_client(endpoint, headers=headers) as (read_stream, write_stream, _):
session = ClientSession(
read_stream,
write_stream,
client_info=mcp_types.Implementation(name="stackone-ai-python", version=_SDK_VERSION),
)
async with session:
await session.initialize()
cursor: str | None = None
collected: list[_McpToolDefinition] = []
while True:
result = await session.list_tools(cursor)
for tool in result.tools:
input_schema = tool.inputSchema or {}
collected.append(
_McpToolDefinition(
name=tool.name,
description=tool.description,
input_schema=dict(input_schema),
)
)
cursor = result.nextCursor
if cursor is None:
break
return collected

return _run_async(_list())


class _StackOneRpcTool(StackOneTool):
"""RPC-backed tool wired to the StackOne actions RPC endpoint."""

def __init__(
self,
*,
name: str,
description: str,
parameters: ToolParameters,
api_key: str,
base_url: str,
account_id: str | None,
) -> None:
execute_config = ExecuteConfig(
method="POST",
url=f"{base_url.rstrip('/')}/actions/rpc",
name=name,
headers={},
body_type="json",
parameter_locations=dict(_RPC_PARAMETER_LOCATIONS),
)
super().__init__(
description=description,
parameters=parameters,
_execute_config=execute_config,
_api_key=api_key,
_account_id=account_id,
)

def execute(
self, arguments: str | dict[str, Any] | None = None, *, options: dict[str, Any] | None = None
) -> dict[str, Any]:
parsed_arguments = self._parse_arguments(arguments)

body_payload = self._extract_record(parsed_arguments.pop("body", None))
headers_payload = self._extract_record(parsed_arguments.pop("headers", None))
path_payload = self._extract_record(parsed_arguments.pop("path", None))
query_payload = self._extract_record(parsed_arguments.pop("query", None))

rpc_body: dict[str, Any] = dict(body_payload or {})
for key, value in parsed_arguments.items():
rpc_body[key] = value

payload: dict[str, Any] = {
"action": self.name,
"body": rpc_body,
"headers": self._build_action_headers(headers_payload),
}
if path_payload:
payload["path"] = path_payload
if query_payload:
payload["query"] = query_payload

return super().execute(payload, options=options)

def _parse_arguments(self, arguments: str | dict[str, Any] | None) -> dict[str, Any]:
if arguments is None:
return {}
if isinstance(arguments, str):
parsed = json.loads(arguments)
else:
parsed = arguments
if not isinstance(parsed, dict):
raise ValueError("Tool arguments must be a JSON object")
return dict(parsed)

@staticmethod
def _extract_record(value: Any) -> dict[str, Any] | None:
if isinstance(value, dict):
return dict(value)
return None

def _build_action_headers(self, additional_headers: dict[str, Any] | None) -> dict[str, str]:
headers: dict[str, str] = {}
account_id = self.get_account_id()
if account_id:
headers["x-account-id"] = account_id

if additional_headers:
for key, value in additional_headers.items():
if value is None:
continue
headers[str(key)] = str(value)

headers.pop("Authorization", None)
return headers


class StackOneToolSet:
"""Main class for accessing StackOne tools"""

Expand Down Expand Up @@ -59,7 +253,7 @@ def __init__(
)
self.api_key: str = api_key_value
self.account_id = account_id
self.base_url = base_url
self.base_url = base_url or DEFAULT_BASE_URL
self._account_ids: list[str] = []

def _parse_parameters(self, parameters: list[dict[str, Any]]) -> dict[str, dict[str, str]]:
Expand Down Expand Up @@ -194,34 +388,83 @@ def fetch_tools(
tools = toolset.fetch_tools()
"""
try:
# Use account IDs from options, or fall back to instance state
effective_account_ids = account_ids or self._account_ids
if not effective_account_ids and self.account_id:
effective_account_ids = [self.account_id]

all_tools: list[StackOneTool] = []

# Load tools for each account ID or once if no account filtering
if effective_account_ids:
for acc_id in effective_account_ids:
tools = self.get_tools(account_id=acc_id)
all_tools.extend(tools.to_list())
account_scope: list[str | None] = list(dict.fromkeys(effective_account_ids))
else:
tools = self.get_tools()
all_tools.extend(tools.to_list())
account_scope = [None]

endpoint = f"{self.base_url.rstrip('/')}/mcp"
all_tools: list[StackOneTool] = []

for account in account_scope:
headers = self._build_mcp_headers(account)
catalog = _fetch_mcp_tools(endpoint, headers)
for tool_def in catalog:
all_tools.append(self._create_rpc_tool(tool_def, account))

# Apply provider filtering
if providers:
all_tools = [t for t in all_tools if self._filter_by_provider(t.name, providers)]
all_tools = [tool for tool in all_tools if self._filter_by_provider(tool.name, providers)]

# Apply action filtering
if actions:
all_tools = [t for t in all_tools if self._filter_by_action(t.name, actions)]
all_tools = [tool for tool in all_tools if self._filter_by_action(tool.name, actions)]

return Tools(all_tools)

except Exception as e:
if isinstance(e, ToolsetError):
raise
raise ToolsetLoadError(f"Error fetching tools: {e}") from e
except ToolsetError:
raise
except Exception as exc: # pragma: no cover - unexpected runtime errors
raise ToolsetLoadError(f"Error fetching tools: {exc}") from exc

def _build_mcp_headers(self, account_id: str | None) -> dict[str, str]:
headers = {
"Authorization": _build_auth_header(self.api_key),
"User-Agent": _USER_AGENT,
}
if account_id:
headers["x-account-id"] = account_id
return headers

def _create_rpc_tool(self, tool_def: _McpToolDefinition, account_id: str | None) -> StackOneTool:
schema = tool_def.input_schema or {}
parameters = ToolParameters(
type=str(schema.get("type") or "object"),
properties=self._normalize_schema_properties(schema),
)
return _StackOneRpcTool(
name=tool_def.name,
description=tool_def.description or "",
parameters=parameters,
api_key=self.api_key,
base_url=self.base_url,
account_id=account_id,
)

def _normalize_schema_properties(self, schema: dict[str, Any]) -> dict[str, Any]:
properties = schema.get("properties")
if not isinstance(properties, dict):
return {}

required_fields = {str(name) for name in schema.get("required", [])}
normalized: dict[str, Any] = {}

for name, details in properties.items():
if isinstance(details, dict):
prop = dict(details)
else:
prop = {"description": str(details)}

if name in required_fields:
prop.setdefault("nullable", False)
else:
prop.setdefault("nullable", True)

normalized[str(name)] = prop

return normalized

def get_tool(self, name: str, *, account_id: str | None = None) -> StackOneTool | None:
"""Get a specific tool by name
Expand Down
Loading