Commit 57657bf

Use channels for metrics updates, added metrics tests (#171)
* Use channels for metrics updates. Metrics tests
  Signed-off-by: Ira <[email protected]>
* Review comments
  Signed-off-by: Ira <[email protected]>
---------
Signed-off-by: Ira <[email protected]>
1 parent 06f6e42 commit 57657bf

File tree

4 files changed: +514 -214 lines

pkg/llm-d-inference-sim/metrics.go

Lines changed: 42 additions & 5 deletions
@@ -19,9 +19,9 @@ limitations under the License.
 package llmdinferencesim
 
 import (
+	"context"
 	"strconv"
 	"strings"
-	"sync/atomic"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -157,9 +157,8 @@ func (s *VllmSimulator) reportRunningRequests() {
 		return
 	}
 	if s.runningRequests != nil {
-		nRunningReqs := atomic.LoadInt64(&(s.nRunningReqs))
 		s.runningRequests.WithLabelValues(
-			s.getDisplayedModelName(s.config.Model)).Set(float64(nRunningReqs))
+			s.getDisplayedModelName(s.config.Model)).Set(float64(s.nRunningReqs))
 	}
 }
 
@@ -169,8 +168,46 @@ func (s *VllmSimulator) reportWaitingRequests() {
 		return
 	}
 	if s.waitingRequests != nil {
-		nWaitingReqs := atomic.LoadInt64(&(s.nWaitingReqs))
 		s.waitingRequests.WithLabelValues(
-			s.getDisplayedModelName(s.config.Model)).Set(float64(nWaitingReqs))
+			s.getDisplayedModelName(s.config.Model)).Set(float64(s.nWaitingReqs))
+	}
+}
+
+func (s *VllmSimulator) unregisterPrometheus() {
+	prometheus.Unregister(s.loraInfo)
+	prometheus.Unregister(s.runningRequests)
+	prometheus.Unregister(s.waitingRequests)
+	prometheus.Unregister(s.kvCacheUsagePercentage)
+}
+
+// startMetricsUpdaters starts the various metrics updaters
+func (s *VllmSimulator) startMetricsUpdaters(ctx context.Context) {
+	go s.waitingRequestsUpdater(ctx)
+	go s.runningRequestsUpdater(ctx)
+}
+
+// waitingRequestsUpdater updates the waiting requests metric by listening on the relevant channel
+func (s *VllmSimulator) waitingRequestsUpdater(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case inc := <-s.waitingReqChan:
+			s.nWaitingReqs += inc
+			s.reportWaitingRequests()
+		}
+	}
+}
+
+// runningRequestsUpdater updates the running requests metric by listening on the relevant channel
+func (s *VllmSimulator) runningRequestsUpdater(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case inc := <-s.runReqChan:
+			s.nRunningReqs += inc
+			s.reportRunningRequests()
+		}
 	}
 }
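The diff above adds only the consumer side: two long-lived goroutines that own the request counters and update the Prometheus gauges, which is why the atomic loads could be dropped from the report functions. The producer side (the request-handling code that sends increments on waitingReqChan and runReqChan) lives in the other files of this commit and is not shown here. A minimal sketch of what a producer could look like, assuming the channels carry int64 deltas (+1/-1); the handler below and its callbacks are illustrative, not the simulator's actual code:

// Illustrative sketch only: a request is counted as waiting while queued,
// then moved to running, then removed when it finishes. All counter arithmetic
// and gauge reporting happen in the updater goroutines above, so handlers
// never touch nWaitingReqs/nRunningReqs directly.
func (s *VllmSimulator) handleRequestSketch(waitForSlot, process func()) {
	s.waitingReqChan <- 1  // request accepted, waiting for a free slot
	waitForSlot()          // block until capacity (e.g. --max-num-seqs) allows it
	s.waitingReqChan <- -1 // leaves the waiting queue
	s.runReqChan <- 1      // starts running
	process()              // the simulated inference work
	s.runReqChan <- -1     // finished
}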
New file (metrics tests)

Lines changed: 276 additions & 0 deletions
@@ -0,0 +1,276 @@
/*
Copyright 2025 The llm-d-inference-sim Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package llmdinferencesim

import (
	"context"
	"io"
	"net/http"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/llm-d/llm-d-inference-sim/pkg/common"
	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	"github.com/openai/openai-go"
	"github.com/openai/openai-go/option"
)

var _ = Describe("Simulator metrics", Ordered, func() {
	It("Should send correct running and waiting requests metrics", func() {
		modelName := "testmodel"
		// Three requests, only two can run in parallel, we expect
		// two running requests and one waiting request in the metrics
		ctx := context.TODO()
		args := []string{"cmd", "--model", modelName, "--mode", common.ModeRandom,
			"--time-to-first-token", "3000", "--max-num-seqs", "2"}

		s, client, err := startServerWithArgsAndMetrics(ctx, common.ModeRandom, args, nil, true)
		Expect(err).NotTo(HaveOccurred())
		defer s.unregisterPrometheus()

		openaiclient := openai.NewClient(
			option.WithBaseURL(baseURL),
			option.WithHTTPClient(client))

		params := openai.ChatCompletionNewParams{
			Messages: []openai.ChatCompletionMessageParamUnion{
				openai.UserMessage(userMessage),
			},
			Model: modelName,
		}

		var wg sync.WaitGroup
		wg.Add(1)

		for range 3 {
			go func() {
				defer GinkgoRecover()
				_, err := openaiclient.Chat.Completions.New(ctx, params)
				Expect(err).NotTo(HaveOccurred())
			}()
		}

		go func() {
			defer wg.Done()
			defer GinkgoRecover()

			time.Sleep(300 * time.Millisecond)
			metricsResp, err := client.Get("http://localhost/metrics")
			Expect(err).NotTo(HaveOccurred())
			Expect(metricsResp.StatusCode).To(Equal(http.StatusOK))

			data, err := io.ReadAll(metricsResp.Body)
			Expect(err).NotTo(HaveOccurred())
			metrics := string(data)
			Expect(metrics).To(ContainSubstring("vllm:num_requests_running{model_name=\"testmodel\"} 2"))
			Expect(metrics).To(ContainSubstring("vllm:num_requests_waiting{model_name=\"testmodel\"} 1"))
		}()

		wg.Wait()
	})

	It("Should send correct lora metrics", func() {
		ctx := context.TODO()
		args := []string{"cmd", "--model", model, "--mode", common.ModeRandom,
			"--time-to-first-token", "3000",
			"--lora-modules", "{\"name\":\"lora1\",\"path\":\"/path/to/lora1\"}",
			"{\"name\":\"lora2\",\"path\":\"/path/to/lora2\"}"}

		s, client, err := startServerWithArgsAndMetrics(ctx, common.ModeRandom, args, nil, true)
		Expect(err).NotTo(HaveOccurred())
		defer s.unregisterPrometheus()

		openaiclient := openai.NewClient(
			option.WithBaseURL(baseURL),
			option.WithHTTPClient(client))

		params1 := openai.ChatCompletionNewParams{
			Messages: []openai.ChatCompletionMessageParamUnion{
				openai.UserMessage(userMessage),
			},
			Model: "lora1",
		}

		_, err = openaiclient.Chat.Completions.New(ctx, params1)
		Expect(err).NotTo(HaveOccurred())

		params2 := openai.ChatCompletionNewParams{
			Messages: []openai.ChatCompletionMessageParamUnion{
				openai.UserMessage(userMessage),
			},
			Model: "lora2",
		}

		_, err = openaiclient.Chat.Completions.New(ctx, params2)
		Expect(err).NotTo(HaveOccurred())

		metricsResp, err := client.Get("http://localhost/metrics")
		Expect(err).NotTo(HaveOccurred())
		Expect(metricsResp.StatusCode).To(Equal(http.StatusOK))

		data, err := io.ReadAll(metricsResp.Body)
		Expect(err).NotTo(HaveOccurred())
		metrics := string(data)

		// We sent two sequential requests to two different LoRAs, we expect to see (in this order)
		// 1. running_lora_adapters = lora1
		// 2. running_lora_adapters = lora2
		// 3. running_lora_adapters = {}
		lora1 := "vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora1\",waiting_lora_adapters=\"\"}"
		lora2 := "vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora2\",waiting_lora_adapters=\"\"}"
		empty := "vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"\",waiting_lora_adapters=\"\"}"

		Expect(metrics).To(ContainSubstring(lora1))
		Expect(metrics).To(ContainSubstring(lora2))
		Expect(metrics).To(ContainSubstring(empty))

		// Check the order
		lora1Timestamp := extractTimestamp(metrics, lora1)
		lora2Timestamp := extractTimestamp(metrics, lora2)
		noLorasTimestamp := extractTimestamp(metrics, empty)

		Expect(lora1Timestamp < lora2Timestamp).To(BeTrue())
		Expect(lora2Timestamp < noLorasTimestamp).To(BeTrue())
	})

	It("Should send correct lora metrics for parallel requests", func() {
		ctx := context.TODO()
		args := []string{"cmd", "--model", model, "--mode", common.ModeRandom,
			"--time-to-first-token", "2000",
			"--lora-modules", "{\"name\":\"lora1\",\"path\":\"/path/to/lora1\"}",
			"{\"name\":\"lora2\",\"path\":\"/path/to/lora2\"}"}

		s, client, err := startServerWithArgsAndMetrics(ctx, common.ModeRandom, args, nil, true)
		Expect(err).NotTo(HaveOccurred())

		defer s.unregisterPrometheus()

		openaiclient := openai.NewClient(
			option.WithBaseURL(baseURL),
			option.WithHTTPClient(client))

		params1 := openai.ChatCompletionNewParams{
			Messages: []openai.ChatCompletionMessageParamUnion{
				openai.UserMessage(userMessage),
			},
			Model: "lora1",
		}

		params2 := openai.ChatCompletionNewParams{
			Messages: []openai.ChatCompletionMessageParamUnion{
				openai.UserMessage(userMessage),
			},
			Model: "lora2",
		}

		var wg sync.WaitGroup
		wg.Add(1)

		go func() {
			time.Sleep(1 * time.Second)
			defer wg.Done()
			defer GinkgoRecover()
			_, err := openaiclient.Chat.Completions.New(ctx, params2)
			Expect(err).NotTo(HaveOccurred())
		}()

		_, err = openaiclient.Chat.Completions.New(ctx, params1)
		Expect(err).NotTo(HaveOccurred())

		wg.Wait()

		metricsResp, err := client.Get("http://localhost/metrics")
		Expect(err).NotTo(HaveOccurred())
		Expect(metricsResp.StatusCode).To(Equal(http.StatusOK))

		data, err := io.ReadAll(metricsResp.Body)
		Expect(err).NotTo(HaveOccurred())
		metrics := string(data)

		// We sent two parallel requests: first to lora1 and then to lora2 (with a delay), we expect
		// to see (in this order)
		// 1. running_lora_adapters = lora1
		// 2. running_lora_adapters = lora2,lora1 (the order of LoRAs doesn't matter here)
		// 3. running_lora_adapters = lora2
		// 4. running_lora_adapters = {}
		lora1 := "vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora1\",waiting_lora_adapters=\"\"}"
		lora12 := "vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora1,lora2\",waiting_lora_adapters=\"\"}"
		lora21 := "vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora2,lora1\",waiting_lora_adapters=\"\"}"
		lora2 := "vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora2\",waiting_lora_adapters=\"\"}"
		empty := "vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"\",waiting_lora_adapters=\"\"}"

		Expect(metrics).To(ContainSubstring(lora1))
		Expect(metrics).To(Or(ContainSubstring(lora12), ContainSubstring(lora21)))
		Expect(metrics).To(ContainSubstring(lora2))
		Expect(metrics).To(ContainSubstring(empty))

		// Check the order
		lora1Timestamp := extractTimestamp(metrics, lora1)
		lora2Timestamp := extractTimestamp(metrics, lora2)
		noLorasTimestamp := extractTimestamp(metrics, empty)
		var twoLorasTimestamp float64
		if strings.Contains(metrics, lora12) {
			twoLorasTimestamp = extractTimestamp(metrics, lora12)
		} else {
			twoLorasTimestamp = extractTimestamp(metrics, lora21)
		}
		Expect(lora1Timestamp < twoLorasTimestamp).To(BeTrue())
		Expect(twoLorasTimestamp < lora2Timestamp).To(BeTrue())
		Expect(lora2Timestamp < noLorasTimestamp).To(BeTrue())
	})

	Context("fake metrics", func() {
		It("Should respond with fake metrics to /metrics", func() {
			ctx := context.TODO()
			args := []string{"cmd", "--model", model, "--mode", common.ModeRandom,
				"--fake-metrics",
				"{\"running-requests\":10,\"waiting-requests\":30,\"kv-cache-usage\":0.4,\"loras\":[{\"running\":\"lora4,lora2\",\"waiting\":\"lora3\",\"timestamp\":1257894567},{\"running\":\"lora4,lora3\",\"waiting\":\"\",\"timestamp\":1257894569}]}",
			}

			s, client, err := startServerWithArgsAndMetrics(ctx, common.ModeRandom, args, nil, true)
			Expect(err).NotTo(HaveOccurred())

			defer s.unregisterPrometheus()

			resp, err := client.Get("http://localhost/metrics")
			Expect(err).NotTo(HaveOccurred())
			Expect(resp.StatusCode).To(Equal(http.StatusOK))

			data, err := io.ReadAll(resp.Body)
			Expect(err).NotTo(HaveOccurred())
			metrics := string(data)
			Expect(metrics).To(ContainSubstring("vllm:num_requests_running{model_name=\"my_model\"} 10"))
			Expect(metrics).To(ContainSubstring("vllm:num_requests_waiting{model_name=\"my_model\"} 30"))
			Expect(metrics).To(ContainSubstring("vllm:gpu_cache_usage_perc{model_name=\"my_model\"} 0.4"))
			Expect(metrics).To(ContainSubstring("vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora4,lora2\",waiting_lora_adapters=\"lora3\"} 1.257894567e+09"))
			Expect(metrics).To(ContainSubstring("vllm:lora_requests_info{max_lora=\"1\",running_lora_adapters=\"lora4,lora3\",waiting_lora_adapters=\"\"} 1.257894569e+09"))
		})
	})
})

func extractTimestamp(metrics string, key string) float64 {
	re := regexp.MustCompile(key + ` (\S+)`)
	result := re.FindStringSubmatch(metrics)
	Expect(len(result)).To(BeNumerically(">", 1))
	f, err := strconv.ParseFloat(result[1], 64)
	Expect(err).NotTo(HaveOccurred())
	return f
}
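For reference, extractTimestamp reads the sample value that follows a metric's label set in the Prometheus text exposition format; for vllm:lora_requests_info that value is the update timestamp, which is what the tests above compare to verify ordering. A standalone illustration of the same extraction without the Gomega assertions (the exposition line below is hypothetical):

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

func main() {
	// Hypothetical /metrics line: the gauge value is the update timestamp.
	line := `vllm:lora_requests_info{max_lora="1",running_lora_adapters="lora1",waiting_lora_adapters=""} 1.257894567e+09`
	key := `vllm:lora_requests_info{max_lora="1",running_lora_adapters="lora1",waiting_lora_adapters=""}`

	// Same pattern the test helper builds: the key followed by the sample value.
	re := regexp.MustCompile(key + ` (\S+)`)
	m := re.FindStringSubmatch(line)
	ts, err := strconv.ParseFloat(m[1], 64)
	if err != nil {
		panic(err)
	}
	fmt.Println(ts) // 1.257894567e+09
}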
