diff --git a/pkg/kthena-router/accesslog/logger.go b/pkg/kthena-router/accesslog/logger.go index aa8e411bc..b6fcbb140 100644 --- a/pkg/kthena-router/accesslog/logger.go +++ b/pkg/kthena-router/accesslog/logger.go @@ -171,6 +171,18 @@ func (l *accessLoggerImpl) formatText(entry *AccessLogEntry) (string, error) { if entry.ModelServer != "" { line += fmt.Sprintf(" model_server=%s", entry.ModelServer) } + + // Add Gateway API fields + if entry.Gateway != "" { + line += fmt.Sprintf(" gateway=%s", entry.Gateway) + } + if entry.HTTPRoute != "" { + line += fmt.Sprintf(" http_route=%s", entry.HTTPRoute) + } + if entry.InferencePool != "" { + line += fmt.Sprintf(" inference_pool=%s", entry.InferencePool) + } + if entry.SelectedPod != "" { line += fmt.Sprintf(" selected_pod=%s", entry.SelectedPod) } diff --git a/pkg/kthena-router/accesslog/logger_test.go b/pkg/kthena-router/accesslog/logger_test.go index 464750fc4..664b05b14 100644 --- a/pkg/kthena-router/accesslog/logger_test.go +++ b/pkg/kthena-router/accesslog/logger_test.go @@ -37,6 +37,9 @@ func TestAccessLogEntry_ToJSON(t *testing.T) { ModelServer: "default/llama2-server", SelectedPod: "llama2-deployment-5f7b8c9d-xk2p4", RequestID: "test-request-id", + Gateway: "default/test-gateway", + HTTPRoute: "default/test-httproute", + InferencePool: "default/test-inferencepool", InputTokens: 150, OutputTokens: 75, DurationTotal: 2350, @@ -68,6 +71,9 @@ func TestAccessLogEntry_ToJSON(t *testing.T) { assert.Equal(t, "default/llama2-route-v1", parsed["model_route"]) assert.Equal(t, "default/llama2-server", parsed["model_server"]) assert.Equal(t, "llama2-deployment-5f7b8c9d-xk2p4", parsed["selected_pod"]) + assert.Equal(t, "default/test-gateway", parsed["gateway"]) + assert.Equal(t, "default/test-httproute", parsed["http_route"]) + assert.Equal(t, "default/test-inferencepool", parsed["inference_pool"]) assert.Equal(t, float64(150), parsed["input_tokens"]) assert.Equal(t, float64(75), parsed["output_tokens"]) @@ -90,6 +96,9 @@ func TestAccessLogEntry_ToText(t *testing.T) { ModelServer: "default/llama2-server", SelectedPod: "llama2-deployment-5f7b8c9d-xk2p4", RequestID: "test-request-id", + Gateway: "default/test-gateway", + HTTPRoute: "default/test-httproute", + InferencePool: "default/test-inferencepool", InputTokens: 150, OutputTokens: 75, DurationTotal: 2350, @@ -118,6 +127,9 @@ func TestAccessLogEntry_ToText(t *testing.T) { `model_server=default/llama2-server`, `selected_pod=llama2-deployment-5f7b8c9d-xk2p4`, `request_id=test-request-id`, + `gateway=default/test-gateway`, + `http_route=default/test-httproute`, + `inference_pool=default/test-inferencepool`, `tokens=150/75`, `timings=2350ms(45+2180+5)`, } @@ -139,6 +151,9 @@ func TestAccessLogEntry_WithError(t *testing.T) { Message: "Model inference timeout after 30s", }, ModelName: "llama2-7b", + Gateway: "default/test-gateway", + HTTPRoute: "default/test-httproute", + InferencePool: "default/test-inferencepool", DurationTotal: 100, DurationRequestProcessing: 50, DurationUpstreamProcessing: 0, @@ -163,12 +178,18 @@ func TestAccessLogEntry_WithError(t *testing.T) { errorInfo := parsed["error"].(map[string]interface{}) assert.Equal(t, "timeout", errorInfo["type"]) assert.Equal(t, "Model inference timeout after 30s", errorInfo["message"]) + assert.Equal(t, "default/test-gateway", parsed["gateway"]) + assert.Equal(t, "default/test-httproute", parsed["http_route"]) + assert.Equal(t, "default/test-inferencepool", parsed["inference_pool"]) // Test text format config.Format = FormatText output, err = logger.formatText(entry) require.NoError(t, err) assert.Contains(t, output, "error=timeout:Model inference timeout after 30s") + assert.Contains(t, output, "gateway=default/test-gateway") + assert.Contains(t, output, "http_route=default/test-httproute") + assert.Contains(t, output, "inference_pool=default/test-inferencepool") } func TestAccessLogContext_Lifecycle(t *testing.T) { @@ -203,6 +224,11 @@ func TestAccessLogContext_Lifecycle(t *testing.T) { assert.Equal(t, "rate_limit", ctx.Error.Type) assert.Equal(t, "Too many requests", ctx.Error.Message) + // Set Gateway API info + ctx.Gateway = "default/test-gateway" + ctx.HTTPRoute = "default/test-httproute" + ctx.InferencePool = "default/test-inferencepool" + // Mark timing phases time.Sleep(1 * time.Millisecond) // Ensure time difference ctx.MarkRequestProcessingEnd() @@ -230,6 +256,9 @@ func TestAccessLogContext_Lifecycle(t *testing.T) { assert.Greater(t, entry.DurationTotal, int64(0)) assert.NotNil(t, entry.Error) assert.Equal(t, "rate_limit", entry.Error.Type) + assert.Equal(t, "default/test-gateway", entry.Gateway) + assert.Equal(t, "default/test-httproute", entry.HTTPRoute) + assert.Equal(t, "default/test-inferencepool", entry.InferencePool) } func TestNoopAccessLogger(t *testing.T) { diff --git a/pkg/kthena-router/accesslog/middleware.go b/pkg/kthena-router/accesslog/middleware.go index efa57a602..10ddfb09b 100644 --- a/pkg/kthena-router/accesslog/middleware.go +++ b/pkg/kthena-router/accesslog/middleware.go @@ -128,3 +128,13 @@ func MarkResponseProcessingEnd(c *gin.Context) { ctx.MarkResponseProcessingEnd() } } + +// SetGatewayAPIInfo sets Gateway API information in the access log context +// gateway, httpRoute, and inferencePool should be in namespace/name format +func SetGatewayAPIInfo(c *gin.Context, gateway, httpRoute, inferencePool string) { + if ctx := GetAccessLogContext(c); ctx != nil { + ctx.Gateway = gateway + ctx.HTTPRoute = httpRoute + ctx.InferencePool = inferencePool + } +} diff --git a/pkg/kthena-router/accesslog/types.go b/pkg/kthena-router/accesslog/types.go index c7940d421..baff064bf 100644 --- a/pkg/kthena-router/accesslog/types.go +++ b/pkg/kthena-router/accesslog/types.go @@ -39,6 +39,11 @@ type AccessLogEntry struct { SelectedPod string `json:"selected_pod,omitempty"` RequestID string `json:"request_id,omitempty"` + // Gateway API information + Gateway string `json:"gateway,omitempty"` + HTTPRoute string `json:"http_route,omitempty"` + InferencePool string `json:"inference_pool,omitempty"` + // Token information InputTokens int `json:"input_tokens,omitempty"` OutputTokens int `json:"output_tokens,omitempty"` @@ -69,6 +74,11 @@ type AccessLogContext struct { ModelServer string SelectedPod string + // Gateway API information + Gateway string + HTTPRoute string + InferencePool string + // Token counts InputTokens int OutputTokens int @@ -189,6 +199,9 @@ func (ctx *AccessLogContext) ToAccessLogEntry(statusCode int) *AccessLogEntry { RequestID: ctx.RequestID, InputTokens: ctx.InputTokens, OutputTokens: ctx.OutputTokens, + Gateway: ctx.Gateway, + HTTPRoute: ctx.HTTPRoute, + InferencePool: ctx.InferencePool, DurationTotal: total, DurationRequestProcessing: requestProcessing, DurationUpstreamProcessing: upstreamProcessing, diff --git a/pkg/kthena-router/router/router.go b/pkg/kthena-router/router/router.go index d0dc45e2c..29c84aa78 100644 --- a/pkg/kthena-router/router/router.go +++ b/pkg/kthena-router/router/router.go @@ -53,7 +53,9 @@ import ( const ( // Context keys for gin context - GatewayKey = "gatewayKey" + GatewayKey = "gatewayKey" + HTTPRouteKey = "httpRouteName" + InferencePoolKey = "inferencePoolName" ) func getEnvBool(key string, fallback bool) bool { @@ -394,6 +396,25 @@ func (r *Router) doLoadbalance(c *gin.Context, modelRequest ModelRequest) { c.Set("modelRouteName", modelRouteName) } + // Set Gateway API info from context + var gatewayKeyForLog, httpRouteKey, inferencePoolKey string + if key, exists := c.Get(GatewayKey); exists { + if k, ok := key.(string); ok { + gatewayKeyForLog = k + } + } + if httpRouteName, exists := c.Get(HTTPRouteKey); exists { + if name, ok := httpRouteName.(types.NamespacedName); ok { + httpRouteKey = fmt.Sprintf("%s/%s", name.Namespace, name.Name) + } + } + if inferencePoolName, exists := c.Get(InferencePoolKey); exists { + if name, ok := inferencePoolName.(types.NamespacedName); ok { + inferencePoolKey = fmt.Sprintf("%s/%s", name.Namespace, name.Name) + } + } + accesslog.SetGatewayAPIInfo(c, gatewayKeyForLog, httpRouteKey, inferencePoolKey) + if len(ctx.BestPods) > 0 && ctx.BestPods[0].Pod != nil { selectedPod := ctx.BestPods[0].Pod.Name accesslog.SetRequestRouting(c, modelRouteName, modelServerFullName, selectedPod) @@ -515,6 +536,18 @@ func (r *Router) handleHTTPRoute(c *gin.Context, gatewayKey string) (bool, types return false, types.NamespacedName{} } + // Store Gateway key in context for access log + if gatewayKey != "" { + c.Set(GatewayKey, gatewayKey) + } + + // Store HTTPRoute name in context for access log + httpRouteName := types.NamespacedName{ + Namespace: matchedRoute.Namespace, + Name: matchedRoute.Name, + } + c.Set(HTTPRouteKey, httpRouteName) + // Store the matched prefix in context for URL rewriting if matchedPrefix != "" { c.Set("matchedPrefix", matchedPrefix) @@ -548,6 +581,9 @@ func (r *Router) handleHTTPRoute(c *gin.Context, gatewayKey string) (bool, types return false, types.NamespacedName{} } + // Store InferencePool name in context for access log + c.Set(InferencePoolKey, inferencePoolName) + // Apply HTTPURLRewriteFilter if present if matchedRule != nil && matchedRule.Filters != nil { for _, filter := range matchedRule.Filters { diff --git a/pkg/kthena-router/router/router_test.go b/pkg/kthena-router/router/router_test.go index b0030a263..ad25a1613 100644 --- a/pkg/kthena-router/router/router_test.go +++ b/pkg/kthena-router/router/router_test.go @@ -33,12 +33,14 @@ import ( "github.com/agiledragon/gomonkey/v2" "github.com/gin-gonic/gin" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "istio.io/istio/pkg/util/sets" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" aiv1alpha1 "github.com/volcano-sh/kthena/pkg/apis/networking/v1alpha1" "github.com/volcano-sh/kthena/pkg/kthena-router/accesslog" @@ -426,6 +428,115 @@ func TestRouter_HandlerFunc_ScheduleFailure(t *testing.T) { assert.Contains(t, w.Body.String(), "can't schedule to target pod") } +func TestRouter_HandlerFunc_AccessLogRoutingInfo(t *testing.T) { + // 1. Setup backend mock + backendHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"id":"response-id"}`) + }) + router, store, backend := setupTestRouter(backendHandler) + defer backend.Close() + + backendURL, _ := url.Parse(backend.URL) + backendIP := backendURL.Hostname() + backendPort, _ := strconv.Atoi(backendURL.Port()) + + // 2. Populate store + modelServer := &aiv1alpha1.ModelServer{ + ObjectMeta: v1.ObjectMeta{Name: "ms-1", Namespace: "default"}, + Spec: aiv1alpha1.ModelServerSpec{ + Model: func(s string) *string { return &s }("test-model-base"), + WorkloadPort: aiv1alpha1.WorkloadPort{Port: int32(backendPort)}, + InferenceEngine: "vLLM", + }, + } + pod1 := &corev1.Pod{ + ObjectMeta: v1.ObjectMeta{Name: "pod-1", Namespace: "default"}, + Status: corev1.PodStatus{PodIP: backendIP, Phase: corev1.PodRunning}, + } + // Create Gateway and add to store (required for ModelRoute with parentRefs to match) + gateway := &gatewayv1.Gateway{ + ObjectMeta: v1.ObjectMeta{Name: "test-gateway", Namespace: "default"}, + Spec: gatewayv1.GatewaySpec{ + Listeners: []gatewayv1.Listener{ + { + Name: "http", + Port: 80, + Protocol: gatewayv1.HTTPProtocolType, + }, + }, + }, + } + gatewayKey := "default/test-gateway" + store.AddOrUpdateGateway(gateway) + + // Create ModelRoute with parentRefs pointing to the Gateway + gatewayKind := gatewayv1.Kind("Gateway") + modelRoute := &aiv1alpha1.ModelRoute{ + ObjectMeta: v1.ObjectMeta{Name: "mr-1", Namespace: "default"}, + Spec: aiv1alpha1.ModelRouteSpec{ + ModelName: "test-model", + ParentRefs: []gatewayv1.ParentReference{ + { + Name: gatewayv1.ObjectName("test-gateway"), + Kind: &gatewayKind, + Group: func() *gatewayv1.Group { g := gatewayv1.Group("gateway.networking.k8s.io"); return &g }(), + }, + }, + Rules: []*aiv1alpha1.Rule{ + { + TargetModels: []*aiv1alpha1.TargetModel{ + {ModelServerName: "ms-1"}, + }, + }, + }, + }, + } + + store.AddOrUpdateModelServer(modelServer, sets.New(types.NamespacedName{Name: "pod-1", Namespace: "default"})) + store.AddOrUpdatePod(pod1, []*aiv1alpha1.ModelServer{modelServer}) + store.AddOrUpdateModelRoute(modelRoute) + + // 3. Create request with Gateway API info in context + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + + reqBody := `{"model": "test-model", "prompt": "hello"}` + c.Request, _ = http.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(reqBody)) + c.Request.Header.Set("Content-Type", "application/json") + + // Set Gateway API info in gin.Context + // We set GatewayKey because ModelRoute has parentRefs that match the Gateway + c.Set(GatewayKey, gatewayKey) + c.Set(HTTPRouteKey, types.NamespacedName{Namespace: "default", Name: "test-httproute"}) + c.Set(InferencePoolKey, types.NamespacedName{Namespace: "default", Name: "test-inferencepool"}) + + // 4. Execute access log middleware first to create access log context + router.AccessLog()(c) + + // 5. Execute handler + router.HandlerFunc()(c) + + // 6. Verify request succeeded + assert.Equal(t, http.StatusOK, w.Code, "Request should succeed to test routing info setting") + + // 7. Verify access log context has all routing information + accessCtx := accesslog.GetAccessLogContext(c) + require.NotNil(t, accessCtx, "Access log context should be set") + + // Verify AI-specific routing information + assert.Equal(t, "test-model", accessCtx.ModelName, "ModelName should be set from request") + assert.Equal(t, "default/mr-1", accessCtx.ModelRoute, "ModelRoute should be set from matched ModelRoute") + assert.Equal(t, "default/ms-1", accessCtx.ModelServer, "ModelServer should be set from matched ModelServer") + assert.Equal(t, "pod-1", accessCtx.SelectedPod, "SelectedPod should be set from scheduled pod") + assert.Equal(t, accessCtx.RequestID, c.Request.Header.Get("x-request-id"), "RequestID should match request header") + + // Verify Gateway API information + assert.Equal(t, gatewayKey, accessCtx.Gateway, "Gateway should be set from gin.Context GatewayKey") + assert.Equal(t, "default/test-httproute", accessCtx.HTTPRoute, "HTTPRoute should be set from gin.Context") + assert.Equal(t, "default/test-inferencepool", accessCtx.InferencePool, "InferencePool should be set from gin.Context") +} + func TestAccessLogConfigurationFromEnv(t *testing.T) { // Save original environment variables originalEnabled := os.Getenv("ACCESS_LOG_ENABLED") diff --git a/test/e2e/router/gateway-inference-extension/e2e_test.go b/test/e2e/router/gateway-inference-extension/e2e_test.go index b175183b4..dc3fff240 100644 --- a/test/e2e/router/gateway-inference-extension/e2e_test.go +++ b/test/e2e/router/gateway-inference-extension/e2e_test.go @@ -20,8 +20,10 @@ import ( "context" "fmt" "os" + "strings" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" networkingv1alpha1 "github.com/volcano-sh/kthena/pkg/apis/networking/v1alpha1" "github.com/volcano-sh/kthena/test/e2e/framework" @@ -244,4 +246,47 @@ func TestBothAPIsConfigured(t *testing.T) { utils.NewChatMessage("user", "Hello HTTPRoute"), } utils.CheckChatCompletions(t, "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B", messages7b) + + // 5. Verify access logs contain correct AI-specific and Gateway API fields with mutual exclusivity + t.Log("Verifying router access logs for AI-specific and Gateway API fields...") + routerPod := utils.GetRouterPod(t, testCtx.KubeClient, kthenaNamespace) + + // Read a reasonable tail of logs to find both requests + var modelRouteLogLine string + var httpRouteLogLine string + + const tailLines int64 = 2000 + logs := utils.FetchPodLogs(t, testCtx.KubeClient, kthenaNamespace, routerPod.Name, tailLines) + require.NotEmpty(t, logs, "Router logs should not be empty") + t.Logf("==== Router logs (tail %d lines) ====\n%s\n==== End router logs ====", tailLines, logs) + + for _, line := range strings.Split(logs, "\n") { + // Match both text format (model_route=...) and JSON format ("model_route": ...) + if modelRouteLogLine == "" && strings.Contains(line, "model_route") && strings.Contains(line, "model_server") { + modelRouteLogLine = line + } + if httpRouteLogLine == "" && strings.Contains(line, "http_route") && strings.Contains(line, "inference_pool") { + httpRouteLogLine = line + } + } + + t.Logf("ModelRoute log line: %s", modelRouteLogLine) + t.Logf("HTTPRoute log line: %s", httpRouteLogLine) + require.NotEmpty(t, modelRouteLogLine, "Expected to find an access log line for ModelRoute/ModelServer request") + require.NotEmpty(t, httpRouteLogLine, "Expected to find an access log line for HTTPRoute/InferencePool request") + + // Verify AI-specific fields are present (support both text and JSON formats) + assert.Contains(t, modelRouteLogLine, "model_name", "ModelRoute access log should contain model_name") + assert.Contains(t, httpRouteLogLine, "model_name", "HTTPRoute access log should contain model_name") + + // Verify ModelRoute/ModelServer vs HTTPRoute/InferencePool mutual exclusivity by field presence + assert.Contains(t, modelRouteLogLine, "model_route", "ModelRoute log should contain model_route") + assert.Contains(t, modelRouteLogLine, "model_server", "ModelRoute log should contain model_server") + assert.NotContains(t, modelRouteLogLine, "http_route", "ModelRoute log should not contain http_route") + assert.NotContains(t, modelRouteLogLine, "inference_pool", "ModelRoute log should not contain inference_pool") + + assert.Contains(t, httpRouteLogLine, "http_route", "HTTPRoute log should contain http_route") + assert.Contains(t, httpRouteLogLine, "inference_pool", "HTTPRoute log should contain inference_pool") + assert.NotContains(t, httpRouteLogLine, "model_route", "HTTPRoute log should not contain model_route") + assert.NotContains(t, httpRouteLogLine, "model_server", "HTTPRoute log should not contain model_server") } diff --git a/test/e2e/utils/pod.go b/test/e2e/utils/pod.go index cb1f93307..a0d4bd771 100644 --- a/test/e2e/utils/pod.go +++ b/test/e2e/utils/pod.go @@ -18,6 +18,7 @@ package utils import ( "context" + "io" "testing" "github.com/stretchr/testify/require" @@ -49,3 +50,22 @@ func GetRouterPod(t *testing.T, kubeClient kubernetes.Interface, kthenaNamespace return &pods.Items[0] } + +// FetchPodLogs returns recent logs of the specified pod. +// This helper is primarily used by e2e tests to validate router access logs. +func FetchPodLogs(t *testing.T, kubeClient kubernetes.Interface, namespace, podName string, tailLines int64) string { + t.Helper() + + req := kubeClient.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{ + TailLines: &tailLines, + }) + + stream, err := req.Stream(context.Background()) + require.NoError(t, err, "Failed to stream logs for pod %s/%s", namespace, podName) + defer stream.Close() + + data, err := io.ReadAll(stream) + require.NoError(t, err, "Failed to read logs for pod %s/%s", namespace, podName) + + return string(data) +}