diff --git a/pkg/epp/backend/metrics/metrics.go b/pkg/epp/backend/metrics/metrics.go
index 0fd2c6f46..5d2a85e96 100644
--- a/pkg/epp/backend/metrics/metrics.go
+++ b/pkg/epp/backend/metrics/metrics.go
@@ -38,6 +38,7 @@ const (
 	LoraInfoMaxAdaptersMetricName      = "max_lora"
 
 	CacheConfigBlockSizeInfoMetricName = "block_size"
+	CacheConfigNumGPUBlocksMetricName  = "num_gpu_blocks"
 )
 
 type PodMetricsClientImpl struct {
@@ -148,12 +149,16 @@ func (p *PodMetricsClientImpl) promToPodMetrics(
 			errs = multierr.Append(errs, err)
 		} else {
 			for _, v := range cacheMetrics.GetLabel() {
-				if v.GetName() == CacheConfigBlockSizeInfoMetricName {
+				switch v.GetName() {
+				case CacheConfigBlockSizeInfoMetricName:
 					updated.CacheBlockSize, err = strconv.Atoi(v.GetValue())
 					if err != nil {
 						errs = multierr.Append(errs, err)
-					} else {
-						break
+					}
+				case CacheConfigNumGPUBlocksMetricName:
+					updated.CacheNumGPUBlocks, err = strconv.Atoi(v.GetValue())
+					if err != nil {
+						errs = multierr.Append(errs, err)
 					}
 				}
 			}
diff --git a/pkg/epp/backend/metrics/metrics_test.go b/pkg/epp/backend/metrics/metrics_test.go
index 502ad6f09..f1256ec6b 100644
--- a/pkg/epp/backend/metrics/metrics_test.go
+++ b/pkg/epp/backend/metrics/metrics_test.go
@@ -373,6 +373,95 @@ func TestGetLatestLoraMetric(t *testing.T) {
 	}
 }
 
+func TestCacheConfigInfoMetrics(t *testing.T) {
+	testCases := []struct {
+		name            string
+		metricFamilies  map[string]*dto.MetricFamily
+		mapping         *MetricMapping
+		existingMetrics *MetricsState
+		expectedMetrics *MetricsState
+		expectedErr     error
+	}{
+		{
+			name: "successful cache config metrics",
+			metricFamilies: map[string]*dto.MetricFamily{
+				"vllm_cache_config": makeMetricFamily("vllm_cache_config",
+					makeMetric(map[string]string{"block_size": "16", "num_gpu_blocks": "1024"}, 1.0, 1000),
+				),
+			},
+			mapping: &MetricMapping{
+				CacheConfigInfo: &MetricSpec{MetricName: "vllm_cache_config"},
+			},
+			existingMetrics: &MetricsState{},
+			expectedMetrics: &MetricsState{
+				CacheBlockSize:    16,
+				CacheNumGPUBlocks: 1024,
+			},
+			expectedErr: nil,
+		},
+		{
+			name: "invalid block_size value",
+			metricFamilies: map[string]*dto.MetricFamily{
+				"vllm_cache_config": makeMetricFamily("vllm_cache_config",
+					makeMetric(map[string]string{"block_size": "invalid", "num_gpu_blocks": "1024"}, 1.0, 1000),
+				),
+			},
+			mapping: &MetricMapping{
+				CacheConfigInfo: &MetricSpec{MetricName: "vllm_cache_config"},
+			},
+			existingMetrics: &MetricsState{},
+			expectedMetrics: &MetricsState{
+				CacheNumGPUBlocks: 1024,
+			},
+			expectedErr: errors.New("strconv.Atoi: parsing \"invalid\": invalid syntax"),
+		},
+		{
+			name: "invalid num_gpu_blocks value",
+			metricFamilies: map[string]*dto.MetricFamily{
+				"vllm_cache_config": makeMetricFamily("vllm_cache_config",
+					makeMetric(map[string]string{"block_size": "16", "num_gpu_blocks": "invalid"}, 1.0, 1000),
+				),
+			},
+			mapping: &MetricMapping{
+				CacheConfigInfo: &MetricSpec{MetricName: "vllm_cache_config"},
+			},
+			existingMetrics: &MetricsState{},
+			expectedMetrics: &MetricsState{
+				CacheBlockSize: 16,
+			},
+			expectedErr: errors.New("strconv.Atoi: parsing \"invalid\": invalid syntax"),
+		},
+		{
+			name: "no cache config if not in MetricMapping",
+			metricFamilies: map[string]*dto.MetricFamily{
+				"vllm_cache_config": makeMetricFamily("vllm_cache_config",
+					makeMetric(map[string]string{"block_size": "16", "num_gpu_blocks": "1024"}, 1.0, 1000),
+				),
+			},
+			mapping:         &MetricMapping{}, // No CacheConfigInfo defined
+			existingMetrics: &MetricsState{},
+			expectedMetrics: &MetricsState{},
+			expectedErr:     nil,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			p := &PodMetricsClientImpl{MetricMapping: tc.mapping}
+			updated, err := p.promToPodMetrics(tc.metricFamilies, tc.existingMetrics)
+
+			if tc.expectedErr != nil {
+				assert.Error(t, err)
+				assert.Contains(t, err.Error(), tc.expectedErr.Error())
+			} else {
+				assert.NoError(t, err)
+				assert.Equal(t, tc.expectedMetrics.CacheBlockSize, updated.CacheBlockSize, "CacheBlockSize mismatch")
+				assert.Equal(t, tc.expectedMetrics.CacheNumGPUBlocks, updated.CacheNumGPUBlocks, "CacheNumGPUBlocks mismatch")
+			}
+		})
+	}
+}
+
 func TestPromToPodMetrics(t *testing.T) {
 	tests := []struct {
 		name            string
diff --git a/pkg/epp/datalayer/metrics.go b/pkg/epp/datalayer/metrics.go
index 2febcb4d0..7deecb9a3 100644
--- a/pkg/epp/datalayer/metrics.go
+++ b/pkg/epp/datalayer/metrics.go
@@ -33,6 +33,8 @@ type Metrics struct {
 	KVCacheUsagePercent     float64
 	KvCacheMaxTokenCapacity int
 	CacheBlockSize          int
+	// Number of GPU blocks in the model server for KV Cache.
+	CacheNumGPUBlocks       int
 
 	// UpdateTime records the last time when the metrics were updated.
 	UpdateTime time.Time
@@ -77,6 +79,7 @@ func (m *Metrics) Clone() *Metrics {
 		KVCacheUsagePercent:     m.KVCacheUsagePercent,
 		KvCacheMaxTokenCapacity: m.KvCacheMaxTokenCapacity,
 		CacheBlockSize:          m.CacheBlockSize,
+		CacheNumGPUBlocks:       m.CacheNumGPUBlocks,
 		UpdateTime:              m.UpdateTime,
 	}
 }
diff --git a/pkg/epp/datalayer/metrics/extractor.go b/pkg/epp/datalayer/metrics/extractor.go
index 562f2223b..52bb814f0 100644
--- a/pkg/epp/datalayer/metrics/extractor.go
+++ b/pkg/epp/datalayer/metrics/extractor.go
@@ -39,6 +39,7 @@ const (
 	LoraInfoMaxAdaptersMetricName      = "max_lora"
 
 	CacheConfigBlockSizeInfoMetricName = "block_size"
+	CacheConfigNumGPUBlocksMetricName  = "num_gpu_blocks"
 )
 
 // Extractor implements the metrics extraction based on the model
@@ -161,11 +162,19 @@ func populateLoRAMetrics(clone *datalayer.Metrics, metric *dto.Metric, errs *[]e
 func populateCacheInfoMetrics(clone *datalayer.Metrics, metric *dto.Metric, errs *[]error) {
 	clone.CacheBlockSize = 0
 	for _, label := range metric.GetLabel() {
-		if label.GetName() == CacheConfigBlockSizeInfoMetricName {
+		switch label.GetName() {
+		case CacheConfigBlockSizeInfoMetricName:
 			if label.GetValue() != "" {
 				if val, err := strconv.Atoi(label.GetValue()); err == nil {
 					clone.CacheBlockSize = val
-					break
+				} else {
+					*errs = append(*errs, err)
+				}
+			}
+		case CacheConfigNumGPUBlocksMetricName:
+			if label.GetValue() != "" {
+				if val, err := strconv.Atoi(label.GetValue()); err == nil {
+					clone.CacheNumGPUBlocks = val
 				} else {
 					*errs = append(*errs, err)
 				}
diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go
index 8b68132dc..98de269bc 100644
--- a/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go
+++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go
@@ -31,18 +31,18 @@ import (
 // An indexer maintains an LRU cache of prompt prefix hashes and the server(s) that might have that
 // prefix cached.
 type indexer struct {
-	mu         sync.RWMutex
-	hashToPods map[BlockHash]podSet                         // the lookup data structure to find pods that have the BlockHash cached
-	podToLRU   map[ServerID]*lru.Cache[BlockHash, struct{}] // key is pod namespacedName, value is an LRU cache
-	maxLRUSize int
+	mu             sync.RWMutex
+	hashToPods     map[BlockHash]podSet                         // the lookup data structure to find pods that have the BlockHash cached
+	podToLRU       map[ServerID]*lru.Cache[BlockHash, struct{}] // key is pod namespacedName, value is an LRU cache
+	defaultLRUSize int
 }
 
 // newIndexer initializes an indexer with size limits and starts cache size reporting.
-func newIndexer(ctx context.Context, maxLRUSize int) *indexer {
+func newIndexer(ctx context.Context, defaultLRUSize int) *indexer {
 	indexer := &indexer{
-		hashToPods: make(map[BlockHash]podSet),
-		podToLRU:   make(map[ServerID]*lru.Cache[BlockHash, struct{}]),
-		maxLRUSize: maxLRUSize,
+		hashToPods:     make(map[BlockHash]podSet),
+		podToLRU:       make(map[ServerID]*lru.Cache[BlockHash, struct{}]),
+		defaultLRUSize: defaultLRUSize,
 	}
 
 	go indexer.reportLRUSize(ctx, time.Second)
@@ -50,13 +50,17 @@
 // Add adds a list of prefix hashes to the cache, tied to the server.
-func (i *indexer) Add(hashes []BlockHash, pod ServerID) {
+func (i *indexer) Add(hashes []BlockHash, pod Server) {
 	i.mu.Lock()
 	// Check if the LRU pod exist
-	lruForPod, exists := i.podToLRU[pod]
+	lruForPod, exists := i.podToLRU[pod.ServerID]
 	if !exists {
-		newLRU, _ := lru.NewWithEvict[BlockHash, struct{}](i.maxLRUSize, i.makeEvictionFn(pod))
-		i.podToLRU[pod] = newLRU
+		lruSize := pod.numOfGPUBlocks
+		if lruSize <= 0 {
+			lruSize = i.defaultLRUSize
+		}
+		newLRU, _ := lru.NewWithEvict(lruSize, i.makeEvictionFn(pod.ServerID))
+		i.podToLRU[pod.ServerID] = newLRU
 		lruForPod = newLRU
 	}
@@ -70,12 +74,12 @@
 	// Update hashToPods once under lock
 	i.mu.Lock()
 	for _, hash := range hashes {
-		pods := i.hashToPods[hash]
-		if pods == nil {
-			pods = make(podSet)
+		podIDs := i.hashToPods[hash]
+		if podIDs == nil {
+			podIDs = make(podSet)
 		}
-		pods[pod] = struct{}{}
-		i.hashToPods[hash] = pods
+		podIDs[pod.ServerID] = struct{}{}
+		i.hashToPods[hash] = podIDs
 	}
 	i.mu.Unlock()
@@ -143,7 +147,7 @@ func (i *indexer) reportLRUSize(ctx context.Context, interval time.Duration) {
 				"avg entries per pod", avg,
 				"pod with max cache", maxPodName,
 				"max pod size", maxPodEntries,
-				"global max LRU cache capacity per pod", i.maxLRUSize,
+				"global max LRU cache capacity per pod", i.defaultLRUSize,
 			)
 			i.mu.RUnlock()
diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer_test.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer_test.go
index c35af8e27..c512c1a06 100644
--- a/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer_test.go
+++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer_test.go
@@ -23,25 +23,28 @@ import (
 )
 
 func TestIndexer_AddAndGet(t *testing.T) {
-	i := newIndexer(context.Background(), 2)
+	server := Server{
+		ServerID:       ServerID{Namespace: "default", Name: "server1"},
+		numOfGPUBlocks: 2,
+	}
+	i := newIndexer(context.Background(), 3) // Initialize with an lruSize greater than server.numOfGPUBlocks to verify server-defined limits take precedence.
 
 	hash1 := BlockHash(1)
-	server := ServerID{Namespace: "default", Name: "server1"}
 	// Add an entry to the cache
 	i.Add([]BlockHash{hash1}, server)
 
 	// Retrieve the entry
-	assert.Equal(t, 1, i.podToLRU[server].Len(), "Cache size should be 1 after adding an entry")
+	assert.Equal(t, 1, i.podToLRU[server.ServerID].Len(), "Cache size should be 1 after adding an entry")
 	servers := i.Get(hash1)
-	assert.Contains(t, servers, server, "Cache should contain the added server")
+	assert.Contains(t, servers, server.ServerID, "Cache should contain the added server")
 
 	// Add another entry to the cache, the cache size should be incremented to 2.
 	i.Add([]BlockHash{BlockHash(2)}, server)
-	assert.Equal(t, 2, i.podToLRU[server].Len(), "Cache size should be 2 after adding an entry")
+	assert.Equal(t, 2, i.podToLRU[server.ServerID].Len(), "Cache size should be 2 after adding an entry")
 
 	// Add another entry to the cache, which should evict the first one due to max size.
 	i.Add([]BlockHash{BlockHash(3)}, server)
-	assert.Equal(t, 2, i.podToLRU[server].Len(), "Cache size should still be 2 after adding an entry")
+	assert.Equal(t, 2, i.podToLRU[server.ServerID].Len(), "Cache size should still be 2 after adding an entry")
 
 	servers = i.Get(BlockHash(4))
 	assert.Empty(t, servers, "Cache should not contain non-existent hash")
@@ -52,8 +55,8 @@
 	i := newIndexer(context.Background(), indexerSize)
 
-	server1 := ServerID{Namespace: "default", Name: "server1"}
-	server2 := ServerID{Namespace: "default", Name: "server2"}
+	server1 := Server{ServerID: ServerID{Namespace: "default", Name: "server1"}}
+	server2 := Server{ServerID: ServerID{Namespace: "default", Name: "server2"}}
 
 	// Add indexerSize hashes to both servers
 	var hashes []BlockHash
@@ -65,15 +68,15 @@
 	}
 
 	// Ensure all entries are added
-	assert.Equal(t, indexerSize, i.podToLRU[server1].Len(), "server1 should have 10 entries")
-	assert.Equal(t, indexerSize, i.podToLRU[server2].Len(), "server2 should have 10 entries")
+	assert.Equal(t, indexerSize, i.podToLRU[server1.ServerID].Len(), "server1 should have 10 entries")
+	assert.Equal(t, indexerSize, i.podToLRU[server2.ServerID].Len(), "server2 should have 10 entries")
 
 	// Ensure each hash in hashToPods maps to both server1 and server2
 	for _, h := range hashes {
 		pods := i.hashToPods[h]
 		assert.Len(t, pods, 2, "Each hash should be associated with exactly 2 pods")
-		assert.Contains(t, pods, server1, "hash should be associated with server1")
-		assert.Contains(t, pods, server2, "hash should be associated with server2")
+		assert.Contains(t, pods, server1.ServerID, "hash should be associated with server1")
+		assert.Contains(t, pods, server2.ServerID, "hash should be associated with server2")
 	}
 
 	// Add indexerSize hash to server1 → should evict BlockHash(0)
@@ -82,25 +85,25 @@
 	i.Add([]BlockHash{newHash}, server1)
 
 	// server1 LRU should still be at max capacity
-	assert.Equal(t, indexerSize, i.podToLRU[server1].Len(), "server1 LRU should maintain max size")
+	assert.Equal(t, indexerSize, i.podToLRU[server1.ServerID].Len(), "server1 LRU should maintain max size")
 
 	// BlockHash(0) should no longer have server1 in hashToPods
 	pods := i.Get(evictedHash)
-	assert.NotContains(t, pods, server1, "server1 should be evicted from hashToPods for hash 0")
-	assert.Contains(t, pods, server2, "server2 should still have hash 0")
+	assert.NotContains(t, pods, server1.ServerID, "server1 should be evicted from hashToPods for hash 0")
+	assert.Contains(t, pods, server2.ServerID, "server2 should still have hash 0")
 
 	// Remove server2
-	i.RemovePod(server2)
+	i.RemovePod(server2.ServerID)
 
 	// hashToPods for hash 0 should now be empty
 	pods = i.Get(evictedHash)
-	assert.NotContains(t, pods, server2, "server2 should be removed from hash 0")
+	assert.NotContains(t, pods, server2.ServerID, "server2 should be removed from hash 0")
 	assert.Empty(t, pods, "hash 0 should have no pods after both eviction and removal")
 
 	// All remaining hashes should map only to server1
 	for hash, pods := range i.hashToPods {
 		assert.Len(t, pods, 1, "hash %v should have only 1 pod after server2 removal", hash)
-		assert.Contains(t, pods, server1, "hash %v should only contain server1", hash)
+		assert.Contains(t, pods, server1.ServerID, "hash %v should only contain server1", hash)
 	}
 
 	// Ensure hashToPods contains exactly indexerSize hashes (post-eviction and server2 removal)
diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go
index c58c16791..9def3e4e9 100644
--- a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go
+++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go
@@ -96,7 +96,7 @@ type podSet map[ServerID]struct{}
 
 type Indexer interface {
 	Get(hash BlockHash) podSet
-	Add(hashes []BlockHash, server ServerID)
+	Add(hashes []BlockHash, server Server)
 	RemovePod(server ServerID)
 	Pods() []ServerID
 }
@@ -104,6 +104,11 @@ type Indexer interface {
 // BlockHash is a hash of the block of request body.
 type BlockHash uint64
 
+type Server struct {
+	ServerID
+	numOfGPUBlocks int
+}
+
 type ServerID k8stypes.NamespacedName
 
 func (s ServerID) String() string {
@@ -224,6 +229,7 @@ func (p *Plugin) Score(ctx context.Context, cycleState *types.CycleState, reques
 func (p *Plugin) PreRequest(ctx context.Context, request *types.LLMRequest, schedulingResult *types.SchedulingResult) {
 	primaryProfileResult := schedulingResult.ProfileResults[schedulingResult.PrimaryProfileName]
 	targetPod := primaryProfileResult.TargetPods[0].GetPod() // get the first pod of the primary profile
+	gpuBlocks := primaryProfileResult.TargetPods[0].GetMetrics().CacheNumGPUBlocks
 
 	state, err := plugins.ReadPluginStateKey[*SchedulingContextState](p.pluginState, request.RequestId, plugins.StateKey(p.TypedName().String()))
 	p.pluginState.Delete(request.RequestId) // delete the state explicitly after completing using it
@@ -238,7 +244,10 @@ func (p *Plugin) PreRequest(ctx context.Context, request *types.LLMRequest, sche
 	// WaitGroup is added to the Plugin struct to allow waiting in tests.
 	p.wg.Add(1)
 	go func() {
-		p.indexer.Add(state.PrefixHashes, ServerID(targetPod.NamespacedName))
+		p.indexer.Add(state.PrefixHashes, Server{
+			ServerID(targetPod.NamespacedName),
+			gpuBlocks,
+		})
 		p.wg.Done()
 	}()