Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 117 additions & 4 deletions src/solace_agent_mesh/agent/protocol/event_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@
topic_matches_subscription,
translate_a2a_to_adk_content,
)
from ...common.constants import (
EXTENSION_URI_AGENT_TYPE,
EXTENSION_URI_SCHEMAS,
)
from ...common.a2a.types import ToolsExtensionParams
from ...common.data_parts import ToolResultData
from ..sac.task_execution_context import TaskExecutionContext
Expand Down Expand Up @@ -417,6 +421,61 @@ async def handle_a2a_request(component, message: SolaceMessage):
)
return None

# Check for structured invocation mode
if method in ["message/send", "message/stream"]:
a2a_message = a2a.get_message_from_send_request(a2a_request)
invocation_data = component.structured_invocation_handler.extract_structured_invocation_context(
a2a_message
)

if invocation_data:
log.info(
"%s Detected structured invocation request for node '%s' in context '%s'. Delegating to StructuredInvocationHandler.",
component.log_identifier,
invocation_data.node_id,
invocation_data.workflow_name,
)

# Extract context needed for handler
logical_task_id = str(a2a.get_request_id(a2a_request))
original_session_id = a2a_message.context_id
user_id = message.get_user_properties().get("userId", "default_user")

# For structured invocations, we use the original session ID as the effective session ID
# because the caller manages the session scope.

a2a_context = {
"logical_task_id": logical_task_id,
"session_id": original_session_id,
"effective_session_id": original_session_id,
"user_id": user_id,
"jsonrpc_request_id": jsonrpc_request_id,
"contextId": original_session_id,
"messageId": a2a_message.message_id,
"replyToTopic": reply_topic_from_peer,
"a2a_user_config": a2a_user_config,
"statusTopic": status_topic_from_peer,
}
# Note: original_solace_message is NOT stored in a2a_context to avoid
# serialization issues when a2a_context is stored in ADK session state.
# It is stored in TaskExecutionContext by the structured invocation handler.

# Execute as structured invocation
loop = component.get_async_loop()
if loop and loop.is_running():
asyncio.run_coroutine_threadsafe(
component.structured_invocation_handler.execute_structured_invocation(
a2a_message, invocation_data, a2a_context, message
),
loop,
)
else:
log.error(
"%s Async loop not available. Cannot execute structured invocation.",
component.log_identifier,
)
return

if method == "tasks/cancel":
logical_task_id = a2a.get_task_id_from_cancel_request(a2a_request)
log.info(
Expand Down Expand Up @@ -515,7 +574,7 @@ async def handle_a2a_request(component, message: SolaceMessage):
is_paused=False,
exception=TaskCancelledError(
f"Task {logical_task_id} cancelled while paused."
)
),
),
loop,
)
Expand Down Expand Up @@ -1407,13 +1466,26 @@ async def handle_a2a_response(component, message: SolaceMessage):
# final Task response metadata.
filtered_data_parts = []
for data_part in data_parts:
# Filter out artifact creation progress from peer agents
if isinstance(data_part.data, dict) and data_part.data.get("type") == "artifact_creation_progress":
log.debug(
"%s Filtered out artifact_creation_progress DataPart from peer sub-task %s. Not forwarding to user.",
component.log_identifier,
sub_task_id,
)
continue
# Filter out workflow status updates to prevent duplication in the gateway
# The gateway already sees these events via subscription to the peer agent
if isinstance(data_part.data, dict):
data_type = data_part.data.get("type", "")
if data_type.startswith("workflow_"):
log.debug(
"%s Skipping forwarding of workflow status update '%s' from peer for sub-task %s.",
component.log_identifier,
data_type,
sub_task_id,
)
continue
filtered_data_parts.append(data_part)

# Only forward if there are non-filtered data parts
Expand Down Expand Up @@ -1881,6 +1953,21 @@ def publish_agent_card(component):

extensions_list = []

# Create the extension object for agent type.
agent_type = component.get_config("agent_type", "standard")
if agent_type != "standard":
agent_type_extension = AgentExtension(
uri=EXTENSION_URI_AGENT_TYPE,
description="Specifies the type of agent (e.g., 'workflow').",
params={"type": agent_type},
)
extensions_list.append(agent_type_extension)
log.debug(
"%s Added agent_type extension: %s",
component.log_identifier,
agent_type,
)

# Create the extension object for deployment tracking.
deployment_config = component.get_config("deployment", {})
deployment_id = deployment_config.get("id")
Expand All @@ -1890,13 +1977,13 @@ def publish_agent_card(component):
uri=DEPLOYMENT_EXTENSION_URI,
description="SAM deployment tracking for rolling updates",
required=False,
params={"id": deployment_id}
params={"id": deployment_id},
)
extensions_list.append(deployment_extension)
log.debug(
"%s Added deployment extension with ID: %s",
component.log_identifier,
deployment_id
deployment_id,
)

