11 changes: 8 additions & 3 deletions pkg/epp/backend/metrics/metrics.go
@@ -38,6 +38,7 @@ const (
LoraInfoMaxAdaptersMetricName = "max_lora"

CacheConfigBlockSizeInfoMetricName = "block_size"
CacheConfigNumGPUBlocksMetricName = "num_gpu_blocks"
)

type PodMetricsClientImpl struct {
@@ -148,12 +149,16 @@ func (p *PodMetricsClientImpl) promToPodMetrics(
errs = multierr.Append(errs, err)
} else {
for _, v := range cacheMetrics.GetLabel() {
if v.GetName() == CacheConfigBlockSizeInfoMetricName {
switch v.GetName() {
case CacheConfigBlockSizeInfoMetricName:
updated.CacheBlockSize, err = strconv.Atoi(v.GetValue())
if err != nil {
errs = multierr.Append(errs, err)
} else {
break
}
case CacheConfigNumGPUBlocksMetricName:
updated.CacheNumGPUBlocks, err = strconv.Atoi(v.GetValue())
if err != nil {
errs = multierr.Append(errs, err)
}
}
}
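For context on the change above: the block_size and num_gpu_blocks values are read from the labels of vLLM's cache-config info metric. The sketch below shows the same label-to-integer conversion in isolation; the exposition line in the comment is an assumption about vLLM's output shape (label names beyond these two vary by version), and parseCacheConfig is a hypothetical helper, not code from this PR.

// Sketch only: mimics how promToPodMetrics reads the cache-config labels.
// Assumed exposition line (shape may differ across vLLM versions):
//   vllm:cache_config_info{block_size="16",num_gpu_blocks="1024"} 1
package main

import (
	"fmt"
	"strconv"
)

func parseCacheConfig(labels map[string]string) (blockSize, numGPUBlocks int, err error) {
	if v, ok := labels["block_size"]; ok {
		if blockSize, err = strconv.Atoi(v); err != nil {
			return 0, 0, err
		}
	}
	if v, ok := labels["num_gpu_blocks"]; ok {
		if numGPUBlocks, err = strconv.Atoi(v); err != nil {
			return 0, 0, err
		}
	}
	return blockSize, numGPUBlocks, nil
}

func main() {
	bs, blocks, err := parseCacheConfig(map[string]string{"block_size": "16", "num_gpu_blocks": "1024"})
	fmt.Println(bs, blocks, err) // 16 1024 <nil>
}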
89 changes: 89 additions & 0 deletions pkg/epp/backend/metrics/metrics_test.go
@@ -373,6 +373,95 @@ func TestGetLatestLoraMetric(t *testing.T) {
}
}

func TestCacheConfigInfoMetrics(t *testing.T) {
testCases := []struct {
name string
metricFamilies map[string]*dto.MetricFamily
mapping *MetricMapping
existingMetrics *MetricsState
expectedMetrics *MetricsState
expectedErr error
}{
{
name: "successful cache config metrics",
metricFamilies: map[string]*dto.MetricFamily{
"vllm_cache_config": makeMetricFamily("vllm_cache_config",
makeMetric(map[string]string{"block_size": "16", "num_gpu_blocks": "1024"}, 1.0, 1000),
),
},
mapping: &MetricMapping{
CacheConfigInfo: &MetricSpec{MetricName: "vllm_cache_config"},
},
existingMetrics: &MetricsState{},
expectedMetrics: &MetricsState{
CacheBlockSize: 16,
CacheNumGPUBlocks: 1024,
},
expectedErr: nil,
},
{
name: "invalid block_size value",
metricFamilies: map[string]*dto.MetricFamily{
"vllm_cache_config": makeMetricFamily("vllm_cache_config",
makeMetric(map[string]string{"block_size": "invalid", "num_gpu_blocks": "1024"}, 1.0, 1000),
),
},
mapping: &MetricMapping{
CacheConfigInfo: &MetricSpec{MetricName: "vllm_cache_config"},
},
existingMetrics: &MetricsState{},
expectedMetrics: &MetricsState{
CacheNumGPUBlocks: 1024,
},
expectedErr: errors.New("strconv.Atoi: parsing \"invalid\": invalid syntax"),
},
{
name: "invalid num_gpu_blocks value",
metricFamilies: map[string]*dto.MetricFamily{
"vllm_cache_config": makeMetricFamily("vllm_cache_config",
makeMetric(map[string]string{"block_size": "16", "num_gpu_blocks": "invalid"}, 1.0, 1000),
),
},
mapping: &MetricMapping{
CacheConfigInfo: &MetricSpec{MetricName: "vllm_cache_config"},
},
existingMetrics: &MetricsState{},
expectedMetrics: &MetricsState{
CacheBlockSize: 16,
},
expectedErr: errors.New("strconv.Atoi: parsing \"invalid\": invalid syntax"),
},
{
name: "no cache config if not in MetricMapping",
metricFamilies: map[string]*dto.MetricFamily{
"vllm_cache_config": makeMetricFamily("vllm_cache_config",
makeMetric(map[string]string{"block_size": "16", "num_gpu_blocks": "1024"}, 1.0, 1000),
),
},
mapping: &MetricMapping{}, // No CacheConfigInfo defined
existingMetrics: &MetricsState{},
expectedMetrics: &MetricsState{},
expectedErr: nil,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
p := &PodMetricsClientImpl{MetricMapping: tc.mapping}
updated, err := p.promToPodMetrics(tc.metricFamilies, tc.existingMetrics)

if tc.expectedErr != nil {
assert.Error(t, err)
assert.Contains(t, err.Error(), tc.expectedErr.Error())
} else {
assert.NoError(t, err)
assert.Equal(t, tc.expectedMetrics.CacheBlockSize, updated.CacheBlockSize, "CacheBlockSize mismatch")
assert.Equal(t, tc.expectedMetrics.CacheNumGPUBlocks, updated.CacheNumGPUBlocks, "CacheNumGPUBlocks mismatch")
}
})
}
}

func TestPromToPodMetrics(t *testing.T) {
tests := []struct {
name string
3 changes: 3 additions & 0 deletions pkg/epp/datalayer/metrics.go
@@ -33,6 +33,8 @@ type Metrics struct {
KVCacheUsagePercent float64
KvCacheMaxTokenCapacity int
CacheBlockSize int
// CacheNumGPUBlocks is the number of GPU blocks available for the KV cache on the model server.
CacheNumGPUBlocks int

// UpdateTime records the last time when the metrics were updated.
UpdateTime time.Time
@@ -77,6 +79,7 @@ func (m *Metrics) Clone() *Metrics {
KVCacheUsagePercent: m.KVCacheUsagePercent,
KvCacheMaxTokenCapacity: m.KvCacheMaxTokenCapacity,
CacheBlockSize: m.CacheBlockSize,
CacheNumGPUBlocks: m.CacheNumGPUBlocks,
UpdateTime: m.UpdateTime,
}
}
13 changes: 11 additions & 2 deletions pkg/epp/datalayer/metrics/extractor.go
@@ -39,6 +39,7 @@ const (
LoraInfoMaxAdaptersMetricName = "max_lora"

CacheConfigBlockSizeInfoMetricName = "block_size"
CacheConfigNumGPUBlocksMetricName = "num_gpu_blocks"
)

// Extractor implements the metrics extraction based on the model
@@ -161,11 +162,19 @@ func populateLoRAMetrics(clone *datalayer.Metrics, metric *dto.Metric, errs *[]e
func populateCacheInfoMetrics(clone *datalayer.Metrics, metric *dto.Metric, errs *[]error) {
clone.CacheBlockSize = 0
for _, label := range metric.GetLabel() {
if label.GetName() == CacheConfigBlockSizeInfoMetricName {
switch label.GetName() {
case CacheConfigBlockSizeInfoMetricName:
if label.GetValue() != "" {
if val, err := strconv.Atoi(label.GetValue()); err == nil {
clone.CacheBlockSize = val
break
} else {
*errs = append(*errs, err)
}
}
case CacheConfigNumGPUBlocksMetricName:
if label.GetValue() != "" {
if val, err := strconv.Atoi(label.GetValue()); err == nil {
clone.CacheNumGPUBlocks = val
} else {
*errs = append(*errs, err)
}
40 changes: 22 additions & 18 deletions pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go
@@ -31,32 +31,36 @@ import (
// An indexer maintains an LRU cache of prompt prefix hashes and the server(s) that might have that
// prefix cached.
type indexer struct {
mu sync.RWMutex
hashToPods map[BlockHash]podSet // the lookup data structure to find pods that have the BlockHash cached
podToLRU map[ServerID]*lru.Cache[BlockHash, struct{}] // key is pod namespacedName, value is an LRU cache
maxLRUSize int
mu sync.RWMutex
hashToPods map[BlockHash]podSet // the lookup data structure to find pods that have the BlockHash cached
podToLRU map[ServerID]*lru.Cache[BlockHash, struct{}] // key is pod namespacedName, value is an LRU cache
defaultLRUSize int
}

// newIndexer initializes an indexer with size limits and starts cache size reporting.
func newIndexer(ctx context.Context, maxLRUSize int) *indexer {
func newIndexer(ctx context.Context, defaultLRUSize int) *indexer {
indexer := &indexer{
hashToPods: make(map[BlockHash]podSet),
podToLRU: make(map[ServerID]*lru.Cache[BlockHash, struct{}]),
maxLRUSize: maxLRUSize,
hashToPods: make(map[BlockHash]podSet),
podToLRU: make(map[ServerID]*lru.Cache[BlockHash, struct{}]),
defaultLRUSize: defaultLRUSize,
}

go indexer.reportLRUSize(ctx, time.Second)
return indexer
}

// Add adds a list of prefix hashes to the cache, tied to the server.
func (i *indexer) Add(hashes []BlockHash, pod ServerID) {
func (i *indexer) Add(hashes []BlockHash, pod Server) {
i.mu.Lock()
// Check if an LRU cache already exists for this pod
lruForPod, exists := i.podToLRU[pod]
lruForPod, exists := i.podToLRU[pod.ServerID]
if !exists {
newLRU, _ := lru.NewWithEvict[BlockHash, struct{}](i.maxLRUSize, i.makeEvictionFn(pod))
i.podToLRU[pod] = newLRU
lruSize := pod.numOfGPUBlocks
if lruSize <= 0 {
lruSize = i.defaultLRUSize
}
newLRU, _ := lru.NewWithEvict(lruSize, i.makeEvictionFn(pod.ServerID))
i.podToLRU[pod.ServerID] = newLRU
lruForPod = newLRU
}

@@ -70,12 +74,12 @@ func (i *indexer) Add(hashes []BlockHash, pod ServerID) {
// Update hashToPods once under lock
i.mu.Lock()
for _, hash := range hashes {
pods := i.hashToPods[hash]
if pods == nil {
pods = make(podSet)
podIDs := i.hashToPods[hash]
if podIDs == nil {
podIDs = make(podSet)
}
pods[pod] = struct{}{}
i.hashToPods[hash] = pods
podIDs[pod.ServerID] = struct{}{}
i.hashToPods[hash] = podIDs
}

i.mu.Unlock()
@@ -143,7 +147,7 @@ func (i *indexer) reportLRUSize(ctx context.Context, interval time.Duration) {
"avg entries per pod", avg,
"pod with max cache", maxPodName,
"max pod size", maxPodEntries,
"global max LRU cache capacity per pod", i.maxLRUSize,
"global max LRU cache capacity per pod", i.defaultLRUSize,
)

i.mu.RUnlock()
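The Add change above sizes each pod's LRU from the pod's reported num_gpu_blocks, falling back to the configured default when the value is missing or zero. Below is a standalone sketch of that sizing rule together with the eviction-callback pattern that keeps the reverse index consistent; it assumes the github.com/hashicorp/golang-lru/v2 generics API that the indexer already imports, and the names (lruSizeFor, hashToPods) are illustrative, not the PR's.

// Sketch only, not the PR's code.
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

type blockHash uint64

// lruSizeFor mirrors the fallback rule in indexer.Add: prefer the pod's
// reported GPU block count, otherwise use the configured default.
func lruSizeFor(numGPUBlocks, defaultSize int) int {
	if numGPUBlocks > 0 {
		return numGPUBlocks
	}
	return defaultSize
}

func main() {
	hashToPods := map[blockHash]map[string]struct{}{}
	pod := "default/server1"

	// The eviction callback keeps the reverse index (hash -> pods) in sync.
	onEvict := func(h blockHash, _ struct{}) {
		delete(hashToPods[h], pod)
	}

	cache, err := lru.NewWithEvict[blockHash, struct{}](lruSizeFor(0, 2), onEvict)
	if err != nil {
		panic(err)
	}

	for h := blockHash(1); h <= 3; h++ {
		hashToPods[h] = map[string]struct{}{pod: {}}
		cache.Add(h, struct{}{}) // capacity 2, so adding the 3rd hash evicts the 1st
	}
	fmt.Println(cache.Len(), len(hashToPods[1])) // 2 0
}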
39 changes: 21 additions & 18 deletions pkg/epp/scheduling/framework/plugins/multi/prefix/indexer_test.go
@@ -23,25 +23,28 @@ import (
)

func TestIndexer_AddAndGet(t *testing.T) {
i := newIndexer(context.Background(), 2)
server := Server{
ServerID: ServerID{Namespace: "default", Name: "server1"},
numOfGPUBlocks: 2,
}
i := newIndexer(context.Background(), 3) // Initialize with a defaultLRUSize greater than server.numOfGPUBlocks to verify that the server-defined limit takes precedence.

hash1 := BlockHash(1)
server := ServerID{Namespace: "default", Name: "server1"}
// Add an entry to the cache
i.Add([]BlockHash{hash1}, server)

// Retrieve the entry
assert.Equal(t, 1, i.podToLRU[server].Len(), "Cache size should be 1 after adding an entry")
assert.Equal(t, 1, i.podToLRU[server.ServerID].Len(), "Cache size should be 1 after adding an entry")
servers := i.Get(hash1)
assert.Contains(t, servers, server, "Cache should contain the added server")
assert.Contains(t, servers, server.ServerID, "Cache should contain the added server")

// Add another entry to the cache; the cache size should increase to 2.
i.Add([]BlockHash{BlockHash(2)}, server)
assert.Equal(t, 2, i.podToLRU[server].Len(), "Cache size should be 2 after adding an entry")
assert.Equal(t, 2, i.podToLRU[server.ServerID].Len(), "Cache size should be 2 after adding an entry")

// Add another entry to the cache, which should evict the first one due to max size.
i.Add([]BlockHash{BlockHash(3)}, server)
assert.Equal(t, 2, i.podToLRU[server].Len(), "Cache size should still be 2 after adding an entry")
assert.Equal(t, 2, i.podToLRU[server.ServerID].Len(), "Cache size should still be 2 after adding an entry")

servers = i.Get(BlockHash(4))
assert.Empty(t, servers, "Cache should not contain non-existent hash")
@@ -52,8 +55,8 @@ func TestIndexer_RemovePodAndEviction(t *testing.T) {

i := newIndexer(context.Background(), indexerSize)

server1 := ServerID{Namespace: "default", Name: "server1"}
server2 := ServerID{Namespace: "default", Name: "server2"}
server1 := Server{ServerID: ServerID{Namespace: "default", Name: "server1"}}
server2 := Server{ServerID: ServerID{Namespace: "default", Name: "server2"}}

// Add indexerSize hashes to both servers
var hashes []BlockHash
@@ -65,15 +68,15 @@ }
}

// Ensure all entries are added
assert.Equal(t, indexerSize, i.podToLRU[server1].Len(), "server1 should have 10 entries")
assert.Equal(t, indexerSize, i.podToLRU[server2].Len(), "server2 should have 10 entries")
assert.Equal(t, indexerSize, i.podToLRU[server1.ServerID].Len(), "server1 should have 10 entries")
assert.Equal(t, indexerSize, i.podToLRU[server2.ServerID].Len(), "server2 should have 10 entries")

// Ensure each hash in hashToPods maps to both server1 and server2
for _, h := range hashes {
pods := i.hashToPods[h]
assert.Len(t, pods, 2, "Each hash should be associated with exactly 2 pods")
assert.Contains(t, pods, server1, "hash should be associated with server1")
assert.Contains(t, pods, server2, "hash should be associated with server2")
assert.Contains(t, pods, server1.ServerID, "hash should be associated with server1")
assert.Contains(t, pods, server2.ServerID, "hash should be associated with server2")
}

// Add indexerSize hash to server1 → should evict BlockHash(0)
@@ -82,25 +85,25 @@ func TestIndexer_RemovePodAndEviction(t *testing.T) {
i.Add([]BlockHash{newHash}, server1)

// server1 LRU should still be at max capacity
assert.Equal(t, indexerSize, i.podToLRU[server1].Len(), "server1 LRU should maintain max size")
assert.Equal(t, indexerSize, i.podToLRU[server1.ServerID].Len(), "server1 LRU should maintain max size")

// BlockHash(0) should no longer have server1 in hashToPods
pods := i.Get(evictedHash)
assert.NotContains(t, pods, server1, "server1 should be evicted from hashToPods for hash 0")
assert.Contains(t, pods, server2, "server2 should still have hash 0")
assert.NotContains(t, pods, server1.ServerID, "server1 should be evicted from hashToPods for hash 0")
assert.Contains(t, pods, server2.ServerID, "server2 should still have hash 0")

// Remove server2
i.RemovePod(server2)
i.RemovePod(server2.ServerID)

// hashToPods for hash 0 should now be empty
pods = i.Get(evictedHash)
assert.NotContains(t, pods, server2, "server2 should be removed from hash 0")
assert.NotContains(t, pods, server2.ServerID, "server2 should be removed from hash 0")
assert.Empty(t, pods, "hash 0 should have no pods after both eviction and removal")

// All remaining hashes should map only to server1
for hash, pods := range i.hashToPods {
assert.Len(t, pods, 1, "hash %v should have only 1 pod after server2 removal", hash)
assert.Contains(t, pods, server1, "hash %v should only contain server1", hash)
assert.Contains(t, pods, server1.ServerID, "hash %v should only contain server1", hash)
}

// Ensure hashToPods contains exactly indexerSize hashes (post-eviction and server2 removal)
13 changes: 11 additions & 2 deletions pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go
@@ -96,14 +96,19 @@ type podSet map[ServerID]struct{}

type Indexer interface {
Get(hash BlockHash) podSet
Add(hashes []BlockHash, server ServerID)
Add(hashes []BlockHash, server Server)
RemovePod(server ServerID)
Pods() []ServerID
}

// BlockHash is a hash of the block of request body.
type BlockHash uint64

type Server struct {
ServerID
numOfGPUBlocks int
}

type ServerID k8stypes.NamespacedName

func (s ServerID) String() string {
@@ -224,6 +229,7 @@ func (p *Plugin) Score(ctx context.Context, cycleState *types.CycleState, reques
func (p *Plugin) PreRequest(ctx context.Context, request *types.LLMRequest, schedulingResult *types.SchedulingResult) {
primaryProfileResult := schedulingResult.ProfileResults[schedulingResult.PrimaryProfileName]
targetPod := primaryProfileResult.TargetPods[0].GetPod() // get the first pod of the primary profile
gpuBlocks := primaryProfileResult.TargetPods[0].GetMetrics().CacheNumGPUBlocks

state, err := plugins.ReadPluginStateKey[*SchedulingContextState](p.pluginState, request.RequestId, plugins.StateKey(p.TypedName().String()))
p.pluginState.Delete(request.RequestId) // delete the state explicitly after completing using it
@@ -238,7 +244,10 @@ func (p *Plugin) PreRequest(ctx context.Context, request *types.LLMRequest, sche
// WaitGroup is added to the Plugin struct to allow waiting in tests.
p.wg.Add(1)
go func() {
p.indexer.Add(state.PrefixHashes, ServerID(targetPod.NamespacedName))
p.indexer.Add(state.PrefixHashes, Server{
ServerID(targetPod.NamespacedName),
gpuBlocks,
})
p.wg.Done()
}()

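The new Server type above embeds ServerID so that the existing podSet and podToLRU keys stay unchanged while PreRequest can pass the pod's CacheNumGPUBlocks alongside its identity. A minimal standalone sketch of that embedding pattern follows; the types mirror the plugin's, but the snippet is illustrative, not code from this PR.

// Sketch only: illustrates the Server wrapper used by the prefix plugin.
package main

import "fmt"

type ServerID struct{ Namespace, Name string }

func (s ServerID) String() string { return s.Namespace + "/" + s.Name }

// Server pairs a pod identity with its per-pod KV-cache capacity.
type Server struct {
	ServerID
	numOfGPUBlocks int
}

func main() {
	srv := Server{ServerID{"default", "server1"}, 1024}

	// The embedded ServerID still works as a comparable map key, so maps
	// keyed by ServerID (hashToPods, podToLRU) need no change.
	seen := map[ServerID]struct{}{srv.ServerID: {}}

	fmt.Println(srv.String(), srv.numOfGPUBlocks, len(seen)) // default/server1 1024 1
}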