diff --git a/pkg/llm-d-inference-sim/metrics.go b/pkg/llm-d-inference-sim/metrics.go
index 3fa60a2e..5b065648 100644
--- a/pkg/llm-d-inference-sim/metrics.go
+++ b/pkg/llm-d-inference-sim/metrics.go
@@ -19,9 +19,9 @@ limitations under the License.
 package llmdinferencesim
 
 import (
+	"context"
 	"strconv"
 	"strings"
-	"sync/atomic"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -157,9 +157,8 @@ func (s *VllmSimulator) reportRunningRequests() {
 		return
 	}
 	if s.runningRequests != nil {
-		nRunningReqs := atomic.LoadInt64(&(s.nRunningReqs))
 		s.runningRequests.WithLabelValues(
-			s.getDisplayedModelName(s.config.Model)).Set(float64(nRunningReqs))
+			s.getDisplayedModelName(s.config.Model)).Set(float64(s.nRunningReqs))
 	}
 }
 
@@ -169,8 +168,46 @@ func (s *VllmSimulator) reportWaitingRequests() {
 		return
 	}
 	if s.waitingRequests != nil {
-		nWaitingReqs := atomic.LoadInt64(&(s.nWaitingReqs))
 		s.waitingRequests.WithLabelValues(
-			s.getDisplayedModelName(s.config.Model)).Set(float64(nWaitingReqs))
+			s.getDisplayedModelName(s.config.Model)).Set(float64(s.nWaitingReqs))
+	}
+}
+
+func (s *VllmSimulator) unregisterPrometheus() {
+	prometheus.Unregister(s.loraInfo)
+	prometheus.Unregister(s.runningRequests)
+	prometheus.Unregister(s.waitingRequests)
+	prometheus.Unregister(s.kvCacheUsagePercentage)
+}
+
+// startMetricsUpdaters starts the various metrics updaters
+func (s *VllmSimulator) startMetricsUpdaters(ctx context.Context) {
+	go s.waitingRequestsUpdater(ctx)
+	go s.runningRequestsUpdater(ctx)
+}
+
+// waitingRequestsUpdater updates the waiting requests metric by listening on the relevant channel
+func (s *VllmSimulator) waitingRequestsUpdater(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case inc := <-s.waitingReqChan:
+			s.nWaitingReqs += inc
+			s.reportWaitingRequests()
+		}
+	}
+}
+
+// runningRequestsUpdater updates the running requests metric by listening on the relevant channel
+func (s *VllmSimulator) runningRequestsUpdater(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case inc := <-s.runReqChan:
+			s.nRunningReqs += inc
+			s.reportRunningRequests()
+		}
 	}
 }
diff --git a/pkg/llm-d-inference-sim/metrics_test.go b/pkg/llm-d-inference-sim/metrics_test.go
new file mode 100644
index 00000000..0d4e1f3c
--- /dev/null
+++ b/pkg/llm-d-inference-sim/metrics_test.go
@@ -0,0 +1,276 @@
+/*
+Copyright 2025 The llm-d-inference-sim Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package llmdinferencesim
+
+import (
+	"context"
+	"io"
+	"net/http"
+	"regexp"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/llm-d/llm-d-inference-sim/pkg/common"
+	. "github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega" + "github.com/openai/openai-go" + "github.com/openai/openai-go/option" +) + +var _ = Describe("Simulator metrics", Ordered, func() { + It("Should send correct running and waiting requests metrics", func() { + modelName := "testmodel" + // Three requests, only two can run in parallel, we expect + // two running requests and one waiting request in the metrics + ctx := context.TODO() + args := []string{"cmd", "--model", modelName, "--mode", common.ModeRandom, + "--time-to-first-token", "3000", "--max-num-seqs", "2"} + + s, client, err := startServerWithArgsAndMetrics(ctx, common.ModeRandom, args, nil, true) + Expect(err).NotTo(HaveOccurred()) + defer s.unregisterPrometheus() + + openaiclient := openai.NewClient( + option.WithBaseURL(baseURL), + option.WithHTTPClient(client)) + + params := openai.ChatCompletionNewParams{ + Messages: []openai.ChatCompletionMessageParamUnion{ + openai.UserMessage(userMessage), + }, + Model: modelName, + } + + var wg sync.WaitGroup + wg.Add(1) + + for range 3 { + go func() { + defer GinkgoRecover() + _, err := openaiclient.Chat.Completions.New(ctx, params) + Expect(err).NotTo(HaveOccurred()) + }() + } + + go func() { + defer wg.Done() + defer GinkgoRecover() + + time.Sleep(300 * time.Millisecond) + metricsResp, err := client.Get("http://localhost/metrics") + Expect(err).NotTo(HaveOccurred()) + Expect(metricsResp.StatusCode).To(Equal(http.StatusOK)) + + data, err := io.ReadAll(metricsResp.Body) + Expect(err).NotTo(HaveOccurred()) + metrics := string(data) + Expect(metrics).To(ContainSubstring("vllm:num_requests_running{model_name=\"testmodel\"} 2")) + Expect(metrics).To(ContainSubstring("vllm:num_requests_waiting{model_name=\"testmodel\"} 1")) + }() + + wg.Wait() + }) + + It("Should send correct lora metrics", func() { + ctx := context.TODO() + args := []string{"cmd", "--model", model, "--mode", common.ModeRandom, + "--time-to-first-token", "3000", + "--lora-modules", "{\"name\":\"lora1\",\"path\":\"/path/to/lora1\"}", + "{\"name\":\"lora2\",\"path\":\"/path/to/lora2\"}"} + + s, client, err := startServerWithArgsAndMetrics(ctx, common.ModeRandom, args, nil, true) + Expect(err).NotTo(HaveOccurred()) + defer s.unregisterPrometheus() + + openaiclient := openai.NewClient( + option.WithBaseURL(baseURL), + option.WithHTTPClient(client)) + + params1 := openai.ChatCompletionNewParams{ + Messages: []openai.ChatCompletionMessageParamUnion{ + openai.UserMessage(userMessage), + }, + Model: "lora1", + } + + _, err = openaiclient.Chat.Completions.New(ctx, params1) + Expect(err).NotTo(HaveOccurred()) + + params2 := openai.ChatCompletionNewParams{ + Messages: []openai.ChatCompletionMessageParamUnion{ + openai.UserMessage(userMessage), + }, + Model: "lora2", + } + + _, err = openaiclient.Chat.Completions.New(ctx, params2) + Expect(err).NotTo(HaveOccurred()) + + metricsResp, err := client.Get("http://localhost/metrics") + Expect(err).NotTo(HaveOccurred()) + Expect(metricsResp.StatusCode).To(Equal(http.StatusOK)) + + data, err := io.ReadAll(metricsResp.Body) + Expect(err).NotTo(HaveOccurred()) + metrics := string(data) + + // We sent two sequentual requests to two different LoRAs, we expect to see (in this order) + // 1. running_lora_adapter = lora1 + // 2. running_lora_adapter = lora2 + // 3. 
+		// 3. running_lora_adapter = {}
+		lora1 := "vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora1\",waiting_lora_adapters=\"\"}"
+		lora2 := "vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora2\",waiting_lora_adapters=\"\"}"
+		empty := "vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"\",waiting_lora_adapters=\"\"}"
+
+		Expect(metrics).To(ContainSubstring(lora1))
+		Expect(metrics).To(ContainSubstring(lora2))
+		Expect(metrics).To(ContainSubstring(empty))
+
+		// Check the order
+		lora1Timestamp := extractTimestamp(metrics, lora1)
+		lora2Timestamp := extractTimestamp(metrics, lora2)
+		noLorasTimestamp := extractTimestamp(metrics, empty)
+
+		Expect(lora1Timestamp < lora2Timestamp).To(BeTrue())
+		Expect(lora2Timestamp < noLorasTimestamp).To(BeTrue())
+	})
+
+	It("Should send correct lora metrics for parallel requests", func() {
+		ctx := context.TODO()
+		args := []string{"cmd", "--model", model, "--mode", common.ModeRandom,
+			"--time-to-first-token", "2000",
+			"--lora-modules", "{\"name\":\"lora1\",\"path\":\"/path/to/lora1\"}",
+			"{\"name\":\"lora2\",\"path\":\"/path/to/lora2\"}"}
+
+		s, client, err := startServerWithArgsAndMetrics(ctx, common.ModeRandom, args, nil, true)
+		Expect(err).NotTo(HaveOccurred())
+
+		defer s.unregisterPrometheus()
+
+		openaiclient := openai.NewClient(
+			option.WithBaseURL(baseURL),
+			option.WithHTTPClient(client))
+
+		params1 := openai.ChatCompletionNewParams{
+			Messages: []openai.ChatCompletionMessageParamUnion{
+				openai.UserMessage(userMessage),
+			},
+			Model: "lora1",
+		}
+
+		params2 := openai.ChatCompletionNewParams{
+			Messages: []openai.ChatCompletionMessageParamUnion{
+				openai.UserMessage(userMessage),
+			},
+			Model: "lora2",
+		}
+
+		var wg sync.WaitGroup
+		wg.Add(1)
+
+		go func() {
+			time.Sleep(1 * time.Second)
+			defer wg.Done()
+			defer GinkgoRecover()
+			_, err := openaiclient.Chat.Completions.New(ctx, params2)
+			Expect(err).NotTo(HaveOccurred())
+		}()
+
+		_, err = openaiclient.Chat.Completions.New(ctx, params1)
+		Expect(err).NotTo(HaveOccurred())
+
+		wg.Wait()
+
+		metricsResp, err := client.Get("http://localhost/metrics")
+		Expect(err).NotTo(HaveOccurred())
+		Expect(metricsResp.StatusCode).To(Equal(http.StatusOK))
+
+		data, err := io.ReadAll(metricsResp.Body)
+		Expect(err).NotTo(HaveOccurred())
+		metrics := string(data)
+
+		// We sent two parallel requests: first to lora1 and then to lora2 (with a delay), we expect
+		// to see (in this order)
+		// 1. running_lora_adapter = lora1
+		// 2. running_lora_adapter = lora2,lora1 (the order of LoRAs doesn't matter here)
+		// 3. running_lora_adapter = lora2
+		// 4. running_lora_adapter = {}
+		lora1 := "vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora1\",waiting_lora_adapters=\"\"}"
+		lora12 := "vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora1,lora2\",waiting_lora_adapters=\"\"}"
+		lora21 := "vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora2,lora1\",waiting_lora_adapters=\"\"}"
+		lora2 := "vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora2\",waiting_lora_adapters=\"\"}"
+		empty := "vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"\",waiting_lora_adapters=\"\"}"
+
+		Expect(metrics).To(ContainSubstring(lora1))
+		Expect(metrics).To(Or(ContainSubstring(lora12), ContainSubstring(lora21)))
+		Expect(metrics).To(ContainSubstring(lora2))
+		Expect(metrics).To(ContainSubstring(empty))
+
+		// Check the order
+		lora1Timestamp := extractTimestamp(metrics, lora1)
+		lora2Timestamp := extractTimestamp(metrics, lora2)
+		noLorasTimestamp := extractTimestamp(metrics, empty)
+		var twoLorasTimestamp float64
+		if strings.Contains(metrics, lora12) {
+			twoLorasTimestamp = extractTimestamp(metrics, lora12)
+		} else {
+			twoLorasTimestamp = extractTimestamp(metrics, lora21)
+		}
+		Expect(lora1Timestamp < twoLorasTimestamp).To(BeTrue())
+		Expect(twoLorasTimestamp < lora2Timestamp).To(BeTrue())
+		Expect(lora2Timestamp < noLorasTimestamp).To(BeTrue())
+	})
+
+	Context("fake metrics", func() {
+		It("Should respond with fake metrics to /metrics", func() {
+			ctx := context.TODO()
+			args := []string{"cmd", "--model", model, "--mode", common.ModeRandom,
+				"--fake-metrics",
+				"{\"running-requests\":10,\"waiting-requests\":30,\"kv-cache-usage\":0.4,\"loras\":[{\"running\":\"lora4,lora2\",\"waiting\":\"lora3\",\"timestamp\":1257894567},{\"running\":\"lora4,lora3\",\"waiting\":\"\",\"timestamp\":1257894569}]}",
+			}
+
+			s, client, err := startServerWithArgsAndMetrics(ctx, common.ModeRandom, args, nil, true)
+			Expect(err).NotTo(HaveOccurred())
+
+			defer s.unregisterPrometheus()
+
+			resp, err := client.Get("http://localhost/metrics")
+			Expect(err).NotTo(HaveOccurred())
+			Expect(resp.StatusCode).To(Equal(http.StatusOK))
+
+			data, err := io.ReadAll(resp.Body)
+			Expect(err).NotTo(HaveOccurred())
+			metrics := string(data)
+			Expect(metrics).To(ContainSubstring("vllm:num_requests_running{model_name=\"my_model\"} 10"))
+			Expect(metrics).To(ContainSubstring("vllm:num_requests_waiting{model_name=\"my_model\"} 30"))
+			Expect(metrics).To(ContainSubstring("vllm:gpu_cache_usage_perc{model_name=\"my_model\"} 0.4"))
+			Expect(metrics).To(ContainSubstring("vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora4,lora2\",waiting_lora_adapters=\"lora3\"} 1.257894567e+09"))
+			Expect(metrics).To(ContainSubstring("vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora4,lora3\",waiting_lora_adapters=\"\"} 1.257894569e+09"))
+		})
+	})
+})
+
+func extractTimestamp(metrics string, key string) float64 {
+	re := regexp.MustCompile(key + ` (\S+)`)
+	result := re.FindStringSubmatch(metrics)
+	Expect(len(result)).To(BeNumerically(">", 1))
+	f, err := strconv.ParseFloat(result[1], 64)
+	Expect(err).NotTo(HaveOccurred())
+	return f
+}
diff --git a/pkg/llm-d-inference-sim/simulator.go b/pkg/llm-d-inference-sim/simulator.go
index d9813996..9f56f798 100644
--- a/pkg/llm-d-inference-sim/simulator.go
+++ b/pkg/llm-d-inference-sim/simulator.go
@@ -26,7 +26,6 @@ import (
 	"os"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/buaazp/fasthttprouter"
@@ -52,6 +51,8 @@ const (
 	namespaceHeader = "x-inference-namespace"
 	podNameEnv      = "POD_NAME"
 	podNsEnv        = "POD_NAMESPACE"
+
+	maxNumberOfRequests = 1000
 )
 
 // VllmSimulator simulates vLLM server supporting OpenAI API
@@ -69,8 +70,12 @@ type VllmSimulator struct {
 	waitingLoras sync.Map
 	// nRunningReqs is the number of inference requests that are currently being processed
 	nRunningReqs int64
+	// runReqChan is a channel to update nRunningReqs
+	runReqChan chan int64
 	// nWaitingReqs is the number of inference requests that are waiting to be processed
 	nWaitingReqs int64
+	// waitingReqChan is a channel to update nWaitingReqs
+	waitingReqChan chan int64
 	// loraInfo is prometheus gauge
 	loraInfo *prometheus.GaugeVec
 	// runningRequests is prometheus gauge
@@ -93,18 +98,20 @@
 
 // New creates a new VllmSimulator instance with the given logger
 func New(logger logr.Logger) (*VllmSimulator, error) {
-	toolsValidtor, err := openaiserverapi.CreateValidator()
+	toolsValidator, err := openaiserverapi.CreateValidator()
 	if err != nil {
 		return nil, fmt.Errorf("failed to create tools validator: %s", err)
 	}
 
 	return &VllmSimulator{
 		logger:         logger,
-		reqChan:        make(chan *openaiserverapi.CompletionReqCtx, 1000),
-		toolsValidator: toolsValidtor,
+		reqChan:        make(chan *openaiserverapi.CompletionReqCtx, maxNumberOfRequests),
+		toolsValidator: toolsValidator,
 		kvcacheHelper:  nil, // kvcache helper will be created only if required after reading configuration
 		namespace:      os.Getenv(podNsEnv),
 		pod:            os.Getenv(podNameEnv),
+		runReqChan:     make(chan int64, maxNumberOfRequests),
+		waitingReqChan: make(chan int64, maxNumberOfRequests),
 	}, nil
 }
@@ -148,6 +155,9 @@ func (s *VllmSimulator) Start(ctx context.Context) error {
 	for i := 1; i <= s.config.MaxNumSeqs; i++ {
 		go s.reqProcessingWorker(ctx, i)
 	}
+
+	s.startMetricsUpdaters(ctx)
+
 	listener, err := s.newListener()
 	if err != nil {
 		return err
@@ -378,9 +388,8 @@ func (s *VllmSimulator) handleCompletions(ctx *fasthttp.RequestCtx, isChatComple
 		IsChatCompletion: isChatCompletion,
 		Wg:               &wg,
 	}
+	s.waitingReqChan <- 1
 	s.reqChan <- reqCtx
-	atomic.StoreInt64(&(s.nWaitingReqs), int64(len(s.reqChan)))
-	s.reportWaitingRequests()
 
 	wg.Wait()
 }
@@ -395,8 +404,8 @@ func (s *VllmSimulator) reqProcessingWorker(ctx context.Context, id int) {
 				s.logger.Info("reqProcessingWorker worker exiting: reqChan closed")
 				return
 			}
-			atomic.StoreInt64(&(s.nWaitingReqs), int64(len(s.reqChan)))
-			s.reportWaitingRequests()
+
+			s.waitingReqChan <- -1
 
 			req := reqCtx.CompletionReq
 			model := req.GetModel()
@@ -419,8 +428,8 @@ func (s *VllmSimulator) reqProcessingWorker(ctx context.Context, id int) {
 				// TODO - check if this request went to the waiting queue - add it to waiting map
 				s.reportLoras()
 			}
-			atomic.AddInt64(&(s.nRunningReqs), 1)
-			s.reportRunningRequests()
+
+			s.runReqChan <- 1
 
 			var responseTokens []string
 			var finishReason string
@@ -491,9 +500,7 @@ func (s *VllmSimulator) reqProcessingWorker(ctx context.Context, id int) {
 
 // decrease model usage reference number
 func (s *VllmSimulator) responseSentCallback(model string) {
-
-	atomic.AddInt64(&(s.nRunningReqs), -1)
-	s.reportRunningRequests()
+	s.runReqChan <- -1
 
 	// Only LoRA models require reference-count handling.
if !s.isLora(model) { diff --git a/pkg/llm-d-inference-sim/simulator_test.go b/pkg/llm-d-inference-sim/simulator_test.go index 88d87759..2641e5b9 100644 --- a/pkg/llm-d-inference-sim/simulator_test.go +++ b/pkg/llm-d-inference-sim/simulator_test.go @@ -49,10 +49,12 @@ func startServer(ctx context.Context, mode string) (*http.Client, error) { } func startServerWithArgs(ctx context.Context, mode string, args []string, envs map[string]string) (*http.Client, error) { - return startServerWithArgsAndMetrics(ctx, mode, args, envs, false) + _, client, err := startServerWithArgsAndMetrics(ctx, mode, args, envs, false) + return client, err } -func startServerWithArgsAndMetrics(ctx context.Context, mode string, args []string, envs map[string]string, setMetrics bool) (*http.Client, error) { +func startServerWithArgsAndMetrics(ctx context.Context, mode string, args []string, envs map[string]string, + setMetrics bool) (*VllmSimulator, *http.Client, error) { oldArgs := os.Args defer func() { os.Args = oldArgs @@ -82,11 +84,11 @@ func startServerWithArgsAndMetrics(ctx context.Context, mode string, args []stri s, err := New(logger) if err != nil { - return nil, err + return nil, nil, err } config, err := common.ParseCommandParamsAndLoadConfig() if err != nil { - return nil, err + return nil, nil, err } s.config = config @@ -99,7 +101,7 @@ func startServerWithArgsAndMetrics(ctx context.Context, mode string, args []stri if setMetrics { err = s.createAndRegisterPrometheus() if err != nil { - return nil, err + return nil, nil, err } } @@ -112,6 +114,8 @@ func startServerWithArgsAndMetrics(ctx context.Context, mode string, args []stri go s.reqProcessingWorker(ctx, i) } + s.startMetricsUpdaters(ctx) + listener := fasthttputil.NewInmemoryListener() // start the http server @@ -121,7 +125,7 @@ func startServerWithArgsAndMetrics(ctx context.Context, mode string, args []stri } }() - return &http.Client{ + return s, &http.Client{ Transport: &http.Transport{ DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { return listener.Dial() @@ -429,209 +433,211 @@ var _ = Describe("Simulator", func() { Expect(resp.StatusCode).To(Equal(http.StatusOK)) }) - It("Should not include namespace and pod headers in chat completion response when env is not set", func() { - ctx := context.TODO() + Context("namespace and pod headers", func() { + It("Should not include namespace and pod headers in chat completion response when env is not set", func() { + ctx := context.TODO() - client, err := startServer(ctx, common.ModeRandom) - Expect(err).NotTo(HaveOccurred()) + client, err := startServer(ctx, common.ModeRandom) + Expect(err).NotTo(HaveOccurred()) - openaiclient := openai.NewClient( - option.WithBaseURL(baseURL), - option.WithHTTPClient(client)) + openaiclient := openai.NewClient( + option.WithBaseURL(baseURL), + option.WithHTTPClient(client)) - params := openai.ChatCompletionNewParams{ - Messages: []openai.ChatCompletionMessageParamUnion{ - openai.UserMessage(userMessage), - }, - Model: model, - } + params := openai.ChatCompletionNewParams{ + Messages: []openai.ChatCompletionMessageParamUnion{ + openai.UserMessage(userMessage), + }, + Model: model, + } - var httpResp *http.Response - resp, err := openaiclient.Chat.Completions.New(ctx, params, option.WithResponseInto(&httpResp)) - Expect(err).NotTo(HaveOccurred()) - Expect(resp).NotTo(BeNil()) + var httpResp *http.Response + resp, err := openaiclient.Chat.Completions.New(ctx, params, option.WithResponseInto(&httpResp)) + Expect(err).NotTo(HaveOccurred()) + 
Expect(resp).NotTo(BeNil()) - // Check for namespace and pod headers - namespaceHeader := httpResp.Header.Get(namespaceHeader) - podHeader := httpResp.Header.Get(podHeader) + // Check for namespace and pod headers + namespaceHeader := httpResp.Header.Get(namespaceHeader) + podHeader := httpResp.Header.Get(podHeader) - Expect(namespaceHeader).To(BeEmpty(), "Expected namespace header not to be present") - Expect(podHeader).To(BeEmpty(), "Expected pod header not to be present") - }) + Expect(namespaceHeader).To(BeEmpty(), "Expected namespace header not to be present") + Expect(podHeader).To(BeEmpty(), "Expected pod header not to be present") + }) - It("Should include namespace and pod headers in chat completion response", func() { - ctx := context.TODO() + It("Should include namespace and pod headers in chat completion response", func() { + ctx := context.TODO() - testNamespace := "test-namespace" - testPod := "test-pod" - envs := map[string]string{ - podNameEnv: testPod, - podNsEnv: testNamespace, - } - client, err := startServerWithArgs(ctx, common.ModeRandom, nil, envs) - Expect(err).NotTo(HaveOccurred()) + testNamespace := "test-namespace" + testPod := "test-pod" + envs := map[string]string{ + podNameEnv: testPod, + podNsEnv: testNamespace, + } + client, err := startServerWithArgs(ctx, common.ModeRandom, nil, envs) + Expect(err).NotTo(HaveOccurred()) - openaiclient := openai.NewClient( - option.WithBaseURL(baseURL), - option.WithHTTPClient(client)) + openaiclient := openai.NewClient( + option.WithBaseURL(baseURL), + option.WithHTTPClient(client)) - params := openai.ChatCompletionNewParams{ - Messages: []openai.ChatCompletionMessageParamUnion{ - openai.UserMessage(userMessage), - }, - Model: model, - } + params := openai.ChatCompletionNewParams{ + Messages: []openai.ChatCompletionMessageParamUnion{ + openai.UserMessage(userMessage), + }, + Model: model, + } - var httpResp *http.Response - resp, err := openaiclient.Chat.Completions.New(ctx, params, option.WithResponseInto(&httpResp)) - Expect(err).NotTo(HaveOccurred()) - Expect(resp).NotTo(BeNil()) + var httpResp *http.Response + resp, err := openaiclient.Chat.Completions.New(ctx, params, option.WithResponseInto(&httpResp)) + Expect(err).NotTo(HaveOccurred()) + Expect(resp).NotTo(BeNil()) - // Check for namespace and pod headers - namespaceHeader := httpResp.Header.Get(namespaceHeader) - podHeader := httpResp.Header.Get(podHeader) + // Check for namespace and pod headers + namespaceHeader := httpResp.Header.Get(namespaceHeader) + podHeader := httpResp.Header.Get(podHeader) - Expect(namespaceHeader).To(Equal(testNamespace), "Expected namespace header to be present") - Expect(podHeader).To(Equal(testPod), "Expected pod header to be present") - }) + Expect(namespaceHeader).To(Equal(testNamespace), "Expected namespace header to be present") + Expect(podHeader).To(Equal(testPod), "Expected pod header to be present") + }) - It("Should include namespace and pod headers in chat completion streaming response", func() { - ctx := context.TODO() + It("Should include namespace and pod headers in chat completion streaming response", func() { + ctx := context.TODO() - testNamespace := "stream-test-namespace" - testPod := "stream-test-pod" - envs := map[string]string{ - podNameEnv: testPod, - podNsEnv: testNamespace, - } - client, err := startServerWithArgs(ctx, common.ModeRandom, nil, envs) - Expect(err).NotTo(HaveOccurred()) + testNamespace := "stream-test-namespace" + testPod := "stream-test-pod" + envs := map[string]string{ + podNameEnv: testPod, + 
podNsEnv: testNamespace, + } + client, err := startServerWithArgs(ctx, common.ModeRandom, nil, envs) + Expect(err).NotTo(HaveOccurred()) - openaiclient := openai.NewClient( - option.WithBaseURL(baseURL), - option.WithHTTPClient(client)) + openaiclient := openai.NewClient( + option.WithBaseURL(baseURL), + option.WithHTTPClient(client)) - params := openai.ChatCompletionNewParams{ - Messages: []openai.ChatCompletionMessageParamUnion{ - openai.UserMessage(userMessage), - }, - Model: model, - StreamOptions: openai.ChatCompletionStreamOptionsParam{IncludeUsage: param.NewOpt(true)}, - } + params := openai.ChatCompletionNewParams{ + Messages: []openai.ChatCompletionMessageParamUnion{ + openai.UserMessage(userMessage), + }, + Model: model, + StreamOptions: openai.ChatCompletionStreamOptionsParam{IncludeUsage: param.NewOpt(true)}, + } - var httpResp *http.Response - resp, err := openaiclient.Chat.Completions.New(ctx, params, option.WithResponseInto(&httpResp)) - Expect(err).NotTo(HaveOccurred()) - Expect(resp).NotTo(BeNil()) + var httpResp *http.Response + resp, err := openaiclient.Chat.Completions.New(ctx, params, option.WithResponseInto(&httpResp)) + Expect(err).NotTo(HaveOccurred()) + Expect(resp).NotTo(BeNil()) - // Check for namespace and pod headers - namespaceHeader := httpResp.Header.Get(namespaceHeader) - podHeader := httpResp.Header.Get(podHeader) + // Check for namespace and pod headers + namespaceHeader := httpResp.Header.Get(namespaceHeader) + podHeader := httpResp.Header.Get(podHeader) - Expect(namespaceHeader).To(Equal(testNamespace), "Expected namespace header to be present") - Expect(podHeader).To(Equal(testPod), "Expected pod header to be present") - }) + Expect(namespaceHeader).To(Equal(testNamespace), "Expected namespace header to be present") + Expect(podHeader).To(Equal(testPod), "Expected pod header to be present") + }) - It("Should not include namespace and pod headers in chat completion streaming response when env is not set", func() { - ctx := context.TODO() + It("Should not include namespace and pod headers in chat completion streaming response when env is not set", func() { + ctx := context.TODO() - client, err := startServer(ctx, common.ModeRandom) - Expect(err).NotTo(HaveOccurred()) + client, err := startServer(ctx, common.ModeRandom) + Expect(err).NotTo(HaveOccurred()) - openaiclient := openai.NewClient( - option.WithBaseURL(baseURL), - option.WithHTTPClient(client)) + openaiclient := openai.NewClient( + option.WithBaseURL(baseURL), + option.WithHTTPClient(client)) - params := openai.ChatCompletionNewParams{ - Messages: []openai.ChatCompletionMessageParamUnion{ - openai.UserMessage(userMessage), - }, - Model: model, - StreamOptions: openai.ChatCompletionStreamOptionsParam{IncludeUsage: param.NewOpt(true)}, - } + params := openai.ChatCompletionNewParams{ + Messages: []openai.ChatCompletionMessageParamUnion{ + openai.UserMessage(userMessage), + }, + Model: model, + StreamOptions: openai.ChatCompletionStreamOptionsParam{IncludeUsage: param.NewOpt(true)}, + } - var httpResp *http.Response - resp, err := openaiclient.Chat.Completions.New(ctx, params, option.WithResponseInto(&httpResp)) - Expect(err).NotTo(HaveOccurred()) - Expect(resp).NotTo(BeNil()) + var httpResp *http.Response + resp, err := openaiclient.Chat.Completions.New(ctx, params, option.WithResponseInto(&httpResp)) + Expect(err).NotTo(HaveOccurred()) + Expect(resp).NotTo(BeNil()) - // Check for namespace and pod headers - namespaceHeader := httpResp.Header.Get(namespaceHeader) - podHeader := 
httpResp.Header.Get(podHeader) + // Check for namespace and pod headers + namespaceHeader := httpResp.Header.Get(namespaceHeader) + podHeader := httpResp.Header.Get(podHeader) - Expect(namespaceHeader).To(BeEmpty(), "Expected namespace header not to be present") - Expect(podHeader).To(BeEmpty(), "Expected pod header not to be present") - }) + Expect(namespaceHeader).To(BeEmpty(), "Expected namespace header not to be present") + Expect(podHeader).To(BeEmpty(), "Expected pod header not to be present") + }) - It("Should include namespace and pod headers in completion response", func() { - ctx := context.TODO() + It("Should include namespace and pod headers in completion response", func() { + ctx := context.TODO() - testNamespace := "test-namespace" - testPod := "test-pod" - envs := map[string]string{ - podNameEnv: testPod, - podNsEnv: testNamespace, - } - client, err := startServerWithArgs(ctx, common.ModeRandom, nil, envs) - Expect(err).NotTo(HaveOccurred()) + testNamespace := "test-namespace" + testPod := "test-pod" + envs := map[string]string{ + podNameEnv: testPod, + podNsEnv: testNamespace, + } + client, err := startServerWithArgs(ctx, common.ModeRandom, nil, envs) + Expect(err).NotTo(HaveOccurred()) - openaiclient := openai.NewClient( - option.WithBaseURL(baseURL), - option.WithHTTPClient(client)) + openaiclient := openai.NewClient( + option.WithBaseURL(baseURL), + option.WithHTTPClient(client)) - params := openai.CompletionNewParams{ - Prompt: openai.CompletionNewParamsPromptUnion{ - OfString: openai.String(userMessage), - }, - Model: openai.CompletionNewParamsModel(model), - } - var httpResp *http.Response - resp, err := openaiclient.Completions.New(ctx, params, option.WithResponseInto(&httpResp)) - Expect(err).NotTo(HaveOccurred()) - Expect(resp).NotTo(BeNil()) + params := openai.CompletionNewParams{ + Prompt: openai.CompletionNewParamsPromptUnion{ + OfString: openai.String(userMessage), + }, + Model: openai.CompletionNewParamsModel(model), + } + var httpResp *http.Response + resp, err := openaiclient.Completions.New(ctx, params, option.WithResponseInto(&httpResp)) + Expect(err).NotTo(HaveOccurred()) + Expect(resp).NotTo(BeNil()) - // Check for namespace and pod headers - namespaceHeader := httpResp.Header.Get(namespaceHeader) - podHeader := httpResp.Header.Get(podHeader) + // Check for namespace and pod headers + namespaceHeader := httpResp.Header.Get(namespaceHeader) + podHeader := httpResp.Header.Get(podHeader) - Expect(namespaceHeader).To(Equal(testNamespace), "Expected namespace header to be present") - Expect(podHeader).To(Equal(testPod), "Expected pod header to be present") - }) + Expect(namespaceHeader).To(Equal(testNamespace), "Expected namespace header to be present") + Expect(podHeader).To(Equal(testPod), "Expected pod header to be present") + }) - It("Should include namespace and pod headers in completion streaming response", func() { - ctx := context.TODO() + It("Should include namespace and pod headers in completion streaming response", func() { + ctx := context.TODO() - testNamespace := "stream-test-namespace" - testPod := "stream-test-pod" - envs := map[string]string{ - podNameEnv: testPod, - podNsEnv: testNamespace, - } - client, err := startServerWithArgs(ctx, common.ModeRandom, nil, envs) - Expect(err).NotTo(HaveOccurred()) + testNamespace := "stream-test-namespace" + testPod := "stream-test-pod" + envs := map[string]string{ + podNameEnv: testPod, + podNsEnv: testNamespace, + } + client, err := startServerWithArgs(ctx, common.ModeRandom, nil, envs) + 
Expect(err).NotTo(HaveOccurred()) - openaiclient := openai.NewClient( - option.WithBaseURL(baseURL), - option.WithHTTPClient(client)) + openaiclient := openai.NewClient( + option.WithBaseURL(baseURL), + option.WithHTTPClient(client)) - params := openai.CompletionNewParams{ - Prompt: openai.CompletionNewParamsPromptUnion{ - OfString: openai.String(userMessage), - }, - Model: openai.CompletionNewParamsModel(model), - StreamOptions: openai.ChatCompletionStreamOptionsParam{IncludeUsage: param.NewOpt(true)}, - } - var httpResp *http.Response - resp, err := openaiclient.Completions.New(ctx, params, option.WithResponseInto(&httpResp)) - Expect(err).NotTo(HaveOccurred()) - Expect(resp).NotTo(BeNil()) + params := openai.CompletionNewParams{ + Prompt: openai.CompletionNewParamsPromptUnion{ + OfString: openai.String(userMessage), + }, + Model: openai.CompletionNewParamsModel(model), + StreamOptions: openai.ChatCompletionStreamOptionsParam{IncludeUsage: param.NewOpt(true)}, + } + var httpResp *http.Response + resp, err := openaiclient.Completions.New(ctx, params, option.WithResponseInto(&httpResp)) + Expect(err).NotTo(HaveOccurred()) + Expect(resp).NotTo(BeNil()) - // Check for namespace and pod headers - namespaceHeader := httpResp.Header.Get(namespaceHeader) - podHeader := httpResp.Header.Get(podHeader) + // Check for namespace and pod headers + namespaceHeader := httpResp.Header.Get(namespaceHeader) + podHeader := httpResp.Header.Get(podHeader) - Expect(namespaceHeader).To(Equal(testNamespace), "Expected namespace header to be present") - Expect(podHeader).To(Equal(testPod), "Expected pod header to be present") + Expect(namespaceHeader).To(Equal(testNamespace), "Expected namespace header to be present") + Expect(podHeader).To(Equal(testPod), "Expected pod header to be present") + }) }) Context("max-model-len context window validation", func() { @@ -824,30 +830,4 @@ var _ = Describe("Simulator", func() { ) }) - Context("fake metrics", func() { - It("Should respond with fake metrics to /metrics", func() { - ctx := context.TODO() - args := []string{"cmd", "--model", model, "--mode", common.ModeRandom, - "--fake-metrics", - "{\"running-requests\":10,\"waiting-requests\":30,\"kv-cache-usage\":0.4,\"loras\":[{\"running\":\"lora4,lora2\",\"waiting\":\"lora3\",\"timestamp\":1257894567},{\"running\":\"lora4,lora3\",\"waiting\":\"\",\"timestamp\":1257894569}]}", - } - - client, err := startServerWithArgsAndMetrics(ctx, common.ModeRandom, args, nil, true) - Expect(err).NotTo(HaveOccurred()) - - resp, err := client.Get("http://localhost/metrics") - Expect(err).NotTo(HaveOccurred()) - Expect(resp.StatusCode).To(Equal(http.StatusOK)) - - data, err := io.ReadAll(resp.Body) - Expect(err).NotTo(HaveOccurred()) - metrics := string(data) - Expect(metrics).To(ContainSubstring("vllm:num_requests_running{model_name=\"my_model\"} 10")) - Expect(metrics).To(ContainSubstring("vllm:num_requests_waiting{model_name=\"my_model\"} 30")) - Expect(metrics).To(ContainSubstring("vllm:gpu_cache_usage_perc{model_name=\"my_model\"} 0.4")) - Expect(metrics).To(ContainSubstring("vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora4,lora2\",waiting_lora_adapters=\"lora3\"} 1.257894567e+09")) - Expect(metrics).To(ContainSubstring("vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora4,lora3\",waiting_lora_adapters=\"\"} 1.257894569e+09")) - - }) - }) })
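The metrics change above replaces the atomic counter updates with buffered channels that are drained by dedicated updater goroutines, which then push the current value into the Prometheus gauges. Below is a minimal, standalone sketch of that pattern for reference only; it is written under the same assumptions (a buffered delta channel, a single consumer goroutine), and the names demo, counterUpdater, deltaCh and demo_running_requests are invented for the example and do not exist in the repository.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// demo holds a request counter that is owned by a single updater goroutine,
// so no atomics or locks are needed; handlers only send +1/-1 deltas.
type demo struct {
	n       int64
	deltaCh chan int64
	gauge   prometheus.Gauge
}

// counterUpdater is the only goroutine that touches d.n; it applies each delta
// and mirrors the new value into the Prometheus gauge.
func (d *demo) counterUpdater(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case inc := <-d.deltaCh:
			d.n += inc
			d.gauge.Set(float64(d.n))
		}
	}
}

func main() {
	d := &demo{
		// buffered so request handlers never block on metrics bookkeeping
		deltaCh: make(chan int64, 1000),
		gauge: prometheus.NewGauge(prometheus.GaugeOpts{
			Name: "demo_running_requests",
			Help: "Number of requests currently being processed",
		}),
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go d.counterUpdater(ctx)

	d.deltaCh <- 1  // a request started
	d.deltaCh <- 1  // a second request started
	d.deltaCh <- -1 // one of them finished

	// Give the updater a moment to drain the channel, then read the gauge
	// (gauges are safe for concurrent access); expect 1 here.
	time.Sleep(10 * time.Millisecond)
	fmt.Println("running requests:", testutil.ToFloat64(d.gauge))
}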