diff --git a/test/e2e/router/gateway-api/e2e_test.go b/test/e2e/router/gateway-api/e2e_test.go
index d433c33d3..71db2489b 100644
--- a/test/e2e/router/gateway-api/e2e_test.go
+++ b/test/e2e/router/gateway-api/e2e_test.go
@@ -287,3 +287,9 @@ func TestDuplicateModelName(t *testing.T) {
 
 	t.Log("Test completed successfully: same modelName routes to different models via different ports")
 }
+
+// TestModelRouteWithGlobalRateLimit tests global rate limiting enforced by the Kthena Router.
+// It calls the shared router test with useGatewayApi=true and this suite's testCtx and namespaces.
+func TestModelRouteWithGlobalRateLimit(t *testing.T) {
+	router.TestModelRouteWithGlobalRateLimitShared(t, testCtx, testNamespace, true, kthenaNamespace)
+}
diff --git a/test/e2e/router/shared.go b/test/e2e/router/shared.go
index 42eb18b19..2be8821c9 100644
--- a/test/e2e/router/shared.go
+++ b/test/e2e/router/shared.go
@@ -18,6 +18,7 @@ package router
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"net/http"
 	"strings"
@@ -31,6 +32,7 @@ import (
 	routercontext "github.com/volcano-sh/kthena/test/e2e/router/context"
 	"github.com/volcano-sh/kthena/test/e2e/utils"
 	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
 )
@@ -842,3 +844,277 @@ func TestModelRouteLoraShared(t *testing.T, testCtx *routercontext.RouterTestCon
 	utils.UnloadLoRAAdapter(t, "http://127.0.0.1:9000", "lora-B")
 	t.Log("LoRA adapters unloaded successfully")
 }
