Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 165 additions & 93 deletions e2e-tests/01-envoy-extproc-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@
import json
import os
import sys
import unittest
import uuid

import requests

# Add parent directory to path to allow importing common test utilities
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from tests.test_base import SemanticRouterTestBase
# Import test base from same directory
from test_base import SemanticRouterTestBase

# Constants
ENVOY_URL = "http://localhost:8801"
OPENAI_ENDPOINT = "/v1/chat/completions"
DEFAULT_MODEL = "qwen2.5:32b" # Changed from gemma3:27b to match make test-prompt
DEFAULT_MODEL = "Model-A" # Use configured model that matches router config


class EnvoyExtProcTest(SemanticRouterTestBase):
Expand All @@ -35,11 +35,13 @@ def setUp(self):
)

try:
# Use unique content to bypass cache for setup check
setup_id = str(uuid.uuid4())[:8]
payload = {
"model": DEFAULT_MODEL,
"messages": [
{"role": "assistant", "content": "You are a helpful assistant."},
{"role": "user", "content": "test"},
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": f"ExtProc setup test {setup_id}"},
],
}

Expand Down Expand Up @@ -77,8 +79,11 @@ def test_request_headers_propagation(self):
payload = {
"model": DEFAULT_MODEL,
"messages": [
{"role": "assistant", "content": "You are a helpful assistant."},
{"role": "user", "content": "What is the capital of France?"},
{"role": "system", "content": "You are a helpful assistant."},
{
"role": "user",
"content": f"ExtProc header test {trace_id[:8]} - explain photosynthesis briefly.",
},
],
"temperature": 0.7,
}
Expand Down Expand Up @@ -137,158 +142,225 @@ def test_request_headers_propagation(self):
)
self.assertIn("model", response_json, "Response is missing 'model' field")

def test_extproc_override(self):
"""Test that the ExtProc can modify the request's target model."""
def test_extproc_body_modification(self):
"""Test that the ExtProc can modify the request and response bodies."""
self.print_test_header(
"ExtProc Model Override Test",
"Verifies that ExtProc correctly routes different query types to appropriate models",
"ExtProc Body Modification Test",
"Verifies that ExtProc can modify request and response bodies while preserving essential fields",
)

test_cases = [
{
"name": "Math Query",
"content": "What is the derivative of f(x) = x^3 + 2x^2 - 5x + 7?",
"category": "math",
},
trace_id = str(uuid.uuid4())

payload = {
"model": DEFAULT_MODEL,
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{
"role": "user",
"content": f"ExtProc body test {trace_id[:8]} - describe machine learning in simple terms.",
},
],
"temperature": 0.7,
"test_field": "should_be_preserved",
}

headers = {
"Content-Type": "application/json",
"X-Test-Trace-ID": trace_id,
"X-Test-Body-Modification": "true",
}

self.print_request_info(
payload=payload,
expectations="Expect: Request processing with body modifications while preserving essential fields",
)

response = requests.post(
f"{ENVOY_URL}{OPENAI_ENDPOINT}", headers=headers, json=payload, timeout=60
)

response_json = response.json()
self.print_response_info(
response,
{
"name": "Creative Writing Query",
"content": "Write a short story about a space cat.",
"category": "creative",
"Original Model": DEFAULT_MODEL,
"Final Model": response_json.get("model", "Not specified"),
"Test Field Preserved": "test_field" in response_json,
},
]
)

results = {}
passed = response.status_code < 400 and "model" in response_json
self.print_test_result(
passed=passed,
message=(
"Request processed successfully with body modifications"
if passed
else "Issues with request processing or body modifications"
),
)

for test_case in test_cases:
self.print_subtest_header(test_case["name"])
self.assertLess(
response.status_code,
400,
f"Request was rejected with status code {response.status_code}",
)

trace_id = str(uuid.uuid4())
def test_extproc_error_handling(self):
"""Test ExtProc error handling and failure scenarios."""
self.print_test_header(
"ExtProc Error Handling Test",
"Verifies that ExtProc properly handles and recovers from error conditions",
)

payload = {
"model": DEFAULT_MODEL,
"messages": [
{
"role": "assistant",
"content": f"You are an expert in {test_case['category']}.",
},
{"role": "user", "content": test_case["content"]},
],
"temperature": 0.7,
}
# Test with headers that might cause ExtProc issues
payload = {
"model": DEFAULT_MODEL,
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Simple test query"},
],
}

headers = {
"Content-Type": "application/json",
"X-Test-Trace-ID": trace_id,
"X-Original-Model": DEFAULT_MODEL,
"X-Test-Category": test_case["category"],
}
headers = {
"Content-Type": "application/json",
"X-Very-Long-Header": "x" * 1000, # Very long header value
"X-Test-Error-Recovery": "true",
"X-Special-Chars": "data-with-special-chars-!@#$%^&*()", # Special characters
}

