6 changes: 6 additions & 0 deletions test/e2e/router/gateway-api/e2e_test.go
@@ -287,3 +287,9 @@ func TestDuplicateModelName(t *testing.T) {

t.Log("Test completed successfully: same modelName routes to different models via different ports")
}

// TestModelRouteWithGlobalRateLimit tests global rate limiting enforced by the Kthena Router.
// This test runs the shared test function with Gateway API enabled (with ParentRefs).
func TestModelRouteWithGlobalRateLimit(t *testing.T) {
router.TestModelRouteWithGlobalRateLimitShared(t, testCtx, testNamespace, true, kthenaNamespace)
}
276 changes: 276 additions & 0 deletions test/e2e/router/shared.go
@@ -18,6 +18,7 @@ package router

import (
"context"
"fmt"
"io"
"net/http"
"strings"
@@ -31,6 +32,7 @@ import (
routercontext "github.com/volcano-sh/kthena/test/e2e/router/context"
"github.com/volcano-sh/kthena/test/e2e/utils"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
)
@@ -842,3 +844,277 @@ func TestModelRouteLoraShared(t *testing.T, testCtx *routercontext.RouterTestCon
utils.UnloadLoRAAdapter(t, "http://127.0.0.1:9000", "lora-B")
t.Log("LoRA adapters unloaded successfully")
}

const (
redisServerName = "redis-server"
redisServerPort = 6379
)

// deployRedisForTest deploys Redis for global rate limiting tests and returns its
// in-cluster address. It reuses an existing Redis deployment if one is already present.
func deployRedisForTest(t *testing.T, ctx context.Context, testCtx *routercontext.RouterTestContext, namespace string) string {
t.Helper()

existingDeploy, err := testCtx.KubeClient.AppsV1().Deployments(namespace).Get(ctx, redisServerName, metav1.GetOptions{})
if err == nil && existingDeploy != nil {
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", redisServerName, namespace, redisServerPort)
}

redisServices := utils.LoadMultiResourceYAMLFromFile[corev1.Service]("examples/redis/redis-standalone.yaml")
Member:
This file includes multiple resources, not only corev1.Service.

Member (Author):
I'm still working on this PR. However, I'd like to discuss the model server E2E tests. Can we connect on Slack, sir?

Member:
ok

require.Len(t, redisServices, 1, "Redis YAML should contain 1 Service")
redisService := redisServices[0]
redisService.Namespace = namespace
_, err = testCtx.KubeClient.CoreV1().Services(namespace).Create(ctx, redisService, metav1.CreateOptions{})
require.NoError(t, err, "Failed to create Redis service")

redisDeployments := utils.LoadMultiResourceYAMLFromFile[appsv1.Deployment]("examples/redis/redis-standalone.yaml")
require.Len(t, redisDeployments, 1, "Redis YAML should contain 1 Deployment")
redisDeployment := redisDeployments[0]
redisDeployment.Namespace = namespace
_, err = testCtx.KubeClient.AppsV1().Deployments(namespace).Create(ctx, redisDeployment, metav1.CreateOptions{})
require.NoError(t, err, "Failed to create Redis deployment")

t.Cleanup(func() {
cleanupCtx := context.Background()
_ = testCtx.KubeClient.AppsV1().Deployments(namespace).Delete(cleanupCtx, redisServerName, metav1.DeleteOptions{})
_ = testCtx.KubeClient.CoreV1().Services(namespace).Delete(cleanupCtx, redisServerName, metav1.DeleteOptions{})
})

require.Eventually(t, func() bool {
deploy, err := testCtx.KubeClient.AppsV1().Deployments(namespace).Get(ctx, redisServerName, metav1.GetOptions{})
if err != nil {
return false
}
if deploy.Spec.Replicas == nil {
return deploy.Status.ReadyReplicas > 0
}
return deploy.Status.ReadyReplicas == *deploy.Spec.Replicas
}, 3*time.Minute, 5*time.Second, "Redis deployment not ready")

return fmt.Sprintf("%s.%s.svc.cluster.local:%d", redisServerName, namespace, redisServerPort)
}

// TestModelRouteWithGlobalRateLimitShared is a shared test function that can be used by both
// router and gateway-api test suites. When useGatewayAPI is true, it configures ModelRoute
// with ParentRefs to the default Gateway.
func TestModelRouteWithGlobalRateLimitShared(t *testing.T, testCtx *routercontext.RouterTestContext, testNamespace string, useGatewayApi bool, kthenaNamespace string) {
Member:
For global rate limiting, we need to scale the router up from 1 to 3 replicas, and make sure the rate limit still takes effect even when requests hit different router instances.

Also, the infrastructure of this test suite may not be suitable for this case: port-forward selects a single pod and forwards to it, whereas we need traffic load-balanced across the different router replicas.
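If the suite does move to multiple router replicas, a minimal sketch of the scale-up could look like the following. This is not part of the PR, and the Deployment name "kthena-router" is an assumption:

// Hypothetical sketch: scale the router Deployment to 3 replicas and wait for
// them to become ready before exercising the global rate limit.
scaleCtx := context.Background()
scale, err := testCtx.KubeClient.AppsV1().Deployments(kthenaNamespace).GetScale(scaleCtx, "kthena-router", metav1.GetOptions{})
require.NoError(t, err, "Failed to read router scale")
scale.Spec.Replicas = 3
_, err = testCtx.KubeClient.AppsV1().Deployments(kthenaNamespace).UpdateScale(scaleCtx, "kthena-router", scale, metav1.UpdateOptions{})
require.NoError(t, err, "Failed to scale router to 3 replicas")
require.Eventually(t, func() bool {
	deploy, getErr := testCtx.KubeClient.AppsV1().Deployments(kthenaNamespace).Get(scaleCtx, "kthena-router", metav1.GetOptions{})
	return getErr == nil && deploy.Status.ReadyReplicas == 3
}, 3*time.Minute, 5*time.Second, "Router replicas not ready")

A matching t.Cleanup that scales back to 1 replica would keep the other suites unaffected, and the requests would still need to be load-balanced (e.g. via the router Service) rather than port-forwarded to a single pod.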

const (
rateLimitWindowSeconds = 60
windowResetBuffer = 10 * time.Second
inputTokenLimit = 10
tokensPerRequest = 10
)
ctx := context.Background()

standardMessage := []utils.ChatMessage{
utils.NewChatMessage("user", "hello world"),
}

// Test 1: Verify Redis connection and basic global rate limiting
t.Run("VerifyRedisConnectionAndBasicRateLimit", func(t *testing.T) {
t.Log("Test 1: Verifying Redis connection and basic global rate limiting")

redisAddress := deployRedisForTest(t, ctx, testCtx, testNamespace)
Member:
Do we need to redeploy Redis in every subtest?
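One option, sketched here only and not part of this PR: call deployRedisForTest once on the parent *testing.T at the top of TestModelRouteWithGlobalRateLimitShared, so the t.Cleanup registered inside it fires only after every subtest has finished, and let each subtest capture the shared address:

// Hypothetical sketch: one shared Redis deployment for all subtests.
sharedRedisAddress := deployRedisForTest(t, ctx, testCtx, testNamespace)
t.Run("VerifyRedisConnectionAndBasicRateLimit", func(t *testing.T) {
	t.Logf("Reusing shared Redis at %s", sharedRedisAddress)
	// ...rest of the subtest, with its own deployRedisForTest call removed
})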


modelRoute := utils.LoadYAMLFromFile[networkingv1alpha1.ModelRoute]("examples/kthena-router/ModelRouteWithGlobalRateLimit.yaml")
modelRoute.Namespace = testNamespace
modelRoute.Spec.RateLimit.Global.Redis.Address = redisAddress
setupModelRouteWithGatewayAPI(modelRoute, useGatewayApi, kthenaNamespace)

createdModelRoute, err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Create(ctx, modelRoute, metav1.CreateOptions{})
require.NoError(t, err, "Failed to create ModelRoute")

t.Cleanup(func() {
cleanupCtx := context.Background()
if err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Delete(cleanupCtx, createdModelRoute.Name, metav1.DeleteOptions{}); err != nil {
t.Logf("Warning: Failed to delete ModelRoute: %v", err)
}
})

require.Eventually(t, func() bool {
mr, err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Get(ctx, createdModelRoute.Name, metav1.GetOptions{})
return err == nil && mr != nil
}, 2*time.Minute, 2*time.Second, "ModelRoute should be created")

resp := utils.CheckChatCompletions(t, createdModelRoute.Spec.ModelName, standardMessage)
tokensConsumed := resp.Attempts * tokensPerRequest
t.Logf("Router reconciliation complete (consumed %d tokens in %d attempts)", tokensConsumed, resp.Attempts)

remainingQuota := inputTokenLimit - tokensConsumed
expectedSuccessfulRequests := remainingQuota / tokensPerRequest

for i := 0; i < expectedSuccessfulRequests; i++ {
resp := utils.SendChatRequest(t, createdModelRoute.Spec.ModelName, standardMessage)
responseBody, readErr := io.ReadAll(resp.Body)
resp.Body.Close()

require.NoError(t, readErr, "Failed to read response body on request %d", i+1)
require.Equal(t, http.StatusOK, resp.StatusCode,
"Request %d should succeed. Response: %s", i+1, string(responseBody))
t.Logf("Request %d succeeded", i+1)
}

rateLimitedResp := utils.SendChatRequest(t, createdModelRoute.Spec.ModelName, standardMessage)
responseBody, readErr := io.ReadAll(rateLimitedResp.Body)
rateLimitedResp.Body.Close()

require.NoError(t, readErr, "Failed to read rate limit response body")
assert.Equal(t, http.StatusTooManyRequests, rateLimitedResp.StatusCode,
"Request should be rate limited after exhausting quota")
assert.Contains(t, strings.ToLower(string(responseBody)), "rate limit",
"Rate limit error response must contain descriptive message")

t.Logf("Global rate limit enforced after %d total requests", resp.Attempts+expectedSuccessfulRequests)
})

// Test 2: Verify global rate limit sharing across multiple ModelRoutes
t.Run("VerifyGlobalRateLimitSharingAcrossModelRoutes", func(t *testing.T) {
t.Log("Test 2: Verifying global rate limit sharing across multiple ModelRoutes")

redisAddress := deployRedisForTest(t, ctx, testCtx, testNamespace)

modelRoute1 := utils.LoadYAMLFromFile[networkingv1alpha1.ModelRoute]("examples/kthena-router/ModelRouteWithGlobalRateLimit.yaml")
modelRoute1.Name = "global-rate-limit-route-1"
modelRoute1.Namespace = testNamespace
modelRoute1.Spec.RateLimit.Global.Redis.Address = redisAddress
setupModelRouteWithGatewayAPI(modelRoute1, useGatewayApi, kthenaNamespace)

modelRoute2 := utils.LoadYAMLFromFile[networkingv1alpha1.ModelRoute]("examples/kthena-router/ModelRouteWithGlobalRateLimit.yaml")
modelRoute2.Name = "global-rate-limit-route-2"
modelRoute2.Namespace = testNamespace
modelRoute2.Spec.RateLimit.Global.Redis.Address = redisAddress
setupModelRouteWithGatewayAPI(modelRoute2, useGatewayApi, kthenaNamespace)

createdRoute1, err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Create(ctx, modelRoute1, metav1.CreateOptions{})
require.NoError(t, err, "Failed to create first ModelRoute")

createdRoute2, err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Create(ctx, modelRoute2, metav1.CreateOptions{})
require.NoError(t, err, "Failed to create second ModelRoute")

t.Cleanup(func() {
cleanupCtx := context.Background()
if err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Delete(cleanupCtx, createdRoute1.Name, metav1.DeleteOptions{}); err != nil {
t.Logf("Warning: Failed to delete ModelRoute: %v", err)
}
if err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Delete(cleanupCtx, createdRoute2.Name, metav1.DeleteOptions{}); err != nil {
t.Logf("Warning: Failed to delete ModelRoute: %v", err)
}
})

require.Eventually(t, func() bool {
mr1, err1 := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Get(ctx, createdRoute1.Name, metav1.GetOptions{})
mr2, err2 := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Get(ctx, createdRoute2.Name, metav1.GetOptions{})
return err1 == nil && mr1 != nil && err2 == nil && mr2 != nil
}, 2*time.Minute, 2*time.Second, "Both ModelRoutes should be created")

resp := utils.CheckChatCompletions(t, createdRoute1.Spec.ModelName, standardMessage)
tokensConsumed := resp.Attempts * tokensPerRequest
t.Logf("Consumed %d tokens through route 1", tokensConsumed)

remainingQuota := inputTokenLimit - tokensConsumed
expectedSuccessfulRequests := remainingQuota / tokensPerRequest

for i := 0; i < expectedSuccessfulRequests; i++ {
resp := utils.SendChatRequest(t, createdRoute1.Spec.ModelName, standardMessage)
resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode, "Request %d should succeed against the shared quota", i+1)
}

rateLimitedResp := utils.SendChatRequest(t, createdRoute1.Spec.ModelName, standardMessage)
rateLimitedResp.Body.Close()
assert.Equal(t, http.StatusTooManyRequests, rateLimitedResp.StatusCode,
"Request should be rate limited (shared bucket exhausted)")

t.Log("Global rate limit sharing across ModelRoutes verified")
})

// Test 3: Verify behavior when Redis is unavailable
t.Run("VerifyFallbackWhenRedisUnavailable", func(t *testing.T) {
t.Log("Test 3: Verifying behavior when Redis is unavailable")

modelRoute := utils.LoadYAMLFromFile[networkingv1alpha1.ModelRoute]("examples/kthena-router/ModelRouteWithGlobalRateLimit.yaml")
modelRoute.Name = "global-rate-limit-invalid-redis"
modelRoute.Namespace = testNamespace
modelRoute.Spec.RateLimit.Global.Redis.Address = "invalid-redis-server.invalid-namespace.svc.cluster.local:6379"
setupModelRouteWithGatewayAPI(modelRoute, useGatewayApi, kthenaNamespace)

createdModelRoute, err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Create(ctx, modelRoute, metav1.CreateOptions{})
require.NoError(t, err, "Failed to create ModelRoute with invalid Redis")

t.Cleanup(func() {
cleanupCtx := context.Background()
if err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Delete(cleanupCtx, createdModelRoute.Name, metav1.DeleteOptions{}); err != nil {
t.Logf("Warning: Failed to delete ModelRoute: %v", err)
}
})

// Give the router a short time to observe the new ModelRoute before sending traffic.
time.Sleep(5 * time.Second)

resp := utils.SendChatRequest(t, createdModelRoute.Spec.ModelName, standardMessage)
responseBody, _ := io.ReadAll(resp.Body)
resp.Body.Close()

assert.NotEqual(t, http.StatusOK, resp.StatusCode, "Request should fail when Redis is unavailable")
t.Logf("Response status: %d, body: %s", resp.StatusCode, string(responseBody))

t.Log("Behavior when Redis is unavailable verified")
})

// Test 4: Verify rate limit persistence across time windows
t.Run("VerifyRateLimitPersistence", func(t *testing.T) {
t.Log("Test 4: Verifying rate limit persistence across time windows")

redisAddress := deployRedisForTest(t, ctx, testCtx, testNamespace)

modelRoute := utils.LoadYAMLFromFile[networkingv1alpha1.ModelRoute]("examples/kthena-router/ModelRouteWithGlobalRateLimit.yaml")
modelRoute.Name = "global-rate-limit-persistence"
modelRoute.Namespace = testNamespace
modelRoute.Spec.RateLimit.Global.Redis.Address = redisAddress
setupModelRouteWithGatewayAPI(modelRoute, useGatewayApi, kthenaNamespace)

createdModelRoute, err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Create(ctx, modelRoute, metav1.CreateOptions{})
require.NoError(t, err, "Failed to create ModelRoute")

t.Cleanup(func() {
cleanupCtx := context.Background()
if err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Delete(cleanupCtx, createdModelRoute.Name, metav1.DeleteOptions{}); err != nil {
t.Logf("Failed to delete ModelRoute: %v", err)
}
})

require.Eventually(t, func() bool {
mr, err := testCtx.KthenaClient.NetworkingV1alpha1().ModelRoutes(testNamespace).Get(ctx, createdModelRoute.Name, metav1.GetOptions{})
return err == nil && mr != nil
}, 2*time.Minute, 2*time.Second, "ModelRoute should be created")

resp := utils.CheckChatCompletions(t, createdModelRoute.Spec.ModelName, standardMessage)
tokensConsumed := resp.Attempts * tokensPerRequest

remainingQuota := inputTokenLimit - tokensConsumed
expectedSuccessfulRequests := remainingQuota / tokensPerRequest

// Consume the remaining quota so the next request is rate limited.
for i := 0; i < expectedSuccessfulRequests; i++ {
resp := utils.SendChatRequest(t, createdModelRoute.Spec.ModelName, standardMessage)
resp.Body.Close()
}

rateLimitedResp := utils.SendChatRequest(t, createdModelRoute.Spec.ModelName, standardMessage)
rateLimitedResp.Body.Close()
assert.Equal(t, http.StatusTooManyRequests, rateLimitedResp.StatusCode, "Rate limit should be active")

t.Log("Waiting 10 seconds (within rate limit window)...")
time.Sleep(10 * time.Second)

midWindowResp := utils.SendChatRequest(t, createdModelRoute.Spec.ModelName, standardMessage)
midWindowResp.Body.Close()
assert.Equal(t, http.StatusTooManyRequests, midWindowResp.StatusCode, "Rate limit should persist within window")

t.Log("Waiting for rate limit window to reset...")
time.Sleep((rateLimitWindowSeconds * time.Second) + windowResetBuffer)

postWindowResp := utils.SendChatRequest(t, createdModelRoute.Spec.ModelName, standardMessage)
postWindowResp.Body.Close()
assert.Equal(t, http.StatusOK, postWindowResp.StatusCode, "Request should succeed after window reset")

t.Log("Rate limit persistence verified")
})
}