Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions notebooks/03_log_register_agent/log_register_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,54 @@

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test Intelligence Control

# COMMAND ----------

print("Testing intelligence control...")

# Test 1: Intelligence enabled (normal behavior)
test_request_intelligent = ResponsesAgentRequest(
input=[{"role": "user", "content": "how much data did the customer use in May?"}],
custom_inputs={"customer": "CUS-10001", "intelligence_enabled": True}
)

response_intelligent = supervisor.predict(test_request_intelligent)
print("✅ Intelligence enabled test completed")

# Test 2: Intelligence disabled (generic response)
test_request_generic = ResponsesAgentRequest(
input=[{"role": "user", "content": "how much data did the customer use in May?"}],
custom_inputs={"customer": "CUS-10001", "intelligence_enabled": False}
)

response_generic = supervisor.predict(test_request_generic)
print("✅ Intelligence disabled test completed")

# Show response previews
intelligent_text = ""
generic_text = ""

for output_item in response_intelligent.output:
if hasattr(output_item, "content"):
for content_item in output_item.content:
if hasattr(content_item, "text"):
intelligent_text = content_item.text
break

for output_item in response_generic.output:
if hasattr(output_item, "content"):
for content_item in output_item.content:
if hasattr(content_item, "text"):
generic_text = content_item.text
break

print(f"Intelligence enabled response: {intelligent_text[:200]}...")
print(f"Intelligence disabled response: {generic_text[:200]}...")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Log Agent to MLflow

Expand Down
11 changes: 11 additions & 0 deletions notebooks/05_deploy_agent/deploy_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,17 @@
"input": [{"role": "user", "content": "My phone won't connect to WiFi"}],
"description": "Tech support query (no custom inputs required)"
},
# Test intelligence control
{
"input": [{"role": "user", "content": "What was my data usage in May?"}],
"custom_inputs": {"customer": "CUS-10001", "intelligence_enabled": True},
"description": "Intelligence enabled - should use tools"
},
{
"input": [{"role": "user", "content": "What was my data usage in May?"}],
"custom_inputs": {"customer": "CUS-10001", "intelligence_enabled": False},
"description": "Intelligence disabled - should provide generic response"
},
]

for i, test_case in enumerate(test_cases, 1):
Expand Down
153 changes: 153 additions & 0 deletions telco_support_agent/agents/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,136 @@ def _prepare_agent_execution(
error_response=None,
)

def _handle_direct_response(
    self, request: ResponsesAgentRequest
) -> ResponsesAgentResponse:
    """Handle request directly without routing when intelligence is disabled.

    Answers with the supervisor's own LLM (no tools, no sub-agent routing)
    plus a restrictive system prompt, so the agent behaves as a generic
    customer-service assistant.

    Args:
        request: The request containing user messages.

    Returns:
        Direct response from the supervisor acting as a conversational agent.
        ``custom_outputs`` echoes the request's ``custom_inputs`` and, on
        success, records a ``routing`` entry of ``supervisor_direct``.
    """
    logger.debug("Handling request directly without sub-agent routing")

    def _text_message(text: str) -> dict:
        # Build a Responses-API assistant message with a single text part.
        return {
            "role": "assistant",
            "type": "message",
            "content": [{"type": "output_text", "text": text}],
            "id": str(uuid4()),
        }

    # Copy the caller's custom inputs once; never mutate the request.
    base_outputs = dict(request.custom_inputs) if request.custom_inputs else {}

    # Extract user query; bail out with a polite message if none was given.
    user_query = extract_user_query(request.input)
    if not user_query:
        return ResponsesAgentResponse(
            output=[
                _text_message(
                    "I'm sorry, I didn't receive a clear question. Could you please rephrase your request?"
                )
            ],
            custom_outputs=base_outputs,
        )

    # Convert MLflow Message objects to plain dictionaries for the LLM call.
    messages = [i.model_dump() for i in request.input]

    # System message that keeps the response conversational but limited:
    # no customer-data access, no account operations.
    system_message = {
        "role": "system",
        "content": "You are a helpful customer service representative. You can provide general information and assistance, but you cannot access specific customer data or perform account operations. If a customer asks for specific account information, politely explain that you'll need them to contact customer service for detailed account access.",
    }

    messages_with_system = [system_message] + messages

    try:
        # Call LLM directly without tools (intelligence disabled).
        # Follow the same pattern as route_query - don't pass request to
        # avoid databricks_options processing.
        llm_response = self.call_llm(messages_with_system)

        response_content = llm_response.get(
            "content",
            "I apologize, but I'm unable to provide a response at the moment. Please try again.",
        )
        response_message = _text_message(response_content)

        # Record that no routing occurred - the supervisor answered directly.
        custom_outputs = dict(base_outputs)
        custom_outputs["routing"] = {
            "agent_type": "supervisor_direct",
            "intelligence_enabled": False,
        }

        self.update_trace_preview(
            user_query=user_query,
            response_data={"output": [response_message]},
        )

        return ResponsesAgentResponse(
            output=[response_message],
            custom_outputs=custom_outputs,
        )

    except Exception:
        # logger.exception keeps the traceback (logger.error with the bare
        # message would lose it).
        logger.exception("Error generating direct response")
        return ResponsesAgentResponse(
            output=[
                _text_message(
                    "I apologize, but I'm experiencing technical difficulties. Please try again in a moment."
                )
            ],
            custom_outputs=dict(base_outputs),
        )

