From 9a76a091d4ebbf887eb162bde997311b9e1332c5 Mon Sep 17 00:00:00 2001 From: Maya Barnea Date: Sun, 2 Nov 2025 12:36:16 +0200 Subject: [PATCH 1/2] add metrics tests for latency metrics with remote prefill Signed-off-by: Maya Barnea --- pkg/llm-d-inference-sim/metrics_test.go | 38 ++++++---- pkg/llm-d-inference-sim/test_utils.go | 93 +++++++++++++++---------- 2 files changed, 81 insertions(+), 50 deletions(-) diff --git a/pkg/llm-d-inference-sim/metrics_test.go b/pkg/llm-d-inference-sim/metrics_test.go index e64b7323..217c4215 100644 --- a/pkg/llm-d-inference-sim/metrics_test.go +++ b/pkg/llm-d-inference-sim/metrics_test.go @@ -774,23 +774,33 @@ var _ = Describe("Simulator metrics", Ordered, func() { numOfTokens := len(common.Tokenize(testUserMessage)) DescribeTable("should calculate all latency related metrics correctly for a single request", - func(testNamePrefix string, ttft int, prefillTimePerToken int, interTokenLatency int) { + func(testNamePrefix string, ttft int, prefillTimePerToken int, interTokenLatency int, + kvcacheTransferLatency int, kvCacheTransferTimePerToken int, doRemotePrefill bool) { // send a single request with a prompt of 4 tokens and echo mode, so output tokens number of 4 too - client := startServerAndSendRequest(testModel, testUserMessage, false, ttft, prefillTimePerToken, interTokenLatency) - checkLatencyMertics(client, testModel, numOfTokens, numOfTokens, ttft, prefillTimePerToken, interTokenLatency) - - // same in streaming modeq - client = startServerAndSendRequest(testModel, testUserMessage, true, ttft, prefillTimePerToken, interTokenLatency) - checkLatencyMertics(client, testModel, numOfTokens, numOfTokens, ttft, prefillTimePerToken, interTokenLatency) + client := startServerForLatencyTest(testModel, ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency, kvCacheTransferTimePerToken) + sendCompletionRequestForLatencyTest(client, testModel, testUserMessage, false, doRemotePrefill) + checkLatencyMertics(client, testModel, numOfTokens, numOfTokens, ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency, + kvCacheTransferTimePerToken, doRemotePrefill) + + // restart the server and run same test in streaming mode + client = startServerForLatencyTest(testModel, ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency, kvCacheTransferTimePerToken) + sendCompletionRequestForLatencyTest(client, testModel, testUserMessage, false, doRemotePrefill) + checkLatencyMertics(client, testModel, numOfTokens, numOfTokens, ttft, prefillTimePerToken, interTokenLatency, + kvcacheTransferLatency, kvCacheTransferTimePerToken, doRemotePrefill) }, - func(testNamePrefix string, ttft int, prefillTimePerToken int, interTokenLatency int) string { - return fmt.Sprintf("%s\nttft: %d, prefillTimePerToken: %d, interTokenLatency: %d", testNamePrefix, ttft, prefillTimePerToken, interTokenLatency) + func(testNamePrefix string, ttft int, prefillTimePerToken int, interTokenLatency int, + kvcacheTransferLatency int, kvCacheTransferTimePerToken int, doRemotePrefill bool) string { + return fmt.Sprintf("%s\nttft: %d, prefillTimePerToken: %d, interTokenLatency: %d, kvcacheTransferLatency: %d, kvCacheTransferTimePerToken: %d, doRemotePrefill: %t", + testNamePrefix, ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency, kvCacheTransferTimePerToken, doRemotePrefill) }, - // Params order: testName, ttft, prefillTimePerToken, interTokenLatency - Entry(nil, "constant prefill + inter token time", 0, 0, 100), - Entry(nil, "constant prefill + inter token 
time", 900, 0, 100), - Entry(nil, "constant prefill + inter token time", 1000, 0, 100), - Entry(nil, "prefill per token + inter token time", 0, 100, 100), + // Params order: testName, ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency, kvCacheTransferTimePerToken, doRemotePrefill) + Entry(nil, "constant prefill + inter token time", 0, 0, 100, 0, 0, false), + Entry(nil, "constant prefill + inter token time", 900, 0, 100, 0, 0, false), + Entry(nil, "constant prefill + inter token time", 1000, 0, 100, 0, 0, false), + Entry(nil, "prefill per token + inter token time", 0, 100, 100, 0, 0, false), + Entry(nil, "remote prefill constant time", 0, 0, 0, 1000, 0, true), + Entry(nil, "remote prefill constant time with non-remote times", 5000, 5000, 0, 1000, 0, true), + Entry(nil, "remote prefill time per transfered token", 0, 0, 0, 0, 100, true), ) }) diff --git a/pkg/llm-d-inference-sim/test_utils.go b/pkg/llm-d-inference-sim/test_utils.go index fbd30bf9..56debbeb 100644 --- a/pkg/llm-d-inference-sim/test_utils.go +++ b/pkg/llm-d-inference-sim/test_utils.go @@ -16,6 +16,7 @@ limitations under the License. package llmdinferencesim import ( + "bufio" "context" "crypto/tls" "errors" @@ -136,13 +137,12 @@ func startServerWithArgsAndEnv(ctx context.Context, mode string, args []string, }, nil } -// startServerAndSendRequest - starts server configured according the given latency parameters in echo mode, -// sends a single request with the given prompt -func startServerAndSendRequest(modelName string, prompt string, isStreaming bool, ttft int, prefillTimePerToken int, interTokenLatency int) *http.Client { +// startServerForLatencyTest - starts server configured according the given latency parameters in echo mode, +func startServerForLatencyTest(modelName string, ttft int, prefillTimePerToken int, interTokenLatency int, kvcacheTransferLatency int, kvCacheTransferTimePerToken int) *http.Client { ctx := context.TODO() - args := []string{"cmd", "--model", modelName, "--mode", common.ModeEcho, - // "--kv-cache-transfer-latency", strconv.Itoa(kvcacheTransferLatency), - // "--kv-cache-transfer-time-per-token", strconv.Itoa(kvCacheTransferTimePerToken), + args := []string{"cmd", "--model", modelName, "--mode", common.ModeEcho, "--v=4", + "--kv-cache-transfer-latency", strconv.Itoa(kvcacheTransferLatency), + "--kv-cache-transfer-time-per-token", strconv.Itoa(kvCacheTransferTimePerToken), "--time-to-first-token", strconv.Itoa(ttft), "--prefill-time-per-token", strconv.Itoa(prefillTimePerToken), "--inter-token-latency", strconv.Itoa(interTokenLatency), @@ -151,27 +151,38 @@ func startServerAndSendRequest(modelName string, prompt string, isStreaming bool client, err := startServerWithArgs(ctx, args) gomega.Expect(err).NotTo(gomega.HaveOccurred()) - openaitextclient, params := getOpenAIClientAndTextParams(client, modelName, prompt, isStreaming) + return client +} - if isStreaming { - // send a single request in a serial way - stream := openaitextclient.Completions.NewStreaming(ctx, params) - chunksCnt := 0 +// sendCompletionRequestForLatencyTest sends completion request according the given parameters +// uses http.Post and not openai-api function because vllm specific fields should be sent +func sendCompletionRequestForLatencyTest(client *http.Client, modelName string, prompt string, isStreaming bool, doRemotePrefill bool) { + // send completions request using http post because disagregated PD fields should be included + // Test with raw HTTP to verify the error response format + reqBody := 
fmt.Sprintf(`{ + "prompt": "%s", + "model": "%s", + "stream": %t, + "do_remote_prefill": %t + }`, prompt, modelName, isStreaming, doRemotePrefill) + + resp, err := client.Post("http://localhost/v1/completions", "application/json", strings.NewReader(reqBody)) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + defer func() { + err := resp.Body.Close() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }() - for stream.Next() { - chunksCnt++ - } - if err := stream.Err(); err != nil { + if isStreaming { + reader := bufio.NewReader(resp.Body) + for { + _, err := reader.ReadString('\n') + if err == io.EOF { + break + } gomega.Expect(err).NotTo(gomega.HaveOccurred()) } - // number of chunks is number of tokens + 2 (one chunk with usage info and one closing chunk) - gomega.Expect(chunksCnt).To(gomega.BeNumerically("==", len(common.Tokenize(prompt))+2)) - } else { - _, err = openaitextclient.Completions.New(ctx, params) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) } - - return client } // sendSimpleChatRequest starts server using the given environment variables and sends one chat completions request @@ -212,6 +223,7 @@ func getOpenAIClientAndChatParams(client option.HTTPClient, model string, messag return openaiclient, params } +// nolint // getOpenAIClientAndTextParams - creates an openai client and params for /completions call based on the given parameters func getOpenAIClientAndTextParams(client option.HTTPClient, model string, message string, streaming bool) (openai.Client, openai.CompletionNewParams) { openaiclient := openai.NewClient( @@ -445,7 +457,8 @@ func checkBucketBoundary(metrics string, modelName string, metricName string, bu // ttft time to first token parameter // prefillTimePerToken prefill time per input tokens // interTokenLatency processing time per output token -func checkLatencyMertics(client *http.Client, modelName string, numOfInputTokens int, numOfOutputTokens int, ttft int, prefillTimePerToken int, interTokenLatency int) { +func checkLatencyMertics(client *http.Client, modelName string, numOfInputTokens int, numOfOutputTokens int, ttft int, + prefillTimePerToken int, interTokenLatency int, kvcacheTransferLatency int, kvCacheTransferTimePerToken int, doRemotePrefill bool) { // wait a little bit and check metrics time.Sleep(300 * time.Millisecond) metricsResp, err := client.Get(metricsUrl) @@ -456,30 +469,38 @@ func checkLatencyMertics(client *http.Client, modelName string, numOfInputTokens gomega.Expect(err).NotTo(gomega.HaveOccurred()) metrics := string(data) - var expectedPrefillTime float64 - // TODO take into consideration remote prefill - if ttft > 0 { - // time-to-first-token overwrites calculation of prefill time based on number of input tokens - expectedPrefillTime = float64(ttft) / 1000 - + expectedPrefillTimeInSecs := 0.0 + if doRemotePrefill { + // when doRemotePrefill is true, this means that this is decode request and prefill was executed on remote vllm, + if kvcacheTransferLatency != 0 { + expectedPrefillTimeInSecs = float64(kvcacheTransferLatency) / 1000 + } else { + expectedPrefillTimeInSecs = float64(kvCacheTransferTimePerToken*numOfOutputTokens) / 1000 + } } else { - expectedPrefillTime = float64(numOfInputTokens*prefillTimePerToken) / 1000 + if ttft > 0 { + // time-to-first-token overwrites calculation of prefill time based on number of input tokens + expectedPrefillTimeInSecs = float64(ttft) / 1000 + + } else { + expectedPrefillTimeInSecs = float64(numOfInputTokens*prefillTimePerToken) / 1000 + } } - expectedDecodeTime := 
float64(interTokenLatency*(numOfOutputTokens-1)) / 1000
-	expectedE2ELatency := expectedPrefillTime + expectedDecodeTime
+	expectedDecodeTimeInSecs := float64(interTokenLatency*(numOfOutputTokens-1)) / 1000
+	expectedE2ELatency := expectedPrefillTimeInSecs + expectedDecodeTimeInSecs
 
 	prevBoundary := math.Inf(-1)
 	for _, bucketBoudary := range common.RequestLatencyBucketsBoundaries {
-		checkBucketBoundary(metrics, modelName, prefillTimeMetricName, bucketBoudary, prevBoundary, expectedPrefillTime)
-		checkBucketBoundary(metrics, modelName, decodeTimeMetricName, bucketBoudary, prevBoundary, expectedDecodeTime)
+		checkBucketBoundary(metrics, modelName, prefillTimeMetricName, bucketBoudary, prevBoundary, expectedPrefillTimeInSecs)
+		checkBucketBoundary(metrics, modelName, decodeTimeMetricName, bucketBoudary, prevBoundary, expectedDecodeTimeInSecs)
 		checkBucketBoundary(metrics, modelName, e2eReqLatencyMetricName, bucketBoudary, prevBoundary, expectedE2ELatency)
 		prevBoundary = bucketBoudary
 	}
 
 	// check the last bucket
 	lastBoundary := common.RequestLatencyBucketsBoundaries[len(common.RequestLatencyBucketsBoundaries)-1]
-	checkBucketBoundary(metrics, modelName, prefillTimeMetricName, math.Inf(1), lastBoundary, expectedPrefillTime)
-	checkBucketBoundary(metrics, modelName, decodeTimeMetricName, math.Inf(1), lastBoundary, expectedDecodeTime)
+	checkBucketBoundary(metrics, modelName, prefillTimeMetricName, math.Inf(1), lastBoundary, expectedPrefillTimeInSecs)
+	checkBucketBoundary(metrics, modelName, decodeTimeMetricName, math.Inf(1), lastBoundary, expectedDecodeTimeInSecs)
 	checkBucketBoundary(metrics, modelName, e2eReqLatencyMetricName, math.Inf(1), lastBoundary, expectedE2ELatency)
 }

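The first patch's switch to a raw HTTP POST is worth spelling out: `do_remote_prefill` is a vLLM disaggregated prefill/decode extension field that the generated OpenAI client types do not carry, which is why the helper builds the JSON body by hand. Below is a minimal, self-contained sketch of the same request outside the test suite; the base URL, model name, and function name are illustrative placeholders, and only the JSON shape is taken from the patch.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// sendCompletion mirrors sendCompletionRequestForLatencyTest above: a raw POST
// is used because "do_remote_prefill" is a vLLM-specific field that the
// OpenAI completion types do not expose.
func sendCompletion(client *http.Client, baseURL, model, prompt string, stream, doRemotePrefill bool) ([]byte, error) {
	body := fmt.Sprintf(`{
		"prompt": %q,
		"model": %q,
		"stream": %t,
		"do_remote_prefill": %t
	}`, prompt, model, stream, doRemotePrefill)

	resp, err := client.Post(baseURL+"/v1/completions", "application/json", strings.NewReader(body))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}

func main() {
	// Assumes a simulator is listening locally; the address is hypothetical.
	data, err := sendCompletion(http.DefaultClient, "http://localhost:8000", "my-model", "Hello world", false, true)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
}
```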
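The loop over common.RequestLatencyBucketsBoundaries at the end of the first patch relies on how Prometheus cumulative histograms behave: after a single observation v, every bucket whose upper bound is at least v reports a cumulative count of 1, and every lower bucket reports 0. The implementations of checkBucketBoundary and getFloatBucketMetricLine are not part of this diff, so the sketch below only restates that property; the metric name and label layout are assumptions for illustration.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
)

// expectedBucketCount returns the cumulative count a histogram bucket with
// upper bound `le` must report after exactly one observation.
func expectedBucketCount(observed, upperBound float64) int {
	if observed <= upperBound {
		return 1
	}
	return 0
}

// bucketLine builds the kind of metric line the test searches for in the
// /metrics payload; the real format lives in getFloatBucketMetricLine.
func bucketLine(metric, model string, upperBound float64, count int) string {
	le := "+Inf"
	if !math.IsInf(upperBound, 1) {
		le = strconv.FormatFloat(upperBound, 'f', -1, 64)
	}
	return fmt.Sprintf(`%s_bucket{model_name=%q,le=%q} %d`, metric, model, le, count)
}

func main() {
	observed := 0.4 // e.g. an expected prefill time in seconds
	for _, le := range []float64{0.1, 0.5, 1, math.Inf(1)} {
		fmt.Println(bucketLine("vllm:request_prefill_time_seconds", "my-model", le, expectedBucketCount(observed, le)))
	}
}
```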
From 34be14dc173108cd81f71446835d901b0f671d38 Mon Sep 17 00:00:00 2001
From: Maya Barnea
Date: Sun, 2 Nov 2025 15:00:06 +0200
Subject: [PATCH 2/2] address PR review comments

Signed-off-by: Maya Barnea
---
 pkg/llm-d-inference-sim/metrics_test.go | 16 +++++-----------
 pkg/llm-d-inference-sim/test_utils.go | 21 +++++++++++++------
 2 files changed, 20 insertions(+), 17 deletions(-)

diff --git a/pkg/llm-d-inference-sim/metrics_test.go b/pkg/llm-d-inference-sim/metrics_test.go
index 217c4215..bafc3a8a 100644
--- a/pkg/llm-d-inference-sim/metrics_test.go
+++ b/pkg/llm-d-inference-sim/metrics_test.go
@@ -777,16 +777,10 @@ var _ = Describe("Simulator metrics", Ordered, func() {
 		func(testNamePrefix string, ttft int, prefillTimePerToken int, interTokenLatency int,
 			kvcacheTransferLatency int, kvCacheTransferTimePerToken int, doRemotePrefill bool) {
 			// send a single request with a prompt of 4 tokens and echo mode, so output tokens number of 4 too
-			client := startServerForLatencyTest(testModel, ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency, kvCacheTransferTimePerToken)
-			sendCompletionRequestForLatencyTest(client, testModel, testUserMessage, false, doRemotePrefill)
-			checkLatencyMertics(client, testModel, numOfTokens, numOfTokens, ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency,
-				kvCacheTransferTimePerToken, doRemotePrefill)
-
-			// restart the server and run same test in streaming mode
-			client = startServerForLatencyTest(testModel, ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency, kvCacheTransferTimePerToken)
-			sendCompletionRequestForLatencyTest(client, testModel, testUserMessage, false, doRemotePrefill)
-			checkLatencyMertics(client, testModel, numOfTokens, numOfTokens, ttft, prefillTimePerToken, interTokenLatency,
-				kvcacheTransferLatency, kvCacheTransferTimePerToken, doRemotePrefill)
+			singleRequestLatencyTest(ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency,
+				kvCacheTransferTimePerToken, false, numOfTokens, doRemotePrefill)
+			singleRequestLatencyTest(ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency,
+				kvCacheTransferTimePerToken, true, numOfTokens, doRemotePrefill)
 		},
 		func(testNamePrefix string, ttft int, prefillTimePerToken int, interTokenLatency int,
 			kvcacheTransferLatency int, kvCacheTransferTimePerToken int, doRemotePrefill bool) string {
@@ -800,7 +794,7 @@ var _ = Describe("Simulator metrics", Ordered, func() {
 			Entry(nil, "prefill per token + inter token time", 0, 100, 100, 0, 0, false),
 			Entry(nil, "remote prefill constant time", 0, 0, 0, 1000, 0, true),
 			Entry(nil, "remote prefill constant time with non-remote times", 5000, 5000, 0, 1000, 0, true),
-			Entry(nil, "remote prefill time per transfered token", 0, 0, 0, 0, 100, true),
+			Entry(nil, "remote prefill time per transferred token", 0, 0, 0, 0, 100, true),
 	)
 })
 
diff --git a/pkg/llm-d-inference-sim/test_utils.go b/pkg/llm-d-inference-sim/test_utils.go
index 56debbeb..57ce4222 100644
--- a/pkg/llm-d-inference-sim/test_utils.go
+++ b/pkg/llm-d-inference-sim/test_utils.go
@@ -137,10 +137,10 @@ func startServerWithArgsAndEnv(ctx context.Context, mode string, args []string,
 	}, nil
 }
 
-// startServerForLatencyTest - starts server configured according the given latency parameters in echo mode,
+// startServerForLatencyTest - starts a server configured according to the given latency parameters in echo mode
 func startServerForLatencyTest(modelName string, ttft int, prefillTimePerToken int, interTokenLatency int, kvcacheTransferLatency int, kvCacheTransferTimePerToken int) *http.Client {
 	ctx := context.TODO()
-	args := []string{"cmd", "--model", modelName, "--mode", common.ModeEcho, "--v=4",
+	args := []string{"cmd", "--model", modelName, "--mode", common.ModeEcho,
 		"--kv-cache-transfer-latency", strconv.Itoa(kvcacheTransferLatency),
 		"--kv-cache-transfer-time-per-token", strconv.Itoa(kvCacheTransferTimePerToken),
 		"--time-to-first-token", strconv.Itoa(ttft),
@@ -154,6 +154,16 @@ func startServerForLatencyTest(modelName string, ttft int, prefillTimePerToken i
 	return client
 }
 
+// singleRequestLatencyTest starts a server configured with the given latency parameters, sends a single
+// completion request (streaming or not), and validates the resulting latency metrics
+func singleRequestLatencyTest(ttft int, prefillTimePerToken int, interTokenLatency int, kvcacheTransferLatency int,
+	kvCacheTransferTimePerToken int, isStreaming bool, numOfTokens int, doRemotePrefill bool) {
+	client := startServerForLatencyTest(testModel, ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency, kvCacheTransferTimePerToken)
+	sendCompletionRequestForLatencyTest(client, testModel, testUserMessage, isStreaming, doRemotePrefill)
+	checkLatencyMetrics(client, testModel, numOfTokens, numOfTokens, ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency,
+		kvCacheTransferTimePerToken, doRemotePrefill)
+}
+
 // sendCompletionRequestForLatencyTest sends a completion request according to the given parameters
 // uses http.Post rather than the openai-api client because vLLM-specific fields must be sent
 func sendCompletionRequestForLatencyTest(client *http.Client, modelName string, prompt string, isStreaming bool, doRemotePrefill bool) {
@@ -450,14 +460,14 @@ func checkBucketBoundary(metrics string, modelName string, metricName string, bu
 	gomega.Expect(metrics).To(gomega.ContainSubstring(getFloatBucketMetricLine(modelName, metricName, bucketBoudary, expectedCount)))
 }
 
-// checkLatencyMertics sends /metrics request and checks that latency related values are valid
+// 
checkLatencyMetrics sends /metrics request and checks that latency related values are valid
 // client the http client to be used for request send
 // modelName the model name
 // numOfOutputTokens number of tokens in the output of the completion request we want to validate
 // ttft time to first token parameter
 // prefillTimePerToken prefill time per input tokens
 // interTokenLatency processing time per output token
-func checkLatencyMertics(client *http.Client, modelName string, numOfInputTokens int, numOfOutputTokens int, ttft int,
+func checkLatencyMetrics(client *http.Client, modelName string, numOfInputTokens int, numOfOutputTokens int, ttft int,
 	prefillTimePerToken int, interTokenLatency int, kvcacheTransferLatency int, kvCacheTransferTimePerToken int, doRemotePrefill bool) {
 	// wait a little bit and check metrics
 	time.Sleep(300 * time.Millisecond)
@@ -471,11 +480,11 @@ func checkLatencyMertics(client *http.Client, modelName string, numOfInputTokens
 	expectedPrefillTimeInSecs := 0.0
 	if doRemotePrefill {
-		// when doRemotePrefill is true, this means that this is decode request and prefill was executed on remote vllm,
+		// when doRemotePrefill is true, this is a decode request and prefill was executed on the remote vllm
 		if kvcacheTransferLatency != 0 {
 			expectedPrefillTimeInSecs = float64(kvcacheTransferLatency) / 1000
 		} else {
-			expectedPrefillTimeInSecs = float64(kvCacheTransferTimePerToken*numOfOutputTokens) / 1000
+			expectedPrefillTimeInSecs = float64(kvCacheTransferTimePerToken*numOfInputTokens) / 1000
 		}
 	} else {
 		if ttft > 0 {
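With the second patch applied, the expected prefill time asserted by checkLatencyMetrics follows one rule: for a remote-prefill (decode-only) request it is the KV-cache transfer time, either the constant latency or the per-token time multiplied by the number of input tokens (the corrected branch above); otherwise a non-zero time-to-first-token overrides the per-input-token estimate. Decode time remains interTokenLatency*(numOfOutputTokens-1) in both cases, and the end-to-end latency is the sum. A compact restatement follows, with hypothetical names; parameters are milliseconds, the result is seconds, and the simulator's actual implementation is not shown in this diff.

```go
package main

import "fmt"

// expectedPrefillSecs restates the rule the test asserts after the fix above:
// remote prefill uses the KV-cache transfer time (constant latency, or
// per-token time times the number of *input* tokens); local prefill uses a
// non-zero time-to-first-token, falling back to prefill time per input token.
func expectedPrefillSecs(ttft, prefillPerToken, kvTransfer, kvPerToken, inputTokens int, remotePrefill bool) float64 {
	if remotePrefill {
		if kvTransfer != 0 {
			return float64(kvTransfer) / 1000
		}
		return float64(kvPerToken*inputTokens) / 1000
	}
	if ttft > 0 {
		return float64(ttft) / 1000
	}
	return float64(prefillPerToken*inputTokens) / 1000
}

func main() {
	// The "remote prefill time per transferred token" table entry: a 4-token
	// prompt at 100 ms per transferred token should yield 0.4 s of prefill.
	fmt.Println(expectedPrefillSecs(0, 0, 0, 100, 4, true))
}
```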