+
+const (
+	redisServerName = "redis-server" // name used to look up and delete the Redis Deployment and Service
+	redisServerPort = 6379           // port component of the Redis address handed to ModelRoutes
+)
+
+// deployRedisForTest deploys Redis for global rate limiting tests and returns its in-cluster address.
+// A Deployment named redisServerName that already exists is reused: nothing is created and no
+// cleanup is registered for it, but its readiness is still awaited before the address is returned.
+func deployRedisForTest(t *testing.T, ctx context.Context, testCtx *routercontext.RouterTestContext, namespace string) string {
+	t.Helper()
+
+	const manifest = "examples/redis/redis-standalone.yaml"
+	kube := testCtx.KubeClient
+
+	existing, err := kube.AppsV1().Deployments(namespace).Get(ctx, redisServerName, metav1.GetOptions{})
+	if err != nil || existing == nil {
+		services := utils.LoadMultiResourceYAMLFromFile[corev1.Service](manifest)
+		require.Len(t, services, 1, "Redis YAML should contain 1 Service")
+		service := services[0]
+		service.Namespace = namespace
+		_, err = kube.CoreV1().Services(namespace).Create(ctx, service, metav1.CreateOptions{})
+		require.NoError(t, err, "Failed to create Redis service")
+		// Register each best-effort deletion right after its create so a later failure cannot leak it.
+		t.Cleanup(func() {
+			_ = kube.CoreV1().Services(namespace).Delete(context.Background(), redisServerName, metav1.DeleteOptions{})
+		})
+
+		deployments := utils.LoadMultiResourceYAMLFromFile[appsv1.Deployment](manifest)
+		require.Len(t, deployments, 1, "Redis YAML should contain 1 Deployment")
+		deployment := deployments[0]
+		deployment.Namespace = namespace
+		_, err = kube.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{})
+		require.NoError(t, err, "Failed to create Redis deployment")
+		// Cleanups run last-in-first-out, so the Deployment is deleted before the Service.
+		t.Cleanup(func() {
+			_ = kube.AppsV1().Deployments(namespace).Delete(context.Background(), redisServerName, metav1.DeleteOptions{})
+		})
+	}
+
+	require.Eventually(t, func() bool {
+		deploy, err := kube.AppsV1().Deployments(namespace).Get(ctx, redisServerName, metav1.GetOptions{})
+		if err != nil {
+			return false
+		}
+		if deploy.Spec.Replicas == nil {
+			return deploy.Status.ReadyReplicas > 0
+		}
+		return deploy.Status.ReadyReplicas == *deploy.Spec.Replicas
+	}, 3*time.Minute, 5*time.Second, "Redis deployment not ready")
+
+	return fmt.Sprintf("%s.%s.svc.cluster.local:%d", redisServerName, namespace, redisServerPort)
+}
+
+// TestModelRouteWithGlobalRateLimitShared is a shared test function that can be used by both
+// router and gateway-api test suites. When useGatewayApi is true, it configures ModelRoute
+// with ParentRefs to the default Gateway (via setupModelRouteWithGatewayAPI).
+func TestModelRouteWithGlobalRateLimitShared(t *testing.T, testCtx *routercontext.RouterTestContext, testNamespace string, useGatewayApi bool, kthenaNamespace string) {
+	const (
+		rateLimitWindowSeconds = 60
+		windowResetBuffer      = 10 * time.Second
+		inputTokenLimit        = 10
+		tokensPerRequest       = 10
+	)
+	ctx := context.Background()
+
+	standardMessage := []utils.ChatMessage{
+		utils.NewChatMessage("user", "hello world"),
+	}
+
+	// Test 1: Verify Redis connection and basic global rate limiting
+	t.Run("VerifyRedisConnectionAndBasicRateLimit", func(t *testing.T) {
+		t.Log("Test 1: Verifying Redis connection and basic global rate limiting")
+
+		redisAddress := deployRedisForTest(t, ctx, testCtx, testNamespace)
+		modelRoute := utils.LoadYAMLFromFile[networkingv1alpha1.ModelRoute]("examples/kthena-router/ModelRouteWithGlobalRateLimit.yaml")
+		modelRoute.Namespace = testNamespace
+		modelRoute.Spec.RateLimit.Global.Redis.Address = redisAddress
+		setupModelRouteWithGatewayAPI(modelRoute, useGatewayApi, kthenaNamespace)
+
+		createdModelRoute, err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Create(ctx, modelRoute, metav1.CreateOptions{})
+		require.NoError(t, err, "Failed to create ModelRoute")
+
+		t.Cleanup(func() {
+			if err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Delete(context.Background(), createdModelRoute.Name, metav1.DeleteOptions{}); err != nil {
+				t.Logf("Warning: Failed to delete ModelRoute: %v", err)
+			}
+		})
+
+		require.Eventually(t, func() bool {
+			mr, err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Get(ctx, createdModelRoute.Name, metav1.GetOptions{})
+			return err == nil && mr != nil
+		}, 2*time.Minute, 2*time.Second, "ModelRoute should be created")
+
+		// Each attempt reported by the warm-up call is counted as tokensPerRequest tokens of quota.
+		resp := utils.CheckChatCompletions(t, createdModelRoute.Spec.ModelName, standardMessage)
+		tokensConsumed := resp.Attempts * tokensPerRequest
+		t.Logf("Router reconciliation complete (consumed %d tokens in %d attempts)", tokensConsumed, resp.Attempts)
+
+		remainingQuota := inputTokenLimit - tokensConsumed
+		expectedSuccessfulRequests := remainingQuota / tokensPerRequest
+
+		for i := 0; i < expectedSuccessfulRequests; i++ {
+			okResp := utils.SendChatRequest(t, createdModelRoute.Spec.ModelName, standardMessage)
+			responseBody, readErr := io.ReadAll(okResp.Body)
+			okResp.Body.Close()
+			require.NoError(t, readErr, "Failed to read response body on request %d", i+1)
+			require.Equal(t, http.StatusOK, okResp.StatusCode, "Request %d should succeed. Response: %s", i+1, string(responseBody))
+			t.Logf("Request %d succeeded", i+1)
+		}
+
+		rateLimitedResp := utils.SendChatRequest(t, createdModelRoute.Spec.ModelName, standardMessage)
+		responseBody, readErr := io.ReadAll(rateLimitedResp.Body)
+		rateLimitedResp.Body.Close()
+
+		require.NoError(t, readErr, "Failed to read rate limit response body")
+		assert.Equal(t, http.StatusTooManyRequests, rateLimitedResp.StatusCode, "Request should be rate limited after exhausting quota")
+		assert.Contains(t, strings.ToLower(string(responseBody)), "rate limit", "Rate limit error response must contain descriptive message")
+
+		t.Logf("Global rate limit enforced after %d total requests", resp.Attempts+expectedSuccessfulRequests)
+	})
+
+	// Test 2: Verify global rate limit sharing across multiple ModelRoutes
+	t.Run("VerifyGlobalRateLimitSharingAcrossModelRoutes", func(t *testing.T) {
+		t.Log("Test 2: Verifying global rate limit sharing across multiple ModelRoutes")
+
+		redisAddress := deployRedisForTest(t, ctx, testCtx, testNamespace)
+		modelRoute1 := utils.LoadYAMLFromFile[networkingv1alpha1.ModelRoute]("examples/kthena-router/ModelRouteWithGlobalRateLimit.yaml")
+		modelRoute1.Name = "global-rate-limit-route-1"
+		modelRoute1.Namespace = testNamespace
+		modelRoute1.Spec.RateLimit.Global.Redis.Address = redisAddress
+		setupModelRouteWithGatewayAPI(modelRoute1, useGatewayApi, kthenaNamespace)
+
+		modelRoute2 := utils.LoadYAMLFromFile[networkingv1alpha1.ModelRoute]("examples/kthena-router/ModelRouteWithGlobalRateLimit.yaml")
+		modelRoute2.Name = "global-rate-limit-route-2"
+		modelRoute2.Namespace = testNamespace
+		modelRoute2.Spec.RateLimit.Global.Redis.Address = redisAddress
+		setupModelRouteWithGatewayAPI(modelRoute2, useGatewayApi, kthenaNamespace)
+
+		// Each route's deletion is registered right after its creation so that a failure creating
+		// the second route cannot leak the first.
+		deleteRoute := func(name string) {
+			if err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Delete(context.Background(), name, metav1.DeleteOptions{}); err != nil {
+				t.Logf("Warning: Failed to delete ModelRoute: %v", err)
+			}
+		}
+
+		createdRoute1, err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Create(ctx, modelRoute1, metav1.CreateOptions{})
+		require.NoError(t, err, "Failed to create first ModelRoute")
+		t.Cleanup(func() { deleteRoute(createdRoute1.Name) })
+
+		createdRoute2, err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Create(ctx, modelRoute2, metav1.CreateOptions{})
+		require.NoError(t, err, "Failed to create second ModelRoute")
+		t.Cleanup(func() { deleteRoute(createdRoute2.Name) })
+
+		require.Eventually(t, func() bool {
+			mr1, err1 := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Get(ctx, createdRoute1.Name, metav1.GetOptions{})
+			mr2, err2 := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Get(ctx, createdRoute2.Name, metav1.GetOptions{})
+			return err1 == nil && mr1 != nil && err2 == nil && mr2 != nil
+		}, 2*time.Minute, 2*time.Second, "Both ModelRoutes should be created")
+
+		resp := utils.CheckChatCompletions(t, createdRoute1.Spec.ModelName, standardMessage)
+		tokensConsumed := resp.Attempts * tokensPerRequest
+		t.Logf("Consumed %d tokens through route 1", tokensConsumed)
+
+		remainingQuota := inputTokenLimit - tokensConsumed
+		expectedSuccessfulRequests := remainingQuota / tokensPerRequest
+
+		// Follow-up requests use route 2, matching the assertion message below. NOTE(review): both
+		// routes come from the same YAML and so appear to share one ModelName — confirm they are distinct.
+		for i := 0; i < expectedSuccessfulRequests; i++ {
+			okResp := utils.SendChatRequest(t, createdRoute2.Spec.ModelName, standardMessage)
+			okResp.Body.Close()
+			assert.Equal(t, http.StatusOK, okResp.StatusCode, "Request %d through route 2 should succeed", i+1)
+		}
+
+		rateLimitedResp := utils.SendChatRequest(t, createdRoute1.Spec.ModelName, standardMessage)
+		rateLimitedResp.Body.Close()
+		assert.Equal(t, http.StatusTooManyRequests, rateLimitedResp.StatusCode, "Request should be rate limited (shared bucket exhausted)")
+
+		t.Log("Global rate limit sharing across ModelRoutes verified")
+	})
+
+	// Test 3: Verify behavior when Redis is unavailable
+	t.Run("VerifyFallbackWhenRedisUnavailable", func(t *testing.T) {
+		t.Log("Test 3: Verifying behavior when Redis is unavailable")
+
+		modelRoute := utils.LoadYAMLFromFile[networkingv1alpha1.ModelRoute]("examples/kthena-router/ModelRouteWithGlobalRateLimit.yaml")
+		modelRoute.Name = "global-rate-limit-invalid-redis"
+		modelRoute.Namespace = testNamespace
+		modelRoute.Spec.RateLimit.Global.Redis.Address = "invalid-redis-server.invalid-namespace.svc.cluster.local:6379"
+		setupModelRouteWithGatewayAPI(modelRoute, useGatewayApi, kthenaNamespace)
+
+		createdModelRoute, err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Create(ctx, modelRoute, metav1.CreateOptions{})
+		require.NoError(t, err, "Failed to create ModelRoute with invalid Redis")
+
+		t.Cleanup(func() {
+			if err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Delete(context.Background(), createdModelRoute.Name, metav1.DeleteOptions{}); err != nil {
+				t.Logf("Warning: Failed to delete ModelRoute: %v", err)
+			}
+		})
+
+		// Fixed pause before probing; presumably this lets the router pick up the new route — confirm.
+		time.Sleep(5 * time.Second)
+
+		resp := utils.SendChatRequest(t, createdModelRoute.Spec.ModelName, standardMessage)
+		responseBody, readErr := io.ReadAll(resp.Body)
+		resp.Body.Close()
+
+		require.NoError(t, readErr, "Failed to read response body")
+		assert.NotEqual(t, http.StatusOK, resp.StatusCode, "Request should fail when Redis is unavailable")
+		t.Logf("Response status: %d, body: %s", resp.StatusCode, string(responseBody))
+
+		t.Log("Behavior when Redis is unavailable verified")
+	})
+
+	// Test 4: Verify rate limit persistence across time windows
+	t.Run("VerifyRateLimitPersistence", func(t *testing.T) {
+		t.Log("Test 4: Verifying rate limit persistence across time windows")
+
+		redisAddress := deployRedisForTest(t, ctx, testCtx, testNamespace)
+		modelRoute := utils.LoadYAMLFromFile[networkingv1alpha1.ModelRoute]("examples/kthena-router/ModelRouteWithGlobalRateLimit.yaml")
+		modelRoute.Name = "global-rate-limit-persistence"
+		modelRoute.Namespace = testNamespace
+		modelRoute.Spec.RateLimit.Global.Redis.Address = redisAddress
+		setupModelRouteWithGatewayAPI(modelRoute, useGatewayApi, kthenaNamespace)
+
+		createdModelRoute, err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Create(ctx, modelRoute, metav1.CreateOptions{})
+		require.NoError(t, err, "Failed to create ModelRoute")
+
+		t.Cleanup(func() {
+			if err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Delete(context.Background(), createdModelRoute.Name, metav1.DeleteOptions{}); err != nil {
+				t.Logf("Failed to delete ModelRoute: %v", err)
+			}
+		})
+
+		require.Eventually(t, func() bool {
+			mr, err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Get(ctx, createdModelRoute.Name, metav1.GetOptions{})
+			return err == nil && mr != nil
+		}, 2*time.Minute, 2*time.Second, "ModelRoute should be created")
+
+		resp := utils.CheckChatCompletions(t, createdModelRoute.Spec.ModelName, standardMessage)
+		tokensConsumed := resp.Attempts * tokensPerRequest
+
+		remainingQuota := inputTokenLimit - tokensConsumed
+		expectedSuccessfulRequests := remainingQuota / tokensPerRequest
+
+		for i := 0; i < expectedSuccessfulRequests; i++ {
+			okResp := utils.SendChatRequest(t, createdModelRoute.Spec.ModelName, standardMessage)
+			okResp.Body.Close()
+			assert.Equal(t, http.StatusOK, okResp.StatusCode, "Request %d should succeed before the quota is exhausted", i+1)
+		}
+
+		rateLimitedResp := utils.SendChatRequest(t, createdModelRoute.Spec.ModelName, standardMessage)
+		rateLimitedResp.Body.Close()
+		assert.Equal(t, http.StatusTooManyRequests, rateLimitedResp.StatusCode, "Rate limit should be active")
+
+		t.Log("Waiting 10 seconds (within rate limit window)...")
+		time.Sleep(10 * time.Second)
+
+		midWindowResp := utils.SendChatRequest(t, createdModelRoute.Spec.ModelName, standardMessage)
+		midWindowResp.Body.Close()
+		assert.Equal(t, http.StatusTooManyRequests, midWindowResp.StatusCode, "Rate limit should persist within window")
+
+		t.Log("Waiting for rate limit window to reset...")
+		time.Sleep((rateLimitWindowSeconds * time.Second) + windowResetBuffer)
+
+		postWindowResp := utils.SendChatRequest(t, createdModelRoute.Spec.ModelName, standardMessage)
+		postWindowResp.Body.Close()
+		assert.Equal(t, http.StatusOK, postWindowResp.StatusCode, "Request should succeed after window reset")
+
+		t.Log("Rate limit persistence verified")
+	})
+}