11 changes: 10 additions & 1 deletion pkg/kv-cache/block_cache.go
@@ -40,11 +40,12 @@ type blockCache struct {
maxBlocks int // maximum number of blocks in the cache
eventSender *KVEventSender // emits kv events
eventChan chan EventData // channel for asynchronous event processing
usageChan chan float64 // channel for usage reporting
logger logr.Logger
}

// newBlockCache creates a new blockCache with the specified maximum number of blocks
func newBlockCache(config *common.Configuration, logger logr.Logger) (*blockCache, error) {
func newBlockCache(config *common.Configuration, logger logr.Logger, usageChan chan float64) (*blockCache, error) {
// TODO read size of channel from config
eChan := make(chan EventData, 10000)

@@ -63,6 +64,7 @@ func newBlockCache(config *common.Configuration, logger logr.Logger) (*blockCach
unusedBlocks: make(map[uint64]time.Time),
maxBlocks: config.KVCacheSize,
eventChan: eChan,
usageChan: usageChan,
eventSender: NewKVEventSender(publisher, createTopic(config), eChan, config.EventBatchSize, delay, logger),
logger: logger,
}, nil
@@ -149,6 +151,9 @@ func (bc *blockCache) startRequest(requestID string, blocks []uint64) (int, erro
bc.requestToBlocks[requestID] = make([]uint64, len(blocks))
copy(bc.requestToBlocks[requestID], blocks)

if bc.usageChan != nil {
bc.usageChan <- float64(len(bc.usedBlocks)) / float64(bc.maxBlocks)
}
return len(blockAreadyInUse) + len(blockToMoveToUsed), nil
}

@@ -182,6 +187,10 @@ func (bc *blockCache) finishRequest(requestID string) error {
}
}

if bc.usageChan != nil {
bc.usageChan <- float64(len(bc.usedBlocks)) / float64(bc.maxBlocks)
}

// Remove the request mapping
delete(bc.requestToBlocks, requestID)

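
The usageChan wiring above is a small producer/consumer pattern: whenever the set of used blocks changes, the cache pushes the current usage ratio (used blocks divided by capacity) onto the channel, and passing a nil channel disables reporting, which is exactly how the unit tests construct the cache (newBlockCache(config, GinkgoLogr, nil)). A minimal, self-contained sketch of that pattern; the type and function names below are illustrative, not taken from the repository:

```go
package main

import "fmt"

// usageReportingCache is an illustrative stand-in for blockCache: it tracks
// which blocks are in use and reports usage as a fraction of capacity.
type usageReportingCache struct {
	usedBlocks map[uint64]struct{}
	maxBlocks  int
	usageChan  chan float64 // nil disables reporting
}

// reportUsage mirrors the guard added in startRequest/finishRequest:
// only send when a consumer was wired in.
func (c *usageReportingCache) reportUsage() {
	if c.usageChan != nil {
		c.usageChan <- float64(len(c.usedBlocks)) / float64(c.maxBlocks)
	}
}

func (c *usageReportingCache) use(block uint64) {
	c.usedBlocks[block] = struct{}{}
	c.reportUsage()
}

func main() {
	usage := make(chan float64, 16) // buffered so sends don't block in this sketch
	cache := &usageReportingCache{
		usedBlocks: make(map[uint64]struct{}),
		maxBlocks:  16,
		usageChan:  usage,
	}
	cache.use(1)
	cache.use(2)
	fmt.Println(<-usage, <-usage) // 0.0625 0.125
}
```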
15 changes: 9 additions & 6 deletions pkg/kv-cache/kv_cache.go
@@ -35,7 +35,7 @@ type KVCacheHelper struct {
blockSize int
}

func NewKVCacheHelper(config *common.Configuration, logger logr.Logger) (*KVCacheHelper, error) {
func NewKVCacheHelper(config *common.Configuration, logger logr.Logger, usageChan chan float64) (*KVCacheHelper, error) {
tokenProcConfig := kvblock.DefaultTokenProcessorConfig()
tokenProcConfig.BlockSize = config.TokenBlockSize
if config.HashSeed != "" {
@@ -51,7 +51,7 @@ func NewKVCacheHelper(config *common.Configuration, logger logr.Logger) (*KVCach
if err != nil {
return nil, fmt.Errorf("failed to create tokenizer: %w", err)
}
blockCache, err := newBlockCache(config, logger)
blockCache, err := newBlockCache(config, logger, usageChan)
if err != nil {
return nil, fmt.Errorf("failed to create block cache: %w", err)
}
@@ -92,11 +92,14 @@ func (h *KVCacheHelper) OnRequestStart(vllmReq openaiserverapi.CompletionRequest
blockHashes[i] = key.ChunkHash
}

nExistingBlocks, err := h.blockCache.startRequest(requestID, blockHashes)
vllmReq.SetNumberOfCachedPromptTokens(nExistingBlocks * h.blockSize)
nBlocksAlreadyInCache, err := h.blockCache.startRequest(requestID, blockHashes)
if err == nil {
vllmReq.SetNumberOfCachedPromptTokens(nBlocksAlreadyInCache * h.blockSize)
}

return err
}

func (h *KVCacheHelper) OnRequestEnd(vllmReq openaiserverapi.CompletionRequest) error {
return h.blockCache.finishRequest(vllmReq.GetRequestID())
func (h *KVCacheHelper) OnRequestEnd(requestID string) error {
return h.blockCache.finishRequest(requestID)
}
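
Two behavioral details in this file are easy to miss: the cached-prompt-token count is now propagated to the request only when startRequest succeeds, and OnRequestEnd takes just the request ID instead of the whole request. A hedged sketch of the calling convention this implies for whoever drives the helper; the interfaces and handleRequest below are illustrative, not types from the repository:

```go
package main

import "fmt"

// completionRequest is the minimal surface a caller needs here: the full
// request at start time, just its ID at finish time.
type completionRequest interface {
	GetRequestID() string
}

// kvCacheHelper captures the updated API shape of KVCacheHelper as used in
// this change; it is an illustrative interface, not one defined in the repo.
type kvCacheHelper interface {
	OnRequestStart(req completionRequest) error
	OnRequestEnd(requestID string) error
}

// handleRequest sketches a caller: register the request's blocks up front and
// release them by ID once the response has been produced.
func handleRequest(h kvCacheHelper, req completionRequest) error {
	if err := h.OnRequestStart(req); err != nil {
		return fmt.Errorf("kv-cache start: %w", err)
	}
	// ... generate the response ...
	return h.OnRequestEnd(req.GetRequestID())
}

// fakeHelper and fakeRequest exist only to make the sketch runnable.
type fakeHelper struct{ started, finished int }

func (f *fakeHelper) OnRequestStart(req completionRequest) error { f.started++; return nil }
func (f *fakeHelper) OnRequestEnd(requestID string) error        { f.finished++; return nil }

type fakeRequest struct{ id string }

func (r fakeRequest) GetRequestID() string { return r.id }

func main() {
	h := &fakeHelper{}
	if err := handleRequest(h, fakeRequest{id: "req-1"}); err != nil {
		fmt.Println("unexpected error:", err)
	}
	fmt.Println(h.started, h.finished) // 1 1
}
```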
6 changes: 3 additions & 3 deletions pkg/kv-cache/kv_cache_test.go
@@ -216,7 +216,7 @@ var _ = Describe("KV cache", Ordered, func() {
wg := sync.WaitGroup{}
wg.Add(1)

blockCache, err := newBlockCache(config, GinkgoLogr)
blockCache, err := newBlockCache(config, GinkgoLogr, nil)
Expect(err).NotTo(HaveOccurred())

go func() {
@@ -318,7 +318,7 @@ var _ = Describe("KV cache", Ordered, func() {
wg := sync.WaitGroup{}
wg.Add(1)

blockCache, err := newBlockCache(config, GinkgoLogr)
blockCache, err := newBlockCache(config, GinkgoLogr, nil)
Expect(err).NotTo(HaveOccurred())

go func() {
@@ -422,7 +422,7 @@ var _ = Describe("KV cache", Ordered, func() {
KVCacheSize: testCase.cacheSize,
ZMQMaxConnectAttempts: 3,
}
blockCache, err := newBlockCache(&config, GinkgoLogr)
blockCache, err := newBlockCache(&config, GinkgoLogr, nil)
Expect(err).NotTo(HaveOccurred())
var wg sync.WaitGroup

10 changes: 5 additions & 5 deletions pkg/llm-d-inference-sim/failures_test.go
@@ -127,7 +127,7 @@ var _ = Describe("Failures", func() {
BeforeEach(func() {
ctx = context.Background()
var err error
client, err = startServerWithArgs(ctx, "failure", []string{
client, err = startServerWithArgs(ctx, "", []string{
"cmd", "--model", model,
"--failure-injection-rate", "100",
}, nil)
@@ -185,7 +185,7 @@ var _ = Describe("Failures", func() {
BeforeEach(func() {
ctx = context.Background()
var err error
client, err = startServerWithArgs(ctx, "failure", []string{
client, err = startServerWithArgs(ctx, "", []string{
"cmd", "--model", model,
"--failure-injection-rate", "100",
"--failure-types", common.FailureTypeRateLimit,
@@ -221,7 +221,7 @@ var _ = Describe("Failures", func() {
BeforeEach(func() {
ctx = context.Background()
var err error
client, err = startServerWithArgs(ctx, "failure", []string{
client, err = startServerWithArgs(ctx, "", []string{
"cmd", "--model", model,
"--failure-injection-rate", "100",
"--failure-types", common.FailureTypeInvalidAPIKey, common.FailureTypeServerError,
@@ -262,7 +262,7 @@ var _ = Describe("Failures", func() {
BeforeEach(func() {
ctx = context.Background()
var err error
client, err = startServerWithArgs(ctx, "failure", []string{
client, err = startServerWithArgs(ctx, "", []string{
"cmd", "--model", model,
"--failure-injection-rate", "0",
}, nil)
@@ -293,7 +293,7 @@ var _ = Describe("Failures", func() {
DescribeTable("should return correct error for each failure type",
func(failureType string, expectedStatusCode int, expectedErrorType string) {
ctx := context.Background()
client, err := startServerWithArgs(ctx, "failure", []string{
client, err := startServerWithArgs(ctx, "", []string{
"cmd", "--model", model,
"--failure-injection-rate", "100",
"--failure-types", failureType,
24 changes: 24 additions & 0 deletions pkg/llm-d-inference-sim/metrics.go
@@ -181,11 +181,23 @@ func (s *VllmSimulator) reportWaitingRequests() {
}
}

// reportKVCacheUsage sets information about kv cache usage
func (s *VllmSimulator) reportKVCacheUsage(value float64) {
if s.config.FakeMetrics != nil {
return
}
if s.kvCacheUsagePercentage != nil {
s.kvCacheUsagePercentage.WithLabelValues(
s.getDisplayedModelName(s.config.Model)).Set(value)
}
}

// startMetricsUpdaters starts the various metrics updaters
func (s *VllmSimulator) startMetricsUpdaters(ctx context.Context) {
go s.waitingRequestsUpdater(ctx)
go s.runningRequestsUpdater(ctx)
go s.lorasUpdater(ctx)
go s.kvCacheUsageUpdater(ctx)
}

// waitingRequestsUpdater updates the waiting requests metric by listening on the relevant channel
@@ -214,6 +226,18 @@ func (s *VllmSimulator) runningRequestsUpdater(ctx context.Context) {
}
}

// kvCacheUsageUpdater updates the kv cache usage metric by listening on the relevant channel
func (s *VllmSimulator) kvCacheUsageUpdater(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case value := <-s.kvCacheUsageChan:
s.reportKVCacheUsage(value)
}
}
}

// lorasUpdater updates the running loras metric by listening on the relevant channel
// one function updates both waiting and running loras since they are part of the same prometheus gauge
func (s *VllmSimulator) lorasUpdater(ctx context.Context) {
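
The new kvCacheUsageUpdater follows the same channel-driven pattern as the other metric updaters: a goroutine selects on the context and on kvCacheUsageChan, and each received value is written to the vllm:gpu_cache_usage_perc gauge labeled by model name, while reportKVCacheUsage returns early when fake metrics are configured so real measurements are never mixed in. A minimal sketch of that pattern, assuming a plain Prometheus GaugeVec stands in for the simulator's registered metric (registration against the simulator's own registry and the FakeMetrics short-circuit are omitted):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Gauge with the same name and label that the tests scrape from /metrics.
	usageGauge := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "vllm:gpu_cache_usage_perc",
			Help: "Fraction of the KV-cache blocks currently in use.",
		},
		[]string{"model_name"},
	)
	prometheus.NewRegistry().MustRegister(usageGauge)

	usageChan := make(chan float64, 8)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Channel-driven updater, mirroring kvCacheUsageUpdater: consume values
	// until the context is cancelled and reflect each one in the gauge.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case v := <-usageChan:
				usageGauge.WithLabelValues("Qwen/Qwen2-0.5B").Set(v)
			}
		}
	}()

	usageChan <- 0.125 // e.g. 2 used blocks out of a 16-block cache
	time.Sleep(10 * time.Millisecond)
	fmt.Println("usage gauge updated")
}
```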
156 changes: 156 additions & 0 deletions pkg/llm-d-inference-sim/metrics_test.go
@@ -21,6 +21,7 @@ import (
"errors"
"io"
"net/http"
"os"
"regexp"
"sort"
"strconv"
@@ -314,6 +315,161 @@ var _ = Describe("Simulator metrics", Ordered, func() {
Expect(bothRunningTimestamp <= emptyTimestamp).To(BeTrue())
})

Context("kv cache metrics", func() {
tmpDir := "./tests-tmp/"
AfterAll(func() {
err := os.RemoveAll(tmpDir)
Expect(err).NotTo(HaveOccurred())
})
It("Should send correct kv cache usage metrics", func() {
modelName := "Qwen/Qwen2-0.5B"
// Three requests; there should be two blocks in the kv cache, because
// the first and the second prompts share a block.
ctx := context.TODO()
args := []string{"cmd", "--model", modelName, "--mode", common.ModeRandom,
"--enable-kvcache", "true", "--kv-cache-size", "16", "--block-size", "8",
"--time-to-first-token", "5000", "--tokenizers-cache-dir", tmpDir}

client, err := startServerWithArgs(ctx, common.ModeRandom, args, nil)
Expect(err).NotTo(HaveOccurred())

openaiclient := openai.NewClient(
option.WithBaseURL(baseURL),
option.WithHTTPClient(client))

paramsArray := []openai.CompletionNewParams{
{
Prompt: openai.CompletionNewParamsPromptUnion{
OfString: openai.String("What is the weather like in Haifa today? Is it cold?"),
},
Model: openai.CompletionNewParamsModel(modelName),
},
{
Prompt: openai.CompletionNewParamsPromptUnion{
OfString: openai.String("What is the weather like in Haifa today?"),
},
Model: openai.CompletionNewParamsModel(modelName),
},
{
Prompt: openai.CompletionNewParamsPromptUnion{
OfString: openai.String("What is the weather like in New York today?"),
},
Model: openai.CompletionNewParamsModel(modelName),
},
}

for _, params := range paramsArray {
go func() {
defer GinkgoRecover()
_, err := openaiclient.Completions.New(ctx, params)
Expect(err).NotTo(HaveOccurred())
}()
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
defer GinkgoRecover()

time.Sleep(4 * time.Second)
metricsResp, err := client.Get(metricsUrl)
Expect(err).NotTo(HaveOccurred())
Expect(metricsResp.StatusCode).To(Equal(http.StatusOK))

data, err := io.ReadAll(metricsResp.Body)
Expect(err).NotTo(HaveOccurred())
metrics := string(data)
// Expect three running requests and two blocks in the kv cache - usage 2/16=0.125
Expect(metrics).To(ContainSubstring("vllm:num_requests_running{model_name=\"Qwen/Qwen2-0.5B\"} 3"))
Expect(metrics).To(ContainSubstring("vllm:num_requests_waiting{model_name=\"Qwen/Qwen2-0.5B\"} 0"))
Expect(metrics).To(ContainSubstring("vllm:gpu_cache_usage_perc{model_name=\"Qwen/Qwen2-0.5B\"} 0.125"))

time.Sleep(3 * time.Second)
metricsResp, err = client.Get(metricsUrl)
Expect(err).NotTo(HaveOccurred())
Expect(metricsResp.StatusCode).To(Equal(http.StatusOK))

data, err = io.ReadAll(metricsResp.Body)
Expect(err).NotTo(HaveOccurred())
metrics = string(data)
// The requests finished running, expect 0 usage
Expect(metrics).To(ContainSubstring("vllm:num_requests_running{model_name=\"Qwen/Qwen2-0.5B\"} 0"))
Expect(metrics).To(ContainSubstring("vllm:num_requests_waiting{model_name=\"Qwen/Qwen2-0.5B\"} 0"))
Expect(metrics).To(ContainSubstring("vllm:gpu_cache_usage_perc{model_name=\"Qwen/Qwen2-0.5B\"} 0"))
}()
wg.Wait()
})

It("Should send correct kv cache usage metrics for sequentual requests", func() {
modelName := "Qwen/Qwen2-0.5B"
ctx := context.TODO()
args := []string{"cmd", "--model", modelName, "--mode", common.ModeRandom,
"--enable-kvcache", "true", "--kv-cache-size", "16", "--block-size", "8",
"--time-to-first-token", "5000", "--tokenizers-cache-dir", tmpDir, "--max-num-seqs", "2"}

client, err := startServerWithArgs(ctx, common.ModeRandom, args, nil)
Expect(err).NotTo(HaveOccurred())

openaiclient := openai.NewClient(
option.WithBaseURL(baseURL),
option.WithHTTPClient(client))

paramsArray := []openai.CompletionNewParams{
{
Prompt: openai.CompletionNewParamsPromptUnion{
OfString: openai.String("What is the weather like in Haifa today? Is it cold?"),
},
Model: openai.CompletionNewParamsModel(modelName),
},
{
Prompt: openai.CompletionNewParamsPromptUnion{
OfString: openai.String("What is the weather like in Haifa today?"),
},
Model: openai.CompletionNewParamsModel(modelName),
},
{
Prompt: openai.CompletionNewParamsPromptUnion{
OfString: openai.String("What is the weather like in New York today?"),
},
Model: openai.CompletionNewParamsModel(modelName),
},
}

for i, params := range paramsArray {
go func() {
defer GinkgoRecover()
time.Sleep(time.Duration(i*500) * time.Millisecond)
_, err := openaiclient.Completions.New(ctx, params)
Expect(err).NotTo(HaveOccurred())
}()
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
defer GinkgoRecover()

time.Sleep(3 * time.Second)
metricsResp, err := client.Get(metricsUrl)
Expect(err).NotTo(HaveOccurred())
Expect(metricsResp.StatusCode).To(Equal(http.StatusOK))

data, err := io.ReadAll(metricsResp.Body)
Expect(err).NotTo(HaveOccurred())
metrics := string(data)
// The requests were sent at 500-millisecond intervals, and the first two should still be running.
// The third is waiting, and is still not in the kv-cache.
// We expect one block in the kv-cache, usage 1/16=0.0625.
Expect(metrics).To(ContainSubstring("vllm:num_requests_running{model_name=\"Qwen/Qwen2-0.5B\"} 2"))
Expect(metrics).To(ContainSubstring("vllm:num_requests_waiting{model_name=\"Qwen/Qwen2-0.5B\"} 1"))
Expect(metrics).To(ContainSubstring("vllm:gpu_cache_usage_perc{model_name=\"Qwen/Qwen2-0.5B\"} 0.0625"))
}()
wg.Wait()
})
})

Context("fake metrics", func() {
It("Should respond with fake metrics to /metrics", func() {
ctx := context.TODO()
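
The expected values asserted in the kv cache metrics tests follow from simple block arithmetic: prompts are chunked into blocks of --block-size (8) tokens, the two Haifa prompts share their first block while the New York prompt yields a different one, and usage is the number of distinct in-use blocks divided by --kv-cache-size (16): 2/16 = 0.125 while all three requests are running, and 1/16 = 0.0625 in the sequential case where only the two Haifa requests have reached the cache. The sketch below reproduces that arithmetic; splitting on whitespace is a stand-in for the simulator's real tokenizer and block hashing, so only the shared-prefix counting carries over:

```go
package main

import (
	"fmt"
	"strings"
)

// expectedUsage reproduces the arithmetic behind the test assertions:
// split each prompt into full blocks of blockSize tokens, count the
// distinct blocks across all prompts, and divide by the cache capacity.
func expectedUsage(prompts []string, blockSize, cacheSize int) float64 {
	blocks := map[string]struct{}{}
	for _, p := range prompts {
		tokens := strings.Fields(p) // stand-in tokenizer: whitespace words
		for i := 0; i+blockSize <= len(tokens); i += blockSize {
			blocks[strings.Join(tokens[i:i+blockSize], " ")] = struct{}{}
		}
	}
	return float64(len(blocks)) / float64(cacheSize)
}

func main() {
	prompts := []string{
		"What is the weather like in Haifa today? Is it cold?",
		"What is the weather like in Haifa today?",
		"What is the weather like in New York today?",
	}
	// The two Haifa prompts share their first block, the New York prompt
	// contributes a different one: 2 blocks / 16 capacity = 0.125.
	fmt.Println(expectedUsage(prompts, 8, 16))
}
```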