
Commit 3150709

learner0810 authored and BenjaminBraunDev committed
Auto-configure prefix-cache-scorer parameters from engine metrics (kubernetes-sigs#1629)
1 parent 138f705 commit 3150709

9 files changed (+116 / -20 lines)

cmd/epp/runner/runner.go

Lines changed: 7 additions & 1 deletion
@@ -113,6 +113,8 @@ var (
 	kvCacheUsagePercentageMetric = flag.String("kv-cache-usage-percentage-metric", runserver.DefaultKvCacheUsagePercentageMetric, "Prometheus metric for the fraction of KV-cache blocks currently in use (from 0 to 1).")
 	// LoRA metrics
 	loraInfoMetric = flag.String("lora-info-metric", runserver.DefaultLoraInfoMetric, "Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
+	// Cache info metrics
+	cacheInfoMetric = flag.String("cache-info-metric", runserver.DefaultCacheInfoMetric, "Prometheus metric for the cache info metrics.")
 	// metrics related flags
 	refreshMetricsInterval = flag.Duration("refresh-metrics-interval", runserver.DefaultRefreshMetricsInterval, "interval to refresh metrics")
 	refreshPrometheusMetricsInterval = flag.Duration("refresh-prometheus-metrics-interval", runserver.DefaultRefreshPrometheusMetricsInterval, "interval to flush prometheus metrics")
@@ -481,6 +483,7 @@ func setupMetricsV1(setupLog logr.Logger) (datalayer.EndpointFactory, error) {
 		*totalRunningRequestsMetric,
 		*kvCacheUsagePercentageMetric,
 		*loraInfoMetric,
+		*cacheInfoMetric,
 	)
 	if err != nil {
 		setupLog.Error(err, "Failed to create metric mapping from flags.")
@@ -525,7 +528,7 @@ func setupDatalayer() (datalayer.EndpointFactory, error) {
 	extractor, err := dlmetrics.NewExtractor(*totalQueuedRequestsMetric,
 		*totalRunningRequestsMetric,
 		*kvCacheUsagePercentageMetric,
-		*loraInfoMetric)
+		*loraInfoMetric, *cacheInfoMetric)

 	if err != nil {
 		return nil, err
@@ -610,6 +613,9 @@ func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logge
 	if mapping.LoraRequestInfo == nil {
 		logger.Info("Not scraping metric: LoraRequestInfo")
 	}
+	if mapping.CacheConfigInfo == nil {
+		logger.Info("Not scraping metric: CacheConfigInfo")
+	}
 }

 // setupPprofHandlers only implements the pre-defined profiles:

pkg/epp/backend/metrics/metrics.go

Lines changed: 20 additions & 0 deletions
@@ -36,6 +36,8 @@ const (
 	LoraInfoRunningAdaptersMetricName = "running_lora_adapters"
 	LoraInfoWaitingAdaptersMetricName = "waiting_lora_adapters"
 	LoraInfoMaxAdaptersMetricName     = "max_lora"
+
+	CacheConfigBlockSizeInfoMetricName = "block_size"
 )

 type PodMetricsClientImpl struct {
@@ -153,6 +155,24 @@ func (p *PodMetricsClientImpl) promToPodMetrics(
 		}
 	}

+	if p.MetricMapping.CacheConfigInfo != nil {
+		cacheMetrics, err := p.getMetric(metricFamilies, *p.MetricMapping.CacheConfigInfo)
+		if err != nil {
+			errs = multierr.Append(errs, err)
+		} else {
+			for _, v := range cacheMetrics.GetLabel() {
+				if v.GetName() == CacheConfigBlockSizeInfoMetricName {
+					updated.CacheBlockSize, err = strconv.Atoi(v.GetValue())
+					if err != nil {
+						errs = multierr.Append(errs, err)
+					} else {
+						break
+					}
+				}
+			}
+		}
+	}
+
 	return updated, errs
 }

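
For reference, the scraping path above reads the block size from a label on an info-style metric rather than from the sample value. The following standalone sketch (import aliases and the sample label value are illustrative assumptions, not part of this commit) applies the same label-parsing approach to a hand-built dto.Metric:

package main

import (
	"fmt"
	"strconv"

	dto "github.com/prometheus/client_model/go"
	"google.golang.org/protobuf/proto"
)

func main() {
	// An info-style gauge: the interesting data lives in the labels, the value is just 1.
	// The label value "16" is an assumed example of an engine-reported block size in tokens.
	metric := &dto.Metric{
		Label: []*dto.LabelPair{
			{Name: proto.String("block_size"), Value: proto.String("16")},
		},
		Gauge: &dto.Gauge{Value: proto.Float64(1)},
	}

	blockSize := 0
	for _, label := range metric.GetLabel() {
		if label.GetName() == "block_size" { // CacheConfigBlockSizeInfoMetricName in the diff above
			if v, err := strconv.Atoi(label.GetValue()); err == nil {
				blockSize = v
				break
			}
		}
	}
	fmt.Println("parsed block_size:", blockSize) // parsed block_size: 16
}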

pkg/epp/backend/metrics/metrics_spec.go

Lines changed: 9 additions & 1 deletion
@@ -33,6 +33,7 @@ type MetricMapping struct {
 	TotalRunningRequests *MetricSpec
 	KVCacheUtilization   *MetricSpec
 	LoraRequestInfo      *MetricSpec
+	CacheConfigInfo      *MetricSpec
 }

 // stringToMetricSpec converts a string to a MetricSpec.
@@ -94,7 +95,7 @@ func stringToMetricSpec(specStr string) (*MetricSpec, error) {
 }

 // NewMetricMapping creates a MetricMapping from string values.
-func NewMetricMapping(queuedStr, runningStr, kvUsageStr, loraReqInfoStr string) (*MetricMapping, error) {
+func NewMetricMapping(queuedStr, runningStr, kvUsageStr, loraReqInfoStr, cacheInfoMetric string) (*MetricMapping, error) {
 	queuedSpec, err := stringToMetricSpec(queuedStr)
 	if err != nil {
 		return nil, fmt.Errorf("error parsing WaitingRequests: %w", err)
@@ -111,11 +112,18 @@ func NewMetricMapping(queuedStr, runningStr, kvUsageStr, loraReqInfoStr string)
 	if err != nil {
 		return nil, fmt.Errorf("error parsing loraReqInfoStr: %w", err)
 	}
+
+	cacheInfoSpec, err := stringToMetricSpec(cacheInfoMetric)
+	if err != nil {
+		return nil, fmt.Errorf("error parsing cacheInfoMetric: %w", err)
+	}
+
 	mapping := &MetricMapping{
 		TotalQueuedRequests:  queuedSpec,
 		TotalRunningRequests: runningSpec,
 		KVCacheUtilization:   kvUsageSpec,
 		LoraRequestInfo:      loraReqInfoSpec,
+		CacheConfigInfo:      cacheInfoSpec,
 	}

 	return mapping, nil
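
The new fifth parameter ripples to the callers shown in runner.go above. A minimal usage sketch of the updated constructor; the first metric name is a placeholder assumption, while the other four correspond to the vLLM-oriented defaults added or already present in pkg/epp/server/runserver.go:

package main

import (
	"fmt"

	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
)

func main() {
	mapping, err := backendmetrics.NewMetricMapping(
		"vllm:num_requests_waiting", // assumed queued-requests metric name; not part of this diff
		"vllm:num_requests_running", // DefaultTotalRunningRequestsMetric
		"vllm:gpu_cache_usage_perc", // DefaultKvCacheUsagePercentageMetric
		"vllm:lora_requests_info",   // DefaultLoraInfoMetric
		"vllm:cache_config_info",    // DefaultCacheInfoMetric (new in this commit)
	)
	if err != nil {
		panic(err)
	}
	fmt.Println("cache config info will be scraped:", mapping.CacheConfigInfo != nil)
}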

pkg/epp/datalayer/metrics.go

Lines changed: 2 additions & 0 deletions
@@ -32,6 +32,7 @@ type Metrics struct {
 	WaitingQueueSize        int
 	KVCacheUsagePercent     float64
 	KvCacheMaxTokenCapacity int
+	CacheBlockSize          int

 	// UpdateTime records the last time when the metrics were updated.
 	UpdateTime time.Time
@@ -75,6 +76,7 @@ func (m *Metrics) Clone() *Metrics {
 		WaitingQueueSize:        m.WaitingQueueSize,
 		KVCacheUsagePercent:     m.KVCacheUsagePercent,
 		KvCacheMaxTokenCapacity: m.KvCacheMaxTokenCapacity,
+		CacheBlockSize:          m.CacheBlockSize,
 		UpdateTime:              m.UpdateTime,
 	}
 }

pkg/epp/datalayer/metrics/extractor.go

Lines changed: 31 additions & 2 deletions
@@ -37,6 +37,8 @@ const (
 	LoraInfoRunningAdaptersMetricName = "running_lora_adapters"
 	LoraInfoWaitingAdaptersMetricName = "waiting_lora_adapters"
 	LoraInfoMaxAdaptersMetricName     = "max_lora"
+
+	CacheConfigBlockSizeInfoMetricName = "block_size"
 )

 // Extractor implements the metrics extraction based on the model
@@ -49,8 +51,8 @@ type Extractor struct {
 // configured with the given metrics' specifications.
 // These are mandatory metrics per the MSP specification, and are used
 // as the basis for the built-in scheduling plugins.
-func NewExtractor(queueSpec, runningSpec, kvusageSpec, loraSpec string) (*Extractor, error) {
-	mapping, err := NewMapping(queueSpec, runningSpec, kvusageSpec, loraSpec)
+func NewExtractor(queueSpec, runningSpec, kvusageSpec, loraSpec, cacheInfoSpec string) (*Extractor, error) {
+	mapping, err := NewMapping(queueSpec, runningSpec, kvusageSpec, loraSpec, cacheInfoSpec)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create extractor metrics Mapping - %w", err)
 	}
@@ -120,6 +122,16 @@ func (ext *Extractor) Extract(ctx context.Context, data any, ep datalayer.Endpoi
 		}
 	}

+	if spec := ext.mapping.CacheInfo; spec != nil { // extract CacheInfo-specific metrics
+		metric, err := spec.getLatestMetric(families)
+		if err != nil {
+			errs = append(errs, err)
+		} else if metric != nil {
+			populateCacheInfoMetrics(clone, metric, &errs)
+			updated = true
+		}
+	}
+
 	if updated {
 		clone.UpdateTime = time.Now()
 		ep.UpdateMetrics(clone)
@@ -154,6 +166,23 @@ func populateLoRAMetrics(clone *datalayer.Metrics, metric *dto.Metric, errs *[]e
 	}
 }

+// populateCacheInfoMetrics updates the metrics with cache info from the metric labels.
+func populateCacheInfoMetrics(clone *datalayer.Metrics, metric *dto.Metric, errs *[]error) {
+	clone.CacheBlockSize = 0
+	for _, label := range metric.GetLabel() {
+		if label.GetName() == CacheConfigBlockSizeInfoMetricName {
+			if label.GetValue() != "" {
+				if val, err := strconv.Atoi(label.GetValue()); err == nil {
+					clone.CacheBlockSize = val
+					break
+				} else {
+					*errs = append(*errs, err)
+				}
+			}
+		}
+	}
+}
+
 // addAdapters splits a comma-separated adapter list and stores keys with default value 0.
 func addAdapters(m map[string]int, csv string) {
 	for _, name := range strings.Split(csv, ",") {

pkg/epp/datalayer/metrics/mapping.go

Lines changed: 9 additions & 1 deletion
@@ -27,10 +27,11 @@ type Mapping struct {
 	TotalRunningRequests *Spec
 	KVCacheUtilization   *Spec
 	LoraRequestInfo      *LoRASpec
+	CacheInfo            *Spec
 }

 // NewMapping creates a metrics.Mapping from the input specification strings.
-func NewMapping(queue, running, kvusage, lora string) (*Mapping, error) {
+func NewMapping(queue, running, kvusage, lora, cacheInfo string) (*Mapping, error) {
 	var errs []error

 	queueSpec, err := parseStringToSpec(queue)
@@ -49,6 +50,12 @@ func NewMapping(queue, running, kvusage, lora string) (*Mapping, error) {
 	if err != nil {
 		errs = append(errs, err)
 	}
+
+	cacheInfoSpec, err := parseStringToSpec(cacheInfo)
+	if err != nil {
+		errs = append(errs, err)
+	}
+
 	if len(errs) != 0 {
 		return nil, errors.Join(errs...)
 	}
@@ -57,5 +64,6 @@
 		TotalRunningRequests: runningSpec,
 		KVCacheUtilization:   kvusageSpec,
 		LoraRequestInfo:      loraSpec,
+		CacheInfo:            cacheInfoSpec,
 	}, nil
 }

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go

Lines changed: 26 additions & 5 deletions
@@ -61,18 +61,21 @@ const (

 const (
 	PodActiveCheckInterval = 2 * time.Minute
+
+	// An estimated average characters per token, used since the request we cached is not tokenized.
+	averageCharactersPerToken = 4
 )

 var DefaultConfig = Config{
-	BlockSize:              DefaultBlockSize,
+	DefaultBlockSize:       DefaultBlockSize,
 	MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks,
 	LRUCapacityPerServer:   DefaultLRUCapacityPerServer,
 }

 type Config struct {
 	// The input prompt is broken into sizes of BlockSize to calculate block hashes . Requests
 	// with length shorter than the block size will be ignored.
-	BlockSize int `json:"blockSize"`
+	DefaultBlockSize int `json:"blockSize"`
 	// MaxPrefixBlocksToMatch is the maximum number of prefix blocks to match. Input beyond this limit will
 	// be ignored.
 	MaxPrefixBlocksToMatch int `json:"maxPrefixBlocksToMatch"`
@@ -146,7 +149,7 @@
 // PrefixCachePluginFactory defines the factory function for Prefix plugin.
 func PrefixCachePluginFactory(name string, rawParameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) {
 	parameters := Config{
-		BlockSize:              DefaultBlockSize,
+		DefaultBlockSize:       DefaultBlockSize,
 		MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks,
 		LRUCapacityPerServer:   DefaultLRUCapacityPerServer,
 	}
@@ -196,7 +199,7 @@ func (p *Plugin) WithName(name string) *Plugin {
 func (p *Plugin) Score(ctx context.Context, cycleState *types.CycleState, request *types.LLMRequest, pods []types.Pod) map[types.Pod]float64 {

 	// pre score step, hashing prompt and find longest prefix match.
-	hashes := hashPrompt(ctx, request, p.config.BlockSize, p.config.MaxPrefixBlocksToMatch)
+	hashes := hashPrompt(ctx, request, getBlockSize(pods, p.config.DefaultBlockSize), p.config.MaxPrefixBlocksToMatch)
 	state := &SchedulingContextState{
 		PrefixHashes:       hashes,
 		PrefixCacheServers: p.matchLongestPrefix(ctx, hashes),
@@ -247,7 +250,9 @@ func (p *Plugin) PreRequest(ctx context.Context, request *types.LLMRequest, sche

 	total := len(state.PrefixHashes)
 	matchLen := state.PrefixCacheServers[ServerID(targetPod.NamespacedName)]
-	metrics.RecordPrefixCacheMatch(matchLen*p.config.BlockSize, total*p.config.BlockSize)
+
+	blockSize := getBlockSize(primaryProfileResult.TargetPods, p.config.DefaultBlockSize)
+	metrics.RecordPrefixCacheMatch(matchLen*blockSize, total*blockSize)
 }

 // matchLongestPrefix returns a map of servers and length of prefix that each server caches.
@@ -359,3 +364,19 @@ func getUserInputBytes(request *types.LLMRequest) ([]byte, error) {
 	// must be chat-completions request at this point, return bytes of entire messages
 	return json.Marshal(request.Body.ChatCompletions.Messages)
 }
+
+func getBlockSize(pods []types.Pod, defaultBlockSize int) int {
+	if len(pods) == 0 {
+		return defaultBlockSize
+	}
+
+	// Since all PODs originate from the same inference pool, they are considered to have identical configurations.
+	// Therefore, using the CacheBlockSize value from the first POD suffices.
+	if pod := pods[0]; pod.GetMetrics() != nil {
+		cacheBlockSize := pod.GetMetrics().CacheBlockSize
+		if cacheBlockSize > 0 {
+			return cacheBlockSize * averageCharactersPerToken
+		}
+	}
+	return defaultBlockSize
+}
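
To make the auto-configuration concrete: the scraped block size is expressed in tokens, while the prefix scorer hashes raw request characters, so getBlockSize scales the token count by the assumed 4-characters-per-token constant and otherwise falls back to the configured default. A standalone sketch of that arithmetic (the numbers are illustrative assumptions):

package main

import "fmt"

// effectiveBlockSize mirrors the conversion performed by getBlockSize above:
// a positive engine-reported block size (in tokens) is scaled to characters,
// otherwise the configured default block size is used unchanged.
func effectiveBlockSize(cacheBlockSizeTokens, defaultBlockSize int) int {
	const averageCharactersPerToken = 4
	if cacheBlockSizeTokens > 0 {
		return cacheBlockSizeTokens * averageCharactersPerToken
	}
	return defaultBlockSize
}

func main() {
	// A pod reporting a 16-token KV-cache block size yields 64-character hash blocks.
	fmt.Println(effectiveBlockSize(16, 32)) // 64
	// With no cache info scraped (CacheBlockSize == 0), the default is kept as-is.
	fmt.Println(effectiveBlockSize(0, 32)) // 32
}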

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go

Lines changed: 11 additions & 10 deletions
@@ -28,20 +28,21 @@ import (
 	k8stypes "k8s.io/apimachinery/pkg/types"

 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
+	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
 )

 func TestPrefixPluginCompletion(t *testing.T) {
 	config := Config{
-		BlockSize:              4,
+		DefaultBlockSize:       4,
 		MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks,
 		LRUCapacityPerServer:   DefaultLRUCapacityPerServer,
 	}
 	plugin := New(context.Background(), config)

-	pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}
-	pod2 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}
+	pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, MetricsState: backendmetrics.NewMetricsState()}
+	pod2 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}, MetricsState: backendmetrics.NewMetricsState()}
 	pods := []types.Pod{pod1, pod2}

 	// First request.
@@ -200,13 +201,13 @@

 func TestPrefixPluginChatCompletions(t *testing.T) {
 	config := Config{
-		BlockSize:              4,
+		DefaultBlockSize:       4,
 		MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks,
 		LRUCapacityPerServer:   DefaultLRUCapacityPerServer,
 	}
 	plugin := New(context.Background(), config)

-	pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}
+	pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, MetricsState: &backendmetrics.MetricsState{}}
 	pods := []types.Pod{pod1}

 	// Test with chat completions request
@@ -234,14 +235,14 @@

 func TestPrefixPluginChatCompletionsGrowth(t *testing.T) {
 	config := Config{
-		BlockSize:              8, // Use larger block size for more predictable JSON marshaling
+		DefaultBlockSize:       8, // Use larger block size for more predictable JSON marshaling
 		MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks,
 		LRUCapacityPerServer:   DefaultLRUCapacityPerServer,
 	}
 	plugin := New(context.Background(), config)

-	pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}
-	pod2 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}
+	pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, MetricsState: &backendmetrics.MetricsState{}}
+	pod2 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}, MetricsState: &backendmetrics.MetricsState{}}
 	pods := []types.Pod{pod1, pod2}

 	// First request with initial conversation
@@ -348,7 +349,7 @@ func BenchmarkPrefixPluginStress(b *testing.B) {
 	blockSize := 4
 	maxPrefixBlocks := 50000
 	config := Config{
-		BlockSize:              blockSize,
+		DefaultBlockSize:       blockSize,
 		MaxPrefixBlocksToMatch: maxPrefixBlocks,
 		LRUCapacityPerServer:   DefaultLRUCapacityPerServer,
 	}
@@ -412,7 +413,7 @@ func BenchmarkPrefixPluginChatCompletionsStress(b *testing.B) {
 	blockSize := 8
 	maxPrefixBlocks := 50000
 	config := Config{
-		BlockSize:              blockSize,
+		DefaultBlockSize:       blockSize,
 		MaxPrefixBlocksToMatch: maxPrefixBlocks,
 		LRUCapacityPerServer:   DefaultLRUCapacityPerServer,
 	}

pkg/epp/server/runserver.go

Lines changed: 1 addition & 0 deletions
@@ -85,6 +85,7 @@ const (
 	DefaultTotalRunningRequestsMetric   = "vllm:num_requests_running" // default for --total-running-requests-metric
 	DefaultKvCacheUsagePercentageMetric = "vllm:gpu_cache_usage_perc" // default for --kv-cache-usage-percentage-metric
 	DefaultLoraInfoMetric               = "vllm:lora_requests_info"   // default for --lora-info-metric
+	DefaultCacheInfoMetric              = "vllm:cache_config_info"    // default for --cache-info-metric
 	DefaultCertPath                     = ""                          // default for --cert-path
 	DefaultConfigFile                   = ""                          // default for --config-file
 	DefaultConfigText                   = ""                          // default for --config-text