# Create the extension object for peer agents.
Expand Down Expand Up @@ -1940,6 +2027,30 @@ def publish_agent_card(component):
)
extensions_list.append(tools_extension)

# Create the extension object for the agent's input/output schemas.
input_schema = component.get_config("input_schema")
output_schema = component.get_config("output_schema")

if input_schema or output_schema:
schema_params = {}
if input_schema:
schema_params["input_schema"] = input_schema
if output_schema:
schema_params["output_schema"] = output_schema

schemas_extension = AgentExtension(
uri=EXTENSION_URI_SCHEMAS,
description="Input and output JSON schemas for the agent.",
params=schema_params,
)
extensions_list.append(schemas_extension)
log.debug(
"%s Added schemas extension (input: %s, output: %s)",
component.log_identifier,
"present" if input_schema else "none",
"present" if output_schema else "none",
)

# Build the capabilities object, including our custom extensions.
capabilities = AgentCapabilities(
streaming=supports_streaming,
Expand All @@ -1952,11 +2063,13 @@ def publish_agent_card(component):
# The 'tools' field is not part of the official AgentCard spec.
# The tools are now included as an extension.

# Ensure all skills have a 'tags' field to prevent validation errors.
# Ensure all skills have 'tags' and 'description' fields to prevent validation errors.
processed_skills = []
for skill in skills_from_config:
if "tags" not in skill:
skill["tags"] = []
if "description" not in skill:
skill["description"] = "No description provided."
processed_skills.append(skill)

agent_card = AgentCard(
Expand Down
12 changes: 12 additions & 0 deletions src/solace_agent_mesh/agent/sac/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ class SamAgentAppConfig(SamConfigBase):
description="Absolute topic prefix for A2A communication (e.g., 'myorg/dev').",
)
agent_name: str = Field(..., description="Unique name for this ADK agent instance.")
agent_type: Literal["standard", "workflow"] = Field(
default="standard",
description="Type of agent: 'standard' (default) or 'workflow'.",
)
display_name: str = Field(
default=None,
description="Human-friendly display name for this ADK agent instance.",
Expand Down Expand Up @@ -395,6 +399,14 @@ class SamAgentAppConfig(SamConfigBase):
default=True,
description="Inject instructions about the 'artifact_content' embed type.",
)
input_schema: Optional[Dict[str, Any]] = Field(
default=None,
description="JSON Schema for validating agent input when used as a workflow node.",
)
output_schema: Optional[Dict[str, Any]] = Field(
default=None,
description="JSON Schema for validating agent output when used as a workflow node.",
)
agent_card: AgentCardConfig = Field(
..., description="Static definition of this agent's capabilities for discovery."
)
Expand Down
89 changes: 72 additions & 17 deletions src/solace_agent_mesh/agent/sac/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
PEER_TOOL_PREFIX,
PeerAgentTool,
)
from ...agent.tools.workflow_tool import WorkflowAgentTool
from ...agent.tools.registry import tool_registry
from ...agent.utils.config_parser import resolve_instruction_provider
from ...common import a2a
Expand All @@ -77,12 +78,15 @@
DEFAULT_COMMUNICATION_TIMEOUT,
HEALTH_CHECK_INTERVAL_SECONDS,
HEALTH_CHECK_TTL_SECONDS,
EXTENSION_URI_AGENT_TYPE,
EXTENSION_URI_SCHEMAS,
)
from ...common.a2a.types import ArtifactInfo
from ...common.data_parts import AgentProgressUpdateData, ArtifactSavedData
from ...common.middleware.registry import MiddlewareRegistry
from ...common.sac.sam_component_base import SamComponentBase
from ...common.utils.rbac_utils import validate_agent_access
from .structured_invocation.handler import StructuredInvocationHandler

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -266,6 +270,10 @@ def __init__(self, **kwargs):
Callable[[CallbackContext, LlmRequest], Optional[str]]
] = None
self._active_background_tasks = set()

# Initialize structured invocation support
self.structured_invocation_handler = StructuredInvocationHandler(self)

