Commit 40ec02c

KV cache usage metric (#192)
* KV cache usage metric, moved KV cache to request execution
  Signed-off-by: Ira <[email protected]>
* Fixed lint error
  Signed-off-by: Ira <[email protected]>
* Review
  Signed-off-by: Ira <[email protected]>
---------
Signed-off-by: Ira <[email protected]>
1 parent 7550708 commit 40ec02c

9 files changed: +258 -57 lines changed

pkg/kv-cache/block_cache.go

Lines changed: 10 additions & 1 deletion
@@ -40,11 +40,12 @@ type blockCache struct {
 	maxBlocks   int             // maximum number of blocks in the cache
 	eventSender *KVEventSender  // emits kv events
 	eventChan   chan EventData  // channel for asynchronous event processing
+	usageChan   chan float64    // channel for usage reporting
 	logger      logr.Logger
 }
 
 // newBlockCache creates a new blockCache with the specified maximum number of blocks
-func newBlockCache(config *common.Configuration, logger logr.Logger) (*blockCache, error) {
+func newBlockCache(config *common.Configuration, logger logr.Logger, usageChan chan float64) (*blockCache, error) {
 	// TODO read size of channel from config
 	eChan := make(chan EventData, 10000)

@@ -63,6 +64,7 @@ func newBlockCache(config *common.Configuration, logger logr.Logger) (*blockCach
 		unusedBlocks: make(map[uint64]time.Time),
 		maxBlocks:    config.KVCacheSize,
 		eventChan:    eChan,
+		usageChan:    usageChan,
 		eventSender:  NewKVEventSender(publisher, createTopic(config), eChan, config.EventBatchSize, delay, logger),
 		logger:       logger,
 	}, nil

@@ -149,6 +151,9 @@ func (bc *blockCache) startRequest(requestID string, blocks []uint64) (int, erro
 	bc.requestToBlocks[requestID] = make([]uint64, len(blocks))
 	copy(bc.requestToBlocks[requestID], blocks)
 
+	if bc.usageChan != nil {
+		bc.usageChan <- float64(len(bc.usedBlocks)) / float64(bc.maxBlocks)
+	}
 	return len(blockAreadyInUse) + len(blockToMoveToUsed), nil
 }

@@ -182,6 +187,10 @@ func (bc *blockCache) finishRequest(requestID string) error {
 		}
 	}
 
+	if bc.usageChan != nil {
+		bc.usageChan <- float64(len(bc.usedBlocks)) / float64(bc.maxBlocks)
+	}
+
 	// Remove the request mapping
 	delete(bc.requestToBlocks, requestID)
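
The usage-reporting path added here is a plain producer pattern: whenever startRequest or finishRequest changes the set of used blocks, the cache recomputes used/max and pushes the ratio onto an optional channel; callers that do not care about metrics (such as the unit tests) pass nil and the nil check skips reporting. A minimal standalone sketch of the pattern, using toy types rather than the simulator's real structs:

```go
package main

import "fmt"

// toyCache mirrors only the fields needed to illustrate the reporting pattern.
type toyCache struct {
	usedBlocks map[uint64]struct{} // blocks currently referenced by running requests
	maxBlocks  int                 // capacity of the cache
	usageChan  chan float64        // nil disables reporting, as in the tests
}

// reportUsage sends the current used/max ratio if a channel was provided.
func (c *toyCache) reportUsage() {
	if c.usageChan != nil {
		c.usageChan <- float64(len(c.usedBlocks)) / float64(c.maxBlocks)
	}
}

func main() {
	usage := make(chan float64, 1)
	c := &toyCache{
		usedBlocks: map[uint64]struct{}{1: {}, 2: {}},
		maxBlocks:  16,
		usageChan:  usage,
	}
	c.reportUsage()
	fmt.Println(<-usage) // 0.125, i.e. 2 of 16 blocks in use
}
```

The nil guard is what lets kv_cache_test.go construct a blockCache with newBlockCache(config, GinkgoLogr, nil) without wiring any metrics.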

pkg/kv-cache/kv_cache.go

Lines changed: 9 additions & 6 deletions
@@ -35,7 +35,7 @@ type KVCacheHelper struct {
 	blockSize int
 }
 
-func NewKVCacheHelper(config *common.Configuration, logger logr.Logger) (*KVCacheHelper, error) {
+func NewKVCacheHelper(config *common.Configuration, logger logr.Logger, usageChan chan float64) (*KVCacheHelper, error) {
 	tokenProcConfig := kvblock.DefaultTokenProcessorConfig()
 	tokenProcConfig.BlockSize = config.TokenBlockSize
 	if config.HashSeed != "" {

@@ -51,7 +51,7 @@ func NewKVCacheHelper(config *common.Configuration, logger logr.Logger) (*KVCach
 	if err != nil {
 		return nil, fmt.Errorf("failed to create tokenizer: %w", err)
 	}
-	blockCache, err := newBlockCache(config, logger)
+	blockCache, err := newBlockCache(config, logger, usageChan)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create block cache: %w", err)
 	}

@@ -92,11 +92,14 @@ func (h *KVCacheHelper) OnRequestStart(vllmReq openaiserverapi.CompletionRequest
 		blockHashes[i] = key.ChunkHash
 	}
 
-	nExistingBlocks, err := h.blockCache.startRequest(requestID, blockHashes)
-	vllmReq.SetNumberOfCachedPromptTokens(nExistingBlocks * h.blockSize)
+	nBlocksAlreadyInCache, err := h.blockCache.startRequest(requestID, blockHashes)
+	if err == nil {
+		vllmReq.SetNumberOfCachedPromptTokens(nBlocksAlreadyInCache * h.blockSize)
+	}
+
 	return err
 }
 
-func (h *KVCacheHelper) OnRequestEnd(vllmReq openaiserverapi.CompletionRequest) error {
-	return h.blockCache.finishRequest(vllmReq.GetRequestID())
+func (h *KVCacheHelper) OnRequestEnd(requestID string) error {
+	return h.blockCache.finishRequest(requestID)
 }
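
Two behavioural changes ride along here: the cached-prompt-token count is only set when startRequest succeeds, and OnRequestEnd now takes the request ID instead of the whole request, so the caller only needs to keep the ID around. A hedged sketch of how request execution might drive the two hooks; the function shape and error handling are illustrative, not the simulator's actual request path:

```go
// Illustrative only: assumes a *KVCacheHelper created via NewKVCacheHelper
// with a usage channel, and a request satisfying openaiserverapi.CompletionRequest.
func runRequest(helper *KVCacheHelper, req openaiserverapi.CompletionRequest) error {
	requestID := req.GetRequestID()

	// Tokenize the prompt, mark its blocks as used and, on success,
	// record the number of cached prompt tokens on the request.
	if err := helper.OnRequestStart(req); err != nil {
		return err
	}

	// Release the request's blocks once execution finishes;
	// this also triggers a fresh usage report on the channel.
	defer func() {
		_ = helper.OnRequestEnd(requestID)
	}()

	// ... generate the response ...
	return nil
}
```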

pkg/kv-cache/kv_cache_test.go

Lines changed: 3 additions & 3 deletions
@@ -216,7 +216,7 @@ var _ = Describe("KV cache", Ordered, func() {
 			wg := sync.WaitGroup{}
 			wg.Add(1)
 
-			blockCache, err := newBlockCache(config, GinkgoLogr)
+			blockCache, err := newBlockCache(config, GinkgoLogr, nil)
 			Expect(err).NotTo(HaveOccurred())
 
 			go func() {

@@ -318,7 +318,7 @@ var _ = Describe("KV cache", Ordered, func() {
 			wg := sync.WaitGroup{}
 			wg.Add(1)
 
-			blockCache, err := newBlockCache(config, GinkgoLogr)
+			blockCache, err := newBlockCache(config, GinkgoLogr, nil)
 			Expect(err).NotTo(HaveOccurred())
 
 			go func() {

@@ -422,7 +422,7 @@ var _ = Describe("KV cache", Ordered, func() {
 				KVCacheSize:           testCase.cacheSize,
 				ZMQMaxConnectAttempts: 3,
 			}
-			blockCache, err := newBlockCache(&config, GinkgoLogr)
+			blockCache, err := newBlockCache(&config, GinkgoLogr, nil)
 			Expect(err).NotTo(HaveOccurred())
 			var wg sync.WaitGroup

pkg/llm-d-inference-sim/failures_test.go

Lines changed: 5 additions & 5 deletions
@@ -127,7 +127,7 @@ var _ = Describe("Failures", func() {
 		BeforeEach(func() {
 			ctx = context.Background()
 			var err error
-			client, err = startServerWithArgs(ctx, "failure", []string{
+			client, err = startServerWithArgs(ctx, "", []string{
 				"cmd", "--model", model,
 				"--failure-injection-rate", "100",
 			}, nil)

@@ -185,7 +185,7 @@ var _ = Describe("Failures", func() {
 		BeforeEach(func() {
 			ctx = context.Background()
 			var err error
-			client, err = startServerWithArgs(ctx, "failure", []string{
+			client, err = startServerWithArgs(ctx, "", []string{
 				"cmd", "--model", model,
 				"--failure-injection-rate", "100",
 				"--failure-types", common.FailureTypeRateLimit,

@@ -221,7 +221,7 @@ var _ = Describe("Failures", func() {
 		BeforeEach(func() {
 			ctx = context.Background()
 			var err error
-			client, err = startServerWithArgs(ctx, "failure", []string{
+			client, err = startServerWithArgs(ctx, "", []string{
 				"cmd", "--model", model,
 				"--failure-injection-rate", "100",
 				"--failure-types", common.FailureTypeInvalidAPIKey, common.FailureTypeServerError,

@@ -262,7 +262,7 @@ var _ = Describe("Failures", func() {
 		BeforeEach(func() {
 			ctx = context.Background()
 			var err error
-			client, err = startServerWithArgs(ctx, "failure", []string{
+			client, err = startServerWithArgs(ctx, "", []string{
 				"cmd", "--model", model,
 				"--failure-injection-rate", "0",
 			}, nil)

@@ -293,7 +293,7 @@ var _ = Describe("Failures", func() {
 	DescribeTable("should return correct error for each failure type",
 		func(failureType string, expectedStatusCode int, expectedErrorType string) {
 			ctx := context.Background()
-			client, err := startServerWithArgs(ctx, "failure", []string{
+			client, err := startServerWithArgs(ctx, "", []string{
 				"cmd", "--model", model,
 				"--failure-injection-rate", "100",
 				"--failure-types", failureType,

pkg/llm-d-inference-sim/metrics.go

Lines changed: 24 additions & 0 deletions
@@ -181,11 +181,23 @@ func (s *VllmSimulator) reportWaitingRequests() {
 	}
 }
 
+// reportKVCacheUsage sets information about kv cache usage
+func (s *VllmSimulator) reportKVCacheUsage(value float64) {
+	if s.config.FakeMetrics != nil {
+		return
+	}
+	if s.kvCacheUsagePercentage != nil {
+		s.kvCacheUsagePercentage.WithLabelValues(
+			s.getDisplayedModelName(s.config.Model)).Set(value)
+	}
+}
+
 // startMetricsUpdaters starts the various metrics updaters
 func (s *VllmSimulator) startMetricsUpdaters(ctx context.Context) {
 	go s.waitingRequestsUpdater(ctx)
 	go s.runningRequestsUpdater(ctx)
 	go s.lorasUpdater(ctx)
+	go s.kvCacheUsageUpdater(ctx)
 }
 
 // waitingRequestsUpdater updates the waiting requests metric by listening on the relevant channel

@@ -214,6 +226,18 @@ func (s *VllmSimulator) runningRequestsUpdater(ctx context.Context) {
 	}
 }
 
+// kvCacheUsageUpdater updates the kv cache usage metric by listening on the relevant channel
+func (s *VllmSimulator) kvCacheUsageUpdater(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case value := <-s.kvCacheUsageChan:
+			s.reportKVCacheUsage(value)
+		}
+	}
+}
+
 // lorasUpdater updates the running loras metric by listening on the relevant channel
 // one function updates both waiting and running loras since they are part of the same prometheus gauge
 func (s *VllmSimulator) lorasUpdater(ctx context.Context) {
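
reportKVCacheUsage assumes a kvCacheUsagePercentage gauge vector labelled by model name; its declaration and registration are not part of this diff. Judging by the metric name the new tests assert on (vllm:gpu_cache_usage_perc), it presumably looks roughly like this sketch, where the package name, variable placement, and help text are assumptions:

```go
package vllmsim // package name is an assumption

import "github.com/prometheus/client_golang/prometheus"

// Assumed shape of the gauge behind s.kvCacheUsagePercentage; the real
// declaration lives elsewhere in metrics.go and may differ.
var kvCacheUsagePercentage = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "vllm:gpu_cache_usage_perc",
		Help: "Fraction of KV-cache blocks currently in use (0 to 1).",
	},
	[]string{"model_name"},
)

func init() {
	// In the simulator, registration would target its own metrics registry.
	prometheus.MustRegister(kvCacheUsagePercentage)
}
```

With that in place, the updater goroutine simply forwards every ratio received on kvCacheUsageChan to WithLabelValues(model).Set(value), matching the channel-per-metric pattern already used for the running and waiting request gauges.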

pkg/llm-d-inference-sim/metrics_test.go

Lines changed: 156 additions & 0 deletions
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"io"
 	"net/http"
+	"os"
 	"regexp"
 	"sort"
 	"strconv"

@@ -314,6 +315,161 @@ var _ = Describe("Simulator metrics", Ordered, func() {
 		Expect(bothRunningTimestamp <= emptyTimestamp).To(BeTrue())
 	})
 
+	Context("kv cache metrics", func() {
+		tmpDir := "./tests-tmp/"
+		AfterAll(func() {
+			err := os.RemoveAll(tmpDir)
+			Expect(err).NotTo(HaveOccurred())
+		})
+		It("Should send correct kv cache usage metrics", func() {
+			modelName := "Qwen/Qwen2-0.5B"
+			// Three requests; there should be two blocks in the kv cache, because
+			// the first and the second prompt share a block.
+			ctx := context.TODO()
+			args := []string{"cmd", "--model", modelName, "--mode", common.ModeRandom,
+				"--enable-kvcache", "true", "--kv-cache-size", "16", "--block-size", "8",
+				"--time-to-first-token", "5000", "--tokenizers-cache-dir", tmpDir}
+
+			client, err := startServerWithArgs(ctx, common.ModeRandom, args, nil)
+			Expect(err).NotTo(HaveOccurred())
+
+			openaiclient := openai.NewClient(
+				option.WithBaseURL(baseURL),
+				option.WithHTTPClient(client))
+
+			paramsArray := []openai.CompletionNewParams{
+				{
+					Prompt: openai.CompletionNewParamsPromptUnion{
+						OfString: openai.String("What is the weather like in Haifa today? Is it cold?"),
+					},
+					Model: openai.CompletionNewParamsModel(modelName),
+				},
+				{
+					Prompt: openai.CompletionNewParamsPromptUnion{
+						OfString: openai.String("What is the weather like in Haifa today?"),
+					},
+					Model: openai.CompletionNewParamsModel(modelName),
+				},
+				{
+					Prompt: openai.CompletionNewParamsPromptUnion{
+						OfString: openai.String("What is the weather like in New York today?"),
+					},
+					Model: openai.CompletionNewParamsModel(modelName),
+				},
+			}
+
+			for _, params := range paramsArray {
+				go func() {
+					defer GinkgoRecover()
+					_, err := openaiclient.Completions.New(ctx, params)
+					Expect(err).NotTo(HaveOccurred())
+				}()
+			}
+
+			var wg sync.WaitGroup
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				defer GinkgoRecover()
+
+				time.Sleep(4 * time.Second)
+				metricsResp, err := client.Get(metricsUrl)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(metricsResp.StatusCode).To(Equal(http.StatusOK))
+
+				data, err := io.ReadAll(metricsResp.Body)
+				Expect(err).NotTo(HaveOccurred())
+				metrics := string(data)
+				// Expect three running requests and two blocks in the kv cache - usage 2/16=0.125
+				Expect(metrics).To(ContainSubstring("vllm:num_requests_running{model_name=\"Qwen/Qwen2-0.5B\"} 3"))
+				Expect(metrics).To(ContainSubstring("vllm:num_requests_waiting{model_name=\"Qwen/Qwen2-0.5B\"} 0"))
+				Expect(metrics).To(ContainSubstring("vllm:gpu_cache_usage_perc{model_name=\"Qwen/Qwen2-0.5B\"} 0.125"))
+
+				time.Sleep(3 * time.Second)
+				metricsResp, err = client.Get(metricsUrl)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(metricsResp.StatusCode).To(Equal(http.StatusOK))
+
+				data, err = io.ReadAll(metricsResp.Body)
+				Expect(err).NotTo(HaveOccurred())
+				metrics = string(data)
+				// The requests finished running, expect 0 usage
+				Expect(metrics).To(ContainSubstring("vllm:num_requests_running{model_name=\"Qwen/Qwen2-0.5B\"} 0"))
+				Expect(metrics).To(ContainSubstring("vllm:num_requests_waiting{model_name=\"Qwen/Qwen2-0.5B\"} 0"))
+				Expect(metrics).To(ContainSubstring("vllm:gpu_cache_usage_perc{model_name=\"Qwen/Qwen2-0.5B\"} 0"))
+			}()
+			wg.Wait()
+		})
+
+		It("Should send correct kv cache usage metrics for sequential requests", func() {
+			modelName := "Qwen/Qwen2-0.5B"
+			ctx := context.TODO()
+			args := []string{"cmd", "--model", modelName, "--mode", common.ModeRandom,
+				"--enable-kvcache", "true", "--kv-cache-size", "16", "--block-size", "8",
+				"--time-to-first-token", "5000", "--tokenizers-cache-dir", tmpDir, "--max-num-seqs", "2"}
+
+			client, err := startServerWithArgs(ctx, common.ModeRandom, args, nil)
+			Expect(err).NotTo(HaveOccurred())
+
+			openaiclient := openai.NewClient(
+				option.WithBaseURL(baseURL),
+				option.WithHTTPClient(client))
+
+			paramsArray := []openai.CompletionNewParams{
+				{
+					Prompt: openai.CompletionNewParamsPromptUnion{
+						OfString: openai.String("What is the weather like in Haifa today? Is it cold?"),
+					},
+					Model: openai.CompletionNewParamsModel(modelName),
+				},
+				{
+					Prompt: openai.CompletionNewParamsPromptUnion{
+						OfString: openai.String("What is the weather like in Haifa today?"),
+					},
+					Model: openai.CompletionNewParamsModel(modelName),
+				},
+				{
+					Prompt: openai.CompletionNewParamsPromptUnion{
+						OfString: openai.String("What is the weather like in New York today?"),
+					},
+					Model: openai.CompletionNewParamsModel(modelName),
+				},
+			}
+
+			for i, params := range paramsArray {
+				go func() {
+					defer GinkgoRecover()
+					time.Sleep(time.Duration(i*500) * time.Millisecond)
+					_, err := openaiclient.Completions.New(ctx, params)
+					Expect(err).NotTo(HaveOccurred())
+				}()
+			}
+
+			var wg sync.WaitGroup
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				defer GinkgoRecover()
+
+				time.Sleep(3 * time.Second)
+				metricsResp, err := client.Get(metricsUrl)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(metricsResp.StatusCode).To(Equal(http.StatusOK))
+
+				data, err := io.ReadAll(metricsResp.Body)
+				Expect(err).NotTo(HaveOccurred())
+				metrics := string(data)
+				// The requests were sent with 500 millisecond intervals, and the first two should still be running.
+				// The third is waiting, and is still not in the kv-cache.
+				// We expect one block in the kv-cache, usage 1/16=0.0625.
+				Expect(metrics).To(ContainSubstring("vllm:num_requests_running{model_name=\"Qwen/Qwen2-0.5B\"} 2"))
+				Expect(metrics).To(ContainSubstring("vllm:num_requests_waiting{model_name=\"Qwen/Qwen2-0.5B\"} 1"))
+				Expect(metrics).To(ContainSubstring("vllm:gpu_cache_usage_perc{model_name=\"Qwen/Qwen2-0.5B\"} 0.0625"))
+			}()
+			wg.Wait()
+		})
+	})
+
 	Context("fake metrics", func() {
 		It("Should respond with fake metrics to /metrics", func() {
 			ctx := context.TODO()