self.print_request_info(
payload=payload,
expectations=f"Expect: Query to be routed based on {test_case['category']} category",
)
self.print_request_info(
payload=payload,
expectations="Expect: ExtProc to handle unusual headers gracefully without crashing",
)

try:
response = requests.post(
f"{ENVOY_URL}{OPENAI_ENDPOINT}",
headers=headers,
json=payload,
timeout=60,
)

response_json = response.json()
results[test_case["name"]] = response_json.get("model", "unknown")
# ExtProc should either process successfully or fail gracefully without hanging
passed = (
response.status_code < 500
) # No server errors due to ExtProc issues

self.print_response_info(
response,
{
"Category": test_case["category"],
"Original Model": DEFAULT_MODEL,
"Routed Model": results[test_case["name"]],
"Status Code": response.status_code,
"Error Handling": "Graceful" if passed else "Server Error",
},
)

passed = (
response.status_code < 400 and results[test_case["name"]] != "unknown"
)
self.print_test_result(
passed=passed,
message=(
f"Successfully routed to model: {results[test_case['name']]}"
if passed
else f"Routing failed or returned unknown model"
),
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
# Connection errors are acceptable - it shows the system is protecting itself
passed = True
self.print_response_info(
None,
{
"Connection": "Terminated (Expected)",
"Error Handling": "Protective disconnection",
"Error": str(e)[:100] + "..." if len(str(e)) > 100 else str(e),
},
)

self.assertLess(
response.status_code,
400,
f"{test_case['name']} request failed with status {response.status_code}",
)
self.print_test_result(
passed=passed,
message=(
"ExtProc handled error conditions gracefully"
if passed
else "ExtProc error handling failed"
),
)

# Final summary of routing results
if len(results) == 2:
print("\nRouting Summary:")
print(f"Math Query → {results['Math Query']}")
print(f"Creative Writing Query → {results['Creative Writing Query']}")
# The test passes if either the request succeeds or fails gracefully
self.assertTrue(
passed,
"ExtProc should handle malformed input gracefully",
)

def test_extproc_body_modification(self):
"""Test that the ExtProc can modify the request and response bodies."""
def test_extproc_performance_impact(self):
"""Test that ExtProc doesn't significantly impact request performance."""
self.print_test_header(
"ExtProc Body Modification Test",
"Verifies that ExtProc can modify request and response bodies while preserving essential fields",
"ExtProc Performance Impact Test",
"Verifies that ExtProc processing doesn't add excessive latency",
)

# Generate unique content for cache bypass
trace_id = str(uuid.uuid4())

payload = {
"model": DEFAULT_MODEL,
"messages": [
{"role": "assistant", "content": "You are a helpful assistant."},
{"role": "user", "content": "What is quantum computing?"},
{"role": "system", "content": "You are a helpful assistant."},
{
"role": "user",
"content": f"ExtProc performance test {trace_id[:8]} - what is artificial intelligence?",
},
],
"temperature": 0.7,
"test_field": "should_be_preserved",
}

headers = {
# Test with minimal ExtProc processing
headers_minimal = {"Content-Type": "application/json"}

# Test with ExtProc headers
headers_extproc = {
"Content-Type": "application/json",
"X-Test-Trace-ID": trace_id,
"X-Test-Body-Modification": "true",
"X-Test-Performance": "true",
"X-Processing-Mode": "full",
}

self.print_request_info(
payload=payload,
expectations="Expect: Request processing with body modifications while preserving essential fields",
expectations="Expect: Reasonable response times with ExtProc processing",
)

import time

# Measure response time with ExtProc
start_time = time.time()
response = requests.post(
f"{ENVOY_URL}{OPENAI_ENDPOINT}", headers=headers, json=payload, timeout=60
f"{ENVOY_URL}{OPENAI_ENDPOINT}",
headers=headers_extproc,
json=payload,
timeout=60,
)
response_time = time.time() - start_time

passed = (
response.status_code < 400 and response_time < 30.0
) # Reasonable timeout

response_json = response.json()
self.print_response_info(
response,
{
"Original Model": DEFAULT_MODEL,
"Final Model": response_json.get("model", "Not specified"),
"Test Field Preserved": "test_field" in response_json,
"Response Time": f"{response_time:.2f}s",
"Performance": (
"Acceptable" if response_time < 10.0 else "Slow but functional"
),
},
)

passed = response.status_code < 400 and "model" in response_json
self.print_test_result(
passed=passed,
message=(
"Request processed successfully with body modifications"
f"ExtProc processing completed in {response_time:.2f}s"
if passed
else "Issues with request processing or body modifications"
else f"ExtProc processing too slow: {response_time:.2f}s"
),
)

self.assertLess(
response.status_code,
400,
f"Request was rejected with status code {response.status_code}",
"ExtProc should not cause request failures",
)
self.assertLess(
response_time,
30.0,
"ExtProc should not cause excessive delays",
)


Expand Down
Loading