38 changes: 24 additions & 14 deletions pkg/llm-d-inference-sim/metrics_test.go
@@ -774,23 +774,33 @@
numOfTokens := len(common.Tokenize(testUserMessage))

DescribeTable("should calculate all latency related metrics correctly for a single request",
func(testNamePrefix string, ttft int, prefillTimePerToken int, interTokenLatency int) {
func(testNamePrefix string, ttft int, prefillTimePerToken int, interTokenLatency int,
kvcacheTransferLatency int, kvCacheTransferTimePerToken int, doRemotePrefill bool) {
// send a single request with a prompt of 4 tokens in echo mode, so the number of output tokens is 4 too
client := startServerAndSendRequest(testModel, testUserMessage, false, ttft, prefillTimePerToken, interTokenLatency)
checkLatencyMertics(client, testModel, numOfTokens, numOfTokens, ttft, prefillTimePerToken, interTokenLatency)

// same in streaming mode
client = startServerAndSendRequest(testModel, testUserMessage, true, ttft, prefillTimePerToken, interTokenLatency)
checkLatencyMertics(client, testModel, numOfTokens, numOfTokens, ttft, prefillTimePerToken, interTokenLatency)
client := startServerForLatencyTest(testModel, ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency, kvCacheTransferTimePerToken)
sendCompletionRequestForLatencyTest(client, testModel, testUserMessage, false, doRemotePrefill)
checkLatencyMertics(client, testModel, numOfTokens, numOfTokens, ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency,
kvCacheTransferTimePerToken, doRemotePrefill)

// restart the server and run same test in streaming mode
client = startServerForLatencyTest(testModel, ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency, kvCacheTransferTimePerToken)
sendCompletionRequestForLatencyTest(client, testModel, testUserMessage, true, doRemotePrefill)
checkLatencyMertics(client, testModel, numOfTokens, numOfTokens, ttft, prefillTimePerToken, interTokenLatency,
kvcacheTransferLatency, kvCacheTransferTimePerToken, doRemotePrefill)
},
func(testNamePrefix string, ttft int, prefillTimePerToken int, interTokenLatency int) string {
return fmt.Sprintf("%s\nttft: %d, prefillTimePerToken: %d, interTokenLatency: %d", testNamePrefix, ttft, prefillTimePerToken, interTokenLatency)
func(testNamePrefix string, ttft int, prefillTimePerToken int, interTokenLatency int,
kvcacheTransferLatency int, kvCacheTransferTimePerToken int, doRemotePrefill bool) string {
return fmt.Sprintf("%s\nttft: %d, prefillTimePerToken: %d, interTokenLatency: %d, kvcacheTransferLatency: %d, kvCacheTransferTimePerToken: %d, doRemotePrefill: %t",
testNamePrefix, ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency, kvCacheTransferTimePerToken, doRemotePrefill)
},
// Params order: testName, ttft, prefillTimePerToken, interTokenLatency
Entry(nil, "constant prefill + inter token time", 0, 0, 100),
Entry(nil, "constant prefill + inter token time", 900, 0, 100),
Entry(nil, "constant prefill + inter token time", 1000, 0, 100),
Entry(nil, "prefill per token + inter token time", 0, 100, 100),
// Params order: testName, ttft, prefillTimePerToken, interTokenLatency, kvcacheTransferLatency, kvCacheTransferTimePerToken, doRemotePrefill
Entry(nil, "constant prefill + inter token time", 0, 0, 100, 0, 0, false),
Entry(nil, "constant prefill + inter token time", 900, 0, 100, 0, 0, false),
Entry(nil, "constant prefill + inter token time", 1000, 0, 100, 0, 0, false),
Entry(nil, "prefill per token + inter token time", 0, 100, 100, 0, 0, false),
Entry(nil, "remote prefill constant time", 0, 0, 0, 1000, 0, true),
Entry(nil, "remote prefill constant time with non-remote times", 5000, 5000, 0, 1000, 0, true),
Entry(nil, "remote prefill time per transfered token", 0, 0, 0, 0, 100, true),

)
})
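For the remote-prefill entries, the expected timings follow from the new parameters alone. A minimal sketch mirroring the expectations computed in checkLatencyMertics (assuming the 4-token echo prompt): when doRemotePrefill is true, ttft and prefillTimePerToken are ignored and the KV-cache transfer settings determine the prefill time.

// Entry "remote prefill constant time with non-remote times": 5000, 5000, 0, 1000, 0, true
kvcacheTransferLatency := 1000
expectedPrefill := float64(kvcacheTransferLatency) / 1000 // 1.0s; ttft=5000 and prefillTimePerToken=5000 are ignored
expectedDecode := 0.0                                     // interTokenLatency == 0
expectedE2E := expectedPrefill + expectedDecode           // 1.0s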

93 changes: 57 additions & 36 deletions pkg/llm-d-inference-sim/test_utils.go
@@ -16,6 +16,7 @@ limitations under the License.
package llmdinferencesim

import (
"bufio"
"context"
"crypto/tls"
"errors"
@@ -136,13 +137,12 @@ func startServerWithArgsAndEnv(ctx context.Context, mode string, args []string,
}, nil
}

// startServerAndSendRequest - starts server configured according the given latency parameters in echo mode,
// sends a single request with the given prompt
func startServerAndSendRequest(modelName string, prompt string, isStreaming bool, ttft int, prefillTimePerToken int, interTokenLatency int) *http.Client {
// startServerForLatencyTest - starts a server in echo mode, configured according to the given latency parameters
func startServerForLatencyTest(modelName string, ttft int, prefillTimePerToken int, interTokenLatency int, kvcacheTransferLatency int, kvCacheTransferTimePerToken int) *http.Client {
ctx := context.TODO()
args := []string{"cmd", "--model", modelName, "--mode", common.ModeEcho,
// "--kv-cache-transfer-latency", strconv.Itoa(kvcacheTransferLatency),
// "--kv-cache-transfer-time-per-token", strconv.Itoa(kvCacheTransferTimePerToken),
args := []string{"cmd", "--model", modelName, "--mode", common.ModeEcho, "--v=4",
"--kv-cache-transfer-latency", strconv.Itoa(kvcacheTransferLatency),
"--kv-cache-transfer-time-per-token", strconv.Itoa(kvCacheTransferTimePerToken),
"--time-to-first-token", strconv.Itoa(ttft),
"--prefill-time-per-token", strconv.Itoa(prefillTimePerToken),
"--inter-token-latency", strconv.Itoa(interTokenLatency),
@@ -151,27 +151,38 @@ func startServerAndSendRequest(modelName string, prompt string, isStreaming bool
client, err := startServerWithArgs(ctx, args)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

openaitextclient, params := getOpenAIClientAndTextParams(client, modelName, prompt, isStreaming)
return client
}

if isStreaming {
// send a single request in a serial way
stream := openaitextclient.Completions.NewStreaming(ctx, params)
chunksCnt := 0
// sendCompletionRequestForLatencyTest sends a completion request according to the given parameters.
// It uses http.Post rather than the openai-api client because vLLM-specific fields must be sent.
func sendCompletionRequestForLatencyTest(client *http.Client, modelName string, prompt string, isStreaming bool, doRemotePrefill bool) {
// send the completions request via raw HTTP POST because the disaggregated P/D fields must be included in the body
reqBody := fmt.Sprintf(`{
"prompt": "%s",
"model": "%s",
"stream": %t,
"do_remote_prefill": %t
}`, prompt, modelName, isStreaming, doRemotePrefill)
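// An alternative sketch (not required here): marshaling a struct with
// encoding/json would keep the prompt JSON-escaped if it ever contains
// quotes or newlines:
//   body, err := json.Marshal(map[string]any{
//       "prompt": prompt, "model": modelName,
//       "stream": isStreaming, "do_remote_prefill": doRemotePrefill,
//   })
//   resp, err := client.Post(url, "application/json", bytes.NewReader(body))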

resp, err := client.Post("http://localhost/v1/completions", "application/json", strings.NewReader(reqBody))
gomega.Expect(err).NotTo(gomega.HaveOccurred())
defer func() {
err := resp.Body.Close()
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}()

for stream.Next() {
chunksCnt++
}
if err := stream.Err(); err != nil {
if isStreaming {
reader := bufio.NewReader(resp.Body)
for {
_, err := reader.ReadString('\n')
if err == io.EOF {
break
}
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}
// number of chunks is number of tokens + 2 (one chunk with usage info and one closing chunk)
gomega.Expect(chunksCnt).To(gomega.BeNumerically("==", len(common.Tokenize(prompt))+2))
} else {
_, err = openaitextclient.Completions.New(ctx, params)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

return client
}
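If the streaming path should also restore the chunk-count check from the replaced openai-client version, a minimal sketch (assuming the simulator still sends one usage chunk plus a closing [DONE] chunk, each on a "data: " line) could count lines instead of just draining the body:

chunksCnt := 0
reader := bufio.NewReader(resp.Body)
for {
line, err := reader.ReadString('\n')
if err == io.EOF {
break
}
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// count only SSE data lines; "data: [DONE]" is the closing chunk
if strings.HasPrefix(line, "data: ") {
chunksCnt++
}
}
// number of chunks is number of tokens + 2 (one usage chunk and one closing chunk)
gomega.Expect(chunksCnt).To(gomega.BeNumerically("==", len(common.Tokenize(prompt))+2))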

// sendSimpleChatRequest starts server using the given environment variables and sends one chat completions request
@@ -212,6 +223,7 @@ func getOpenAIClientAndChatParams(client option.HTTPClient, model string, messag
return openaiclient, params
}

// nolint
// getOpenAIClientAndTextParams - creates an openai client and params for /completions call based on the given parameters
func getOpenAIClientAndTextParams(client option.HTTPClient, model string, message string, streaming bool) (openai.Client, openai.CompletionNewParams) {
openaiclient := openai.NewClient(
@@ -445,7 +457,8 @@ func checkBucketBoundary(metrics string, modelName string, metricName string, bu
// ttft time to first token parameter
// prefillTimePerToken prefill time per input token
// interTokenLatency processing time per output token
func checkLatencyMertics(client *http.Client, modelName string, numOfInputTokens int, numOfOutputTokens int, ttft int, prefillTimePerToken int, interTokenLatency int) {
func checkLatencyMertics(client *http.Client, modelName string, numOfInputTokens int, numOfOutputTokens int, ttft int,
prefillTimePerToken int, interTokenLatency int, kvcacheTransferLatency int, kvCacheTransferTimePerToken int, doRemotePrefill bool) {
// wait a little bit and check metrics
time.Sleep(300 * time.Millisecond)
metricsResp, err := client.Get(metricsUrl)
@@ -456,30 +469,38 @@
gomega.Expect(err).NotTo(gomega.HaveOccurred())
metrics := string(data)

var expectedPrefillTime float64
// TODO take into consideration remote prefill
if ttft > 0 {
// time-to-first-token overwrites calculation of prefill time based on number of input tokens
expectedPrefillTime = float64(ttft) / 1000

expectedPrefillTimeInSecs := 0.0
if doRemotePrefill {
// when doRemotePrefill is true, this is a decode request and prefill was executed on a remote vLLM
if kvcacheTransferLatency != 0 {
expectedPrefillTimeInSecs = float64(kvcacheTransferLatency) / 1000
} else {
expectedPrefillTimeInSecs = float64(kvCacheTransferTimePerToken*numOfOutputTokens) / 1000
}
} else {
expectedPrefillTime = float64(numOfInputTokens*prefillTimePerToken) / 1000
if ttft > 0 {
// time-to-first-token overrides calculation of prefill time based on number of input tokens
expectedPrefillTimeInSecs = float64(ttft) / 1000
} else {
expectedPrefillTimeInSecs = float64(numOfInputTokens*prefillTimePerToken) / 1000
}
}
expectedDecodeTime := float64(interTokenLatency*(numOfOutputTokens-1)) / 1000
expectedE2ELatency := expectedPrefillTime + expectedDecodeTime
expectedDecodeTimeInSecs := float64(interTokenLatency*(numOfOutputTokens-1)) / 1000
expectedE2ELatency := expectedPrefillTimeInSecs + expectedDecodeTimeInSecs
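// e.g., for the entry ttft=1000, interTokenLatency=100 with the 4-token echo prompt:
// prefill = 1000/1000 = 1.0s, decode = 100*(4-1)/1000 = 0.3s, e2e = 1.3s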

prevBoundary := math.Inf(-1)

for _, bucketBoundary := range common.RequestLatencyBucketsBoundaries {
checkBucketBoundary(metrics, modelName, prefillTimeMetricName, bucketBoundary, prevBoundary, expectedPrefillTime)
checkBucketBoundary(metrics, modelName, decodeTimeMetricName, bucketBoundary, prevBoundary, expectedDecodeTime)
checkBucketBoundary(metrics, modelName, prefillTimeMetricName, bucketBoundary, prevBoundary, expectedPrefillTimeInSecs)
checkBucketBoundary(metrics, modelName, decodeTimeMetricName, bucketBoundary, prevBoundary, expectedDecodeTimeInSecs)
checkBucketBoundary(metrics, modelName, e2eReqLatencyMetricName, bucketBoundary, prevBoundary, expectedE2ELatency)

prevBoundary = bucketBoundary
}
// check the last bucket
lastBoundary := common.RequestLatencyBucketsBoundaries[len(common.RequestLatencyBucketsBoundaries)-1]
checkBucketBoundary(metrics, modelName, prefillTimeMetricName, math.Inf(1), lastBoundary, expectedPrefillTime)
checkBucketBoundary(metrics, modelName, decodeTimeMetricName, math.Inf(1), lastBoundary, expectedDecodeTime)
checkBucketBoundary(metrics, modelName, prefillTimeMetricName, math.Inf(1), lastBoundary, expectedPrefillTimeInSecs)
checkBucketBoundary(metrics, modelName, decodeTimeMetricName, math.Inf(1), lastBoundary, expectedDecodeTimeInSecs)
checkBucketBoundary(metrics, modelName, e2eReqLatencyMetricName, math.Inf(1), lastBoundary, expectedE2ELatency)
}
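Each check above relies on Prometheus cumulative-histogram semantics: an observation is counted in every bucket whose le boundary is at or above the observed value, plus the final +Inf bucket. An illustrative scraped line (the metric and label names are an assumption for this sketch, standing in for prefillTimeMetricName):

vllm:request_prefill_time_seconds_bucket{model_name="my_model",le="1.0"} 1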