8 changes: 7 additions & 1 deletion cmd/epp/runner/runner.go
@@ -92,6 +92,8 @@ var (
kvCacheUsagePercentageMetric = flag.String("kv-cache-usage-percentage-metric", runserver.DefaultKvCacheUsagePercentageMetric, "Prometheus metric for the fraction of KV-cache blocks currently in use (from 0 to 1).")
// LoRA metrics
loraInfoMetric = flag.String("lora-info-metric", runserver.DefaultLoraInfoMetric, "Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
// Cache info metrics
cacheInfoMetric = flag.String("cache-info-metric", runserver.DefaultCacheInfoMetric, "Prometheus metric for the cache info metrics.")
// metrics related flags
refreshMetricsInterval = flag.Duration("refresh-metrics-interval", runserver.DefaultRefreshMetricsInterval, "interval to refresh metrics")
refreshPrometheusMetricsInterval = flag.Duration("refresh-prometheus-metrics-interval", runserver.DefaultRefreshPrometheusMetricsInterval, "interval to flush prometheus metrics")
@@ -371,6 +373,7 @@ func setupMetricsV1(setupLog logr.Logger) (datalayer.EndpointFactory, error) {
*totalQueuedRequestsMetric,
*kvCacheUsagePercentageMetric,
*loraInfoMetric,
*cacheInfoMetric,
)
if err != nil {
setupLog.Error(err, "Failed to create metric mapping from flags.")
@@ -414,7 +417,7 @@ func setupDatalayer() (datalayer.EndpointFactory, error) {
nil)
extractor, err := dlmetrics.NewExtractor(*totalQueuedRequestsMetric,
*kvCacheUsagePercentageMetric,
*loraInfoMetric)
*loraInfoMetric, *cacheInfoMetric)

if err != nil {
return nil, err
@@ -499,6 +502,9 @@ func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logge
if mapping.LoraRequestInfo == nil {
logger.Info("Not scraping metric: LoraRequestInfo")
}
if mapping.CacheConfigInfo == nil {
logger.Info("Not scraping metric: CacheConfigInfo")
}
}

// setupPprofHandlers only implements the pre-defined profiles:
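For orientation, here is a minimal, hypothetical sketch of how the new --cache-info-metric value would flow into the metric mapping alongside the three existing metric flags. The wiring is condensed; only the flag name, its default, and the other default metric names (from runserver.go below) are taken from this PR.

```go
package main

import (
	"flag"
	"fmt"

	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
)

func main() {
	// Hypothetical, condensed wiring; the real runner defines many more flags.
	cacheInfoMetric := flag.String("cache-info-metric", "vllm:cache_config_info",
		"Prometheus metric for the cache info metrics.")
	flag.Parse()

	// The three existing metric names are the defaults from runserver.go.
	mapping, err := backendmetrics.NewMetricMapping(
		"vllm:num_requests_waiting", // --total-queued-requests-metric
		"vllm:gpu_cache_usage_perc", // --kv-cache-usage-percentage-metric
		"vllm:lora_requests_info",   // --lora-info-metric
		*cacheInfoMetric,            // new: --cache-info-metric
	)
	if err != nil {
		panic(err)
	}
	fmt.Println("cache info metric configured:", mapping.CacheConfigInfo != nil)
}
```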
20 changes: 20 additions & 0 deletions pkg/epp/backend/metrics/metrics.go
@@ -36,6 +36,8 @@ const (
LoraInfoRunningAdaptersMetricName = "running_lora_adapters"
LoraInfoWaitingAdaptersMetricName = "waiting_lora_adapters"
LoraInfoMaxAdaptersMetricName = "max_lora"

CacheConfigBlockSizeInfoMetricName = "block_size"
)

type PodMetricsClientImpl struct {
@@ -144,6 +146,24 @@ func (p *PodMetricsClientImpl) promToPodMetrics(
}
}

if p.MetricMapping.CacheConfigInfo != nil {
cacheMetrics, err := p.getMetric(metricFamilies, *p.MetricMapping.CacheConfigInfo)
if err != nil {
errs = multierr.Append(errs, err)
} else {
for _, v := range cacheMetrics.GetLabel() {
if v.GetName() == CacheConfigBlockSizeInfoMetricName {
updated.CacheBlockSize, err = strconv.Atoi(v.GetValue())
if err != nil {
errs = multierr.Append(errs, err)
} else {
break
}
}
}
}
}

return updated, errs
}

10 changes: 9 additions & 1 deletion pkg/epp/backend/metrics/metrics_spec.go
@@ -32,6 +32,7 @@ type MetricMapping struct {
TotalQueuedRequests *MetricSpec
KVCacheUtilization *MetricSpec
LoraRequestInfo *MetricSpec
CacheConfigInfo *MetricSpec
}

// stringToMetricSpec converts a string to a MetricSpec.
@@ -93,7 +94,7 @@ func stringToMetricSpec(specStr string) (*MetricSpec, error) {
}

// NewMetricMapping creates a MetricMapping from string values.
func NewMetricMapping(queuedStr, kvUsageStr, loraReqInfoStr string) (*MetricMapping, error) {
func NewMetricMapping(queuedStr, kvUsageStr, loraReqInfoStr, cacheInfoMetric string) (*MetricMapping, error) {
queuedSpec, err := stringToMetricSpec(queuedStr)
if err != nil {
return nil, fmt.Errorf("error parsing WaitingRequests: %w", err)
@@ -106,10 +107,17 @@ func NewMetricMapping(queuedStr, kvUsageStr, loraReqInfoStr string) (*MetricMapp
if err != nil {
return nil, fmt.Errorf("error parsing loraReqInfoStr: %w", err)
}

cacheInfoSpec, err := stringToMetricSpec(cacheInfoMetric)
if err != nil {
return nil, fmt.Errorf("error parsing cacheInfoMetric: %w", err)
}

mapping := &MetricMapping{
TotalQueuedRequests: queuedSpec,
KVCacheUtilization: kvUsageSpec,
LoraRequestInfo: loraReqInfoSpec,
CacheConfigInfo: cacheInfoSpec,
}

return mapping, nil
2 changes: 2 additions & 0 deletions pkg/epp/datalayer/metrics.go
@@ -32,6 +32,7 @@ type Metrics struct {
WaitingQueueSize int
KVCacheUsagePercent float64
KvCacheMaxTokenCapacity int
CacheBlockSize int

// UpdateTime records the last time when the metrics were updated.
UpdateTime time.Time
@@ -75,6 +76,7 @@ func (m *Metrics) Clone() *Metrics {
WaitingQueueSize: m.WaitingQueueSize,
KVCacheUsagePercent: m.KVCacheUsagePercent,
KvCacheMaxTokenCapacity: m.KvCacheMaxTokenCapacity,
CacheBlockSize: m.CacheBlockSize,
UpdateTime: m.UpdateTime,
}
}
33 changes: 31 additions & 2 deletions pkg/epp/datalayer/metrics/extractor.go
@@ -37,6 +37,8 @@ const (
LoraInfoRunningAdaptersMetricName = "running_lora_adapters"
LoraInfoWaitingAdaptersMetricName = "waiting_lora_adapters"
LoraInfoMaxAdaptersMetricName = "max_lora"

CacheConfigBlockSizeInfoMetricName = "block_size"

Contributor: Do you plan to add the cache size metric as well, maybe in a follow-up PR? That's a much more useful metric than the block size. In fact, the block size doesn't necessarily need to be the same as vLLM's.

Contributor (author): Yes, if IGW requires the cache size metric, I'd be happy to add it in a subsequent pull request.

)

// Extractor implements the metrics extraction based on the model
@@ -49,8 +51,8 @@ type Extractor struct {
// configured with the given metrics' specifications.
// These are mandatory metrics per the MSP specification, and are used
// as the basis for the built-in scheduling plugins.
func NewExtractor(queueSpec, kvusageSpec, loraSpec string) (*Extractor, error) {
mapping, err := NewMapping(queueSpec, kvusageSpec, loraSpec)
func NewExtractor(queueSpec, kvusageSpec, loraSpec, cacheInfoSpec string) (*Extractor, error) {
mapping, err := NewMapping(queueSpec, kvusageSpec, loraSpec, cacheInfoSpec)
if err != nil {
return nil, fmt.Errorf("failed to create extractor metrics Mapping - %w", err)
}
@@ -111,6 +113,16 @@ func (ext *Extractor) Extract(ctx context.Context, data any, ep datalayer.Endpoi
}
}

if spec := ext.mapping.CacheInfo; spec != nil { // extract CacheInfo-specific metrics
metric, err := spec.getLatestMetric(families)
if err != nil {
errs = append(errs, err)
} else if metric != nil {
populateCacheInfoMetrics(clone, metric, &errs)
updated = true
}
}

if updated {
clone.UpdateTime = time.Now()
ep.UpdateMetrics(clone)
@@ -145,6 +157,23 @@ func populateLoRAMetrics(clone *datalayer.Metrics, metric *dto.Metric, errs *[]e
}
}

// populateCacheInfoMetrics updates the metrics with cache info from the metric labels.
func populateCacheInfoMetrics(clone *datalayer.Metrics, metric *dto.Metric, errs *[]error) {
clone.CacheBlockSize = 0
for _, label := range metric.GetLabel() {
if label.GetName() == CacheConfigBlockSizeInfoMetricName {
if label.GetValue() != "" {
if val, err := strconv.Atoi(label.GetValue()); err == nil {
clone.CacheBlockSize = val
break
} else {
*errs = append(*errs, err)
}
}
}
}
}

// addAdapters splits a comma-separated adapter list and stores keys with default value 0.
func addAdapters(m map[string]int, csv string) {
for _, name := range strings.Split(csv, ",") {
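As a self-contained illustration of what this extraction path consumes, the sketch below parses a vLLM-style exposition snippet with the Prometheus expfmt package and pulls block_size out of the info gauge's labels, roughly as populateCacheInfoMetrics does. The HELP text and label set are illustrative assumptions, not a guaranteed vLLM contract.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"

	"github.com/prometheus/common/expfmt"
)

func main() {
	// Illustrative scrape output; a real vLLM scrape carries many more families and labels.
	exposition := `# HELP vllm:cache_config_info Information of the engine CacheConfig
# TYPE vllm:cache_config_info gauge
vllm:cache_config_info{block_size="16",num_gpu_blocks="8192"} 1
`
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(strings.NewReader(exposition))
	if err != nil {
		panic(err)
	}

	family, ok := families["vllm:cache_config_info"]
	if !ok || len(family.GetMetric()) == 0 {
		panic("cache config info family not found")
	}

	// Scan the info gauge's labels for block_size, as the extractor does.
	blockSize := 0
	for _, label := range family.GetMetric()[0].GetLabel() {
		if label.GetName() == "block_size" {
			if v, err := strconv.Atoi(label.GetValue()); err == nil {
				blockSize = v
			}
		}
	}
	fmt.Println("block size:", blockSize) // block size: 16
}
```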
10 changes: 9 additions & 1 deletion pkg/epp/datalayer/metrics/mapping.go
@@ -26,10 +26,11 @@ type Mapping struct {
TotalQueuedRequests *Spec
KVCacheUtilization *Spec
LoraRequestInfo *LoRASpec
CacheInfo *Spec
}

// NewMapping creates a metrics.Mapping from the input specification strings.
func NewMapping(queue, kvusage, lora string) (*Mapping, error) {
func NewMapping(queue, kvusage, lora, cacheInfo string) (*Mapping, error) {
var errs []error

queueSpec, err := parseStringToSpec(queue)
@@ -44,12 +45,19 @@ func NewMapping(queue, kvusage, lora string) (*Mapping, error) {
if err != nil {
errs = append(errs, err)
}

cacheInfoSpec, err := parseStringToSpec(cacheInfo)
if err != nil {
errs = append(errs, err)
}

if len(errs) != 0 {
return nil, errors.Join(errs...)
}
return &Mapping{
TotalQueuedRequests: queueSpec,
KVCacheUtilization: kvusageSpec,
LoraRequestInfo: loraSpec,
CacheInfo: cacheInfoSpec,
}, nil
}
6 changes: 5 additions & 1 deletion pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go
@@ -180,7 +180,11 @@ func (p *Plugin) WithName(name string) *Plugin {
// Score returns the scoring result for the given list of pods based on context.
func (p *Plugin) Score(ctx context.Context, cycleState *types.CycleState, request *types.LLMRequest, pods []types.Pod) map[types.Pod]float64 {
// pre score step, hashing prompt and find longest prefix match.
hashes := hashPrompt(ctx, request, p.config.BlockSize, p.config.MaxPrefixBlocksToMatch)
blockSize := pods[0].GetMetrics().CacheBlockSize * 4
if blockSize <= 0 {
blockSize = p.config.BlockSize
}
hashes := hashPrompt(ctx, request, blockSize, p.config.MaxPrefixBlocksToMatch)
state := &SchedulingContextState{
PrefixHashes: hashes,
PrefixCacheServers: p.matchLongestPrefix(ctx, hashes),
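A small sketch, with hypothetical numbers, of the block-size selection this hunk adds to Score: the x4 scaling is copied from the change above (presumably an approximate characters-per-token conversion, since the reported cache block size is in tokens), and the configured BlockSize is kept whenever the metric was not scraped.

```go
package main

import "fmt"

// effectiveBlockSize mirrors the selection added to Score: prefer the block
// size reported by the endpoint's cache-config metric, scaled by 4 as in the
// hunk above, and fall back to the plugin's configured block size when the
// metric is absent or zero.
func effectiveBlockSize(reportedCacheBlockSize, configuredBlockSize int) int {
	blockSize := reportedCacheBlockSize * 4
	if blockSize <= 0 {
		blockSize = configuredBlockSize
	}
	return blockSize
}

func main() {
	fmt.Println(effectiveBlockSize(16, 256)) // 64: derived from the scraped metric
	fmt.Println(effectiveBlockSize(0, 256))  // 256: metric not scraped, use the configured default
}
```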
11 changes: 6 additions & 5 deletions pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go
@@ -29,6 +29,7 @@ import (
k8stypes "k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)
@@ -41,8 +42,8 @@ func TestPrefixPluginCompletion(t *testing.T) {
}
plugin := New(context.Background(), config)

pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}
pod2 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}
pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, MetricsState: backendmetrics.NewMetricsState()}
pod2 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}, MetricsState: backendmetrics.NewMetricsState()}
pods := []types.Pod{pod1, pod2}

// First request.
@@ -207,7 +208,7 @@ func TestPrefixPluginChatCompletions(t *testing.T) {
}
plugin := New(context.Background(), config)

pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}
pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, MetricsState: &backendmetrics.MetricsState{}}
pods := []types.Pod{pod1}

// Test with chat completions request
@@ -241,8 +242,8 @@ func TestPrefixPluginChatCompletionsGrowth(t *testing.T) {
}
plugin := New(context.Background(), config)

pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}}
pod2 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}}
pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, MetricsState: &backendmetrics.MetricsState{}}
pod2 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}, MetricsState: &backendmetrics.MetricsState{}}
pods := []types.Pod{pod1, pod2}

// First request with initial conversation
1 change: 1 addition & 0 deletions pkg/epp/server/runserver.go
@@ -79,6 +79,7 @@ const (
DefaultTotalQueuedRequestsMetric = "vllm:num_requests_waiting" // default for --total-queued-requests-metric
DefaultKvCacheUsagePercentageMetric = "vllm:gpu_cache_usage_perc" // default for --kv-cache-usage-percentage-metric
DefaultLoraInfoMetric = "vllm:lora_requests_info" // default for --lora-info-metric
DefaultCacheInfoMetric = "vllm:cache_config_info" // default for --cache-info-metric
DefaultCertPath = "" // default for --cert-path
DefaultConfigFile = "" // default for --config-file
DefaultConfigText = "" // default for --config-text