def _handle_direct_response_stream(
    self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
    """Handle streaming request directly without routing when intelligence is disabled.

    Args:
        request: The request containing user messages.

    Yields:
        ResponsesAgentStreamEvent objects for the direct response.
    """
    logger.debug("Handling streaming request directly without sub-agent routing")

    # Delegate to the non-streaming path, then replay each output item
    # as an already-completed stream event.
    direct_response = self._handle_direct_response(request)
    yield from (
        ResponsesAgentStreamEvent(type="response.output_item.done", item=item)
        for item in direct_response.output
    )

@mlflow.trace(span_type=SpanType.AGENT, name="supervisor")
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Process user query, route to, and yield response from sub-agent.
Expand All @@ -268,6 +398,17 @@ def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
Returns:
The response from the sub-agent
"""
# Extract intelligence_enabled from custom_inputs
intelligence_enabled = (
request.custom_inputs.get("intelligence_enabled", True)
if request.custom_inputs
else True
)

if not intelligence_enabled:
logger.info("Intelligence disabled - using direct response")
return self._handle_direct_response(request)

execution_result = self._prepare_agent_execution(request)

if execution_result.error_response:
Expand Down Expand Up @@ -336,6 +477,18 @@ def predict_stream(
Yields:
ResponsesAgentStreamEvent objects from the sub-agent
"""
# Extract intelligence_enabled from custom_inputs
intelligence_enabled = (
request.custom_inputs.get("intelligence_enabled", True)
if request.custom_inputs
else True
)

if not intelligence_enabled:
logger.info("Intelligence disabled - using direct response (streaming)")
yield from self._handle_direct_response_stream(request)
return

execution_result = self._prepare_agent_execution(request)

if execution_result.error_response:
Expand Down
6 changes: 6 additions & 0 deletions telco_support_agent/ui/backend/app/routes/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ class ChatRequest(BaseModel):
conversation_history: list[ChatMessage] = Field(
default_factory=list, description="Previous conversation messages"
)
intelligence_enabled: bool = Field(
default=True,
description="Whether to use agent intelligence or generic responses",
)


class AgentResponse(BaseModel):
Expand Down Expand Up @@ -126,6 +130,7 @@ async def chat(
message=request.message,
customer_id=request.customer_id,
conversation_history=request.conversation_history,
intelligence_enabled=request.intelligence_enabled,
)

logger.info(f"Agent response tools_used: {response.tools_used}")
Expand Down Expand Up @@ -163,6 +168,7 @@ async def event_generator():
message=request.message,
customer_id=request.customer_id,
conversation_history=request.conversation_history,
intelligence_enabled=request.intelligence_enabled,
):
yield event_data

Expand Down
34 changes: 31 additions & 3 deletions telco_support_agent/ui/backend/app/services/telco_agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ def _build_databricks_payload(
customer_id: str,
conversation_history: list[ChatMessage],
stream: bool = False,
intelligence_enabled: bool = True,
) -> dict[str, Any]:
"""Build the payload for Databricks API."""
input_messages = []
Expand All @@ -156,13 +157,24 @@ def _build_databricks_payload(

payload = {
"input": input_messages,
"custom_inputs": {"customer": customer_id},
"custom_inputs": {
"customer": customer_id,
"intelligence_enabled": intelligence_enabled,
},
"databricks_options": {"return_trace": True},
}

if stream:
payload["stream"] = True

# Debug logging
logger.error(
f"DEBUG: TelcoAgentService payload - intelligence_enabled: {intelligence_enabled}"
)
logger.error(
f"DEBUG: TelcoAgentService payload - custom_inputs: {payload['custom_inputs']}"
)

return payload

def _parse_agent_response(
Expand Down Expand Up @@ -434,6 +446,7 @@ async def send_message(
message: str,
customer_id: str,
conversation_history: list[ChatMessage] = None,
intelligence_enabled: bool = True,
) -> AgentResponse:
"""Send a message to the telco support agent."""
if conversation_history is None:
Expand All @@ -454,7 +467,11 @@ async def send_message(

try:
payload = self._build_databricks_payload(
message, customer_id, conversation_history, stream=False
message,
customer_id,
conversation_history,
stream=False,
intelligence_enabled=intelligence_enabled,
)

logger.info(f"Sending request to Databricks for customer {customer_id}")
Expand Down Expand Up @@ -530,6 +547,7 @@ async def send_message_stream(
message: str,
customer_id: str,
conversation_history: list[ChatMessage] = None,
intelligence_enabled: bool = True,
) -> AsyncGenerator[str, None]:
"""Send a message with real streaming response."""
if conversation_history is None:
Expand All @@ -546,11 +564,21 @@ async def send_message_stream(

try:
payload = self._build_databricks_payload(
message, customer_id, conversation_history, stream=True
message,
customer_id,
conversation_history,
stream=True,
intelligence_enabled=intelligence_enabled,
)

logger.info(f"Starting streaming request for customer {customer_id}")
logger.error(
f"DEBUG: Request endpoint: {self.settings.databricks_endpoint}"
)
logger.error(f"DEBUG: Request payload: {payload}")

headers = await self._get_headers()
logger.error(f"DEBUG: Request headers: {headers}")

# make streaming request
async with self.client.stream(
Expand Down
Loading