try:
self.agent_specific_state: Dict[str, Any] = {}
init_func_details = self.get_config("agent_init_function")
Expand Down Expand Up @@ -985,22 +993,60 @@ def _inject_peer_tools_callback(
continue

try:
peer_tool_instance = PeerAgentTool(
target_agent_name=peer_name, host_component=self
)
if peer_tool_instance.name not in llm_request.tools_dict:
peer_tools_to_add.append(peer_tool_instance)
# Determine agent type and schemas
agent_type = "standard"
input_schema = None

if agent_card.capabilities and agent_card.capabilities.extensions:
for ext in agent_card.capabilities.extensions:
if ext.uri == EXTENSION_URI_AGENT_TYPE:
agent_type = ext.params.get("type", "standard")
elif ext.uri == EXTENSION_URI_SCHEMAS:
input_schema = ext.params.get("input_schema")

tool_instance = None
tool_description_line = ""

if agent_type == "workflow":
# Default schema if none provided
if not input_schema:
input_schema = {
"type": "object",
"properties": {"text": {"type": "string"}},
"required": ["text"],
}

tool_instance = WorkflowAgentTool(
target_agent_name=peer_name,
input_schema=input_schema,
host_component=self,
)

desc = (
getattr(agent_card, "description", "No description")
or "No description"
)
tool_description_line = f"- `{tool_instance.name}`: {desc}"

else:
# Standard Peer Agent
tool_instance = PeerAgentTool(
target_agent_name=peer_name, host_component=self
)
# Get enhanced description from the tool instance
# which includes capabilities, skills, and tools
enhanced_desc = peer_tool_instance._build_enhanced_description(
enhanced_desc = tool_instance._build_enhanced_description(
agent_card
)
allowed_peer_descriptions.append(
f"\n### `peer_{peer_name}`\n{enhanced_desc}"
)
tool_description_line = f"\n### `peer_{peer_name}`\n{enhanced_desc}"

if tool_instance.name not in llm_request.tools_dict:
peer_tools_to_add.append(tool_instance)
allowed_peer_descriptions.append(tool_description_line)

except Exception as e:
log.error(
"%s Failed to create PeerAgentTool for '%s': %s",
"%s Failed to create tool for '%s': %s",
self.log_identifier,
peer_name,
e,
Expand All @@ -1009,15 +1055,19 @@ def _inject_peer_tools_callback(
if allowed_peer_descriptions:
peer_list_str = "\n".join(allowed_peer_descriptions)
instruction_text = (
"## Peer Agent Delegation\n\n"
"You can delegate tasks to other specialized agents if they are better suited.\n\n"
"**How to delegate:**\n"
"## Peer Agent and Workflow Delegation\n\n"
"You can delegate tasks to other specialized agents or workflows if they are better suited.\n\n"
"**How to delegate to peer agents:**\n"
"- Use the `peer_<agent_name>(task_description: str)` tool for delegation\n"
"- Replace `<agent_name>` with the actual name of the target agent\n"
"- Provide a clear and detailed `task_description` for the peer agent\n"
"- **Important:** The peer agent does not have access to your session history, "
"so you must provide all required context necessary to fulfill the request\n\n"
"## Available Peer Agents\n"
"**How to delegate to workflows:**\n"
"- Use the `workflow_<agent_name>` tool for workflow delegation\n"
"- Follow the specific parameter requirements defined in the tool schema\n"
"- Workflows also do not have access to your session history\n\n"
"## Available Peer Agents and Workflows\n"
f"{peer_list_str}"
)
callback_context.state["peer_tool_instructions"] = instruction_text
Expand All @@ -1033,8 +1083,9 @@ def _inject_peer_tools_callback(
if len(llm_request.config.tools) > 0:
for tool in peer_tools_to_add:
llm_request.tools_dict[tool.name] = tool
declaration = tool._get_declaration()
llm_request.config.tools[0].function_declarations.append(
tool._get_declaration()
declaration
)
else:
llm_request.append_tools(peer_tools_to_add)
Expand All @@ -1048,6 +1099,7 @@ def _inject_peer_tools_callback(
"%s Failed to append dynamic peer tools to LLM request: %s",
self.log_identifier,
e,
exc_info=True,
)
return None

Expand Down Expand Up @@ -3037,6 +3089,7 @@ def submit_a2a_task(
f"{self.log_identifier}[SubmitA2ATask:{target_agent_name}]"
)
main_task_id = a2a_message.metadata.get("parentTaskId", "unknown_parent")

log.debug(
"%s Submitting non-blocking task for main task %s",
log_identifier_helper,
Expand Down Expand Up @@ -3433,13 +3486,15 @@ def set_agent_system_instruction_string(self, instruction_string: str) -> None:

def set_agent_system_instruction_callback(
self,
callback_function: Callable[[CallbackContext, LlmRequest], Optional[str]],
callback_function: Optional[
Callable[[CallbackContext, LlmRequest], Optional[str]]
],
) -> None:
"""
Sets a callback function to dynamically generate system prompt injections.
Called by the agent's init_function.
"""
if not callable(callback_function):
if callback_function is not None and not callable(callback_function):
log.error(
"%s Invalid type for callback_function: %s. Must be callable.",
self.log_identifier,
Expand Down
Empty file.
Loading