From 16e44967a0fe787323afb1466ee53bb351c86915 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Mon, 13 Oct 2025 22:18:16 +0000 Subject: [PATCH 1/2] feat: Add initial Flow Control metrics Introduces initial Prometheus metrics for the experimental Flow Contorl layer in EPP. This change adds the following metrics: - inference_extension_flow_control_request_queue_duration_seconds: A histogram to track the total time requests spend in the Flow Control layer, from invocation of EnqueueAndWait to final outcome. - inference_extension_flow_control_queue_size: A gauge to track the number of requests currently being managed by the Flow Control layer. These metrics are labeled by fairness_id, priority, and outcome (for the duration metric). --- pkg/epp/metrics/metrics.go | 41 ++++++ pkg/epp/metrics/metrics_test.go | 130 +++++++++++++++++-- site-src/guides/metrics-and-observability.md | 9 ++ 3 files changed, 170 insertions(+), 10 deletions(-) diff --git a/pkg/epp/metrics/metrics.go b/pkg/epp/metrics/metrics.go index cefc4d43a..db361a48c 100644 --- a/pkg/epp/metrics/metrics.go +++ b/pkg/epp/metrics/metrics.go @@ -235,6 +235,28 @@ var ( }, []string{"commit", "build_ref"}, ) + + // Flow Control Metrics + flowControlRequestQueueDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: InferenceExtension, + Name: "flow_control_request_queue_duration_seconds", + Help: metricsutil.HelpMsgWithStability("Distribution of the total time requests spend in the EPP flow control layer, measured from the start of the EnqueueAndWait call until a final outcome is reached.", compbasemetrics.ALPHA), + Buckets: []float64{ + 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, + }, + }, + []string{"fairness_id", "priority", "outcome"}, + ) + + flowControlQueueSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: InferenceExtension, + Name: "flow_control_queue_size", + Help: metricsutil.HelpMsgWithStability("Current number of requests being actively managed by the EPP flow control layer, from the start of the EnqueueAndWait call until a final outcome is reached.", compbasemetrics.ALPHA), + }, + []string{"fairness_id", "priority"}, + ) ) var registerMetrics sync.Once @@ -260,6 +282,8 @@ func Register(customCollectors ...prometheus.Collector) { metrics.Registry.MustRegister(PrefixCacheSize) metrics.Registry.MustRegister(PrefixCacheHitRatio) metrics.Registry.MustRegister(PrefixCacheHitLength) + metrics.Registry.MustRegister(flowControlRequestQueueDuration) + metrics.Registry.MustRegister(flowControlQueueSize) for _, collector := range customCollectors { metrics.Registry.MustRegister(collector) } @@ -286,6 +310,8 @@ func Reset() { PrefixCacheSize.Reset() PrefixCacheHitRatio.Reset() PrefixCacheHitLength.Reset() + flowControlRequestQueueDuration.Reset() + flowControlQueueSize.Reset() } // RecordRequstCounter records the number of requests. @@ -414,3 +440,18 @@ func RecordPrefixCacheMatch(matchedLength, totalLength int) { func RecordInferenceExtensionInfo(commitSha, buildRef string) { InferenceExtensionInfo.WithLabelValues(commitSha, buildRef).Set(1) } + +// RecordFlowControlRequestQueueDuration records the duration a request spent in the Flow Control layer. +func RecordFlowControlRequestQueueDuration(fairnessID, priority, outcome string, duration time.Duration) { + flowControlRequestQueueDuration.WithLabelValues(fairnessID, priority, outcome).Observe(duration.Seconds()) +} + +// IncFlowControlQueueSize increments the Flow Control queue size gauge. +func IncFlowControlQueueSize(fairnessID, priority string) { + flowControlQueueSize.WithLabelValues(fairnessID, priority).Inc() +} + +// DecFlowControlQueueSize decrements the Flow Control queue size gauge. +func DecFlowControlQueueSize(fairnessID, priority string) { + flowControlQueueSize.WithLabelValues(fairnessID, priority).Dec() +} diff --git a/pkg/epp/metrics/metrics_test.go b/pkg/epp/metrics/metrics_test.go index 837fb8b22..28f941647 100644 --- a/pkg/epp/metrics/metrics_test.go +++ b/pkg/epp/metrics/metrics_test.go @@ -22,6 +22,9 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" "k8s.io/component-base/metrics/testutil" "sigs.k8s.io/controller-runtime/pkg/metrics" @@ -44,7 +47,14 @@ const ( PerPodQueueSizeMetrics = InferencePoolComponent + "_per_pod_queue_size" ) +func TestMain(m *testing.M) { + // Register all metrics once for the entire test suite. + Register() + os.Exit(m.Run()) +} + func TestRecordRequestCounterandSizes(t *testing.T) { + Reset() type requests struct { modelName string targetModelName string @@ -78,7 +88,6 @@ func TestRecordRequestCounterandSizes(t *testing.T) { }, }, }} - Register() for _, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { for _, req := range scenario.reqs { @@ -114,6 +123,7 @@ func TestRecordRequestCounterandSizes(t *testing.T) { } func TestRecordRequestErrorCounter(t *testing.T) { + Reset() type requests struct { modelName string targetModelName string @@ -150,7 +160,6 @@ func TestRecordRequestErrorCounter(t *testing.T) { }, }, } - Register() for _, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { for _, req := range scenario.reqs { @@ -174,6 +183,7 @@ func TestRecordRequestErrorCounter(t *testing.T) { } func TestRecordRequestLatencies(t *testing.T) { + Reset() ctx := logutil.NewTestLoggerIntoContext(context.Background()) timeBaseline := time.Now() type requests struct { @@ -229,7 +239,6 @@ func TestRecordRequestLatencies(t *testing.T) { invalid: true, }, } - Register() for _, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { for _, req := range scenario.reqs { @@ -256,6 +265,7 @@ func TestRecordRequestLatencies(t *testing.T) { } func TestRecordNormalizedTimePerOutputToken(t *testing.T) { + Reset() ctx := logutil.NewTestLoggerIntoContext(context.Background()) timeBaseline := time.Now() type tokenRequests struct { @@ -330,7 +340,6 @@ func TestRecordNormalizedTimePerOutputToken(t *testing.T) { invalid: true, }, } - Register() for _, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { for _, req := range scenario.reqs { @@ -357,6 +366,7 @@ func TestRecordNormalizedTimePerOutputToken(t *testing.T) { } func TestRecordResponseMetrics(t *testing.T) { + Reset() type responses struct { modelName string targetModelName string @@ -400,7 +410,6 @@ func TestRecordResponseMetrics(t *testing.T) { }, }, }} - Register() for _, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { for _, resp := range scenario.resp { @@ -451,6 +460,7 @@ func TestRecordResponseMetrics(t *testing.T) { } func TestRunningRequestsMetrics(t *testing.T) { + Reset() type request struct { modelName string complete bool // true -> request is completed, false -> running request @@ -483,7 +493,6 @@ func TestRunningRequestsMetrics(t *testing.T) { }, } - Register() for _, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { for _, req := range scenario.requests { @@ -511,6 +520,7 @@ func TestRunningRequestsMetrics(t *testing.T) { } func TestInferencePoolMetrics(t *testing.T) { + Reset() scenarios := []struct { name string poolName string @@ -524,7 +534,6 @@ func TestInferencePoolMetrics(t *testing.T) { queueSizeAvg: 0.4, }, } - Register() for _, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { RecordInferencePoolAvgKVCache(scenario.poolName, scenario.kvCacheAvg) @@ -560,6 +569,7 @@ func TestInferencePoolMetrics(t *testing.T) { } func TestPluginProcessingLatencies(t *testing.T) { + Reset() type pluginLatency struct { extensionPoint string pluginType string @@ -600,7 +610,6 @@ func TestPluginProcessingLatencies(t *testing.T) { }, }, } - Register() for _, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { for _, latency := range scenario.latencies { @@ -624,6 +633,7 @@ func TestPluginProcessingLatencies(t *testing.T) { } func TestSchedulerE2ELatency(t *testing.T) { + Reset() scenarios := []struct { name string durations []time.Duration @@ -643,7 +653,6 @@ func TestSchedulerE2ELatency(t *testing.T) { }, }, } - Register() for _, scenario := range scenarios { t.Run(scenario.name, func(t *testing.T) { for _, duration := range scenario.durations { @@ -667,6 +676,7 @@ func TestSchedulerE2ELatency(t *testing.T) { } func TestPrefixCacheMetrics(t *testing.T) { + Reset() const ( PrefixCacheSizeMetric = InferenceExtension + "_prefix_indexer_size" PrefixCacheHitRatioMetric = InferenceExtension + "_prefix_indexer_hit_ratio" @@ -713,7 +723,6 @@ func TestPrefixCacheMetrics(t *testing.T) { }, } - Register() t.Run(scenario.name, func(t *testing.T) { // Record cache size metrics for _, size := range scenario.cacheSizes { @@ -768,3 +777,104 @@ func TestPrefixCacheMetrics(t *testing.T) { } }) } + +func getHistogramVecLabelValues(t *testing.T, h *prometheus.HistogramVec, labelValues ...string) (*dto.Histogram, error) { + t.Helper() + m, err := h.GetMetricWithLabelValues(labelValues...) + if err != nil { + return nil, err + } + metricDto := &dto.Metric{} + if err := m.(prometheus.Histogram).Write(metricDto); err != nil { + return nil, err + } + return metricDto.GetHistogram(), nil +} + +func TestFlowControlQueueDurationMetric(t *testing.T) { + Reset() + + records := []struct { + fairnessID string + priority string + outcome string + duration time.Duration + }{ + {fairnessID: "user-a", priority: "100", outcome: "Dispatched", duration: 10 * time.Millisecond}, + {fairnessID: "user-a", priority: "100", outcome: "Dispatched", duration: 20 * time.Millisecond}, + {fairnessID: "user-b", priority: "100", outcome: "RejectedCapacity", duration: 5 * time.Millisecond}, + {fairnessID: "user-a", priority: "50", outcome: "Dispatched", duration: 100 * time.Millisecond}, + } + + for _, rec := range records { + RecordFlowControlRequestQueueDuration(rec.fairnessID, rec.priority, rec.outcome, rec.duration) + } + + testCases := []struct { + name string + labels prometheus.Labels + expectCount uint64 + expectSum float64 + }{ + { + name: "user-a, prio 100, dispatched", + labels: prometheus.Labels{"fairness_id": "user-a", "priority": "100", "outcome": "Dispatched"}, + expectCount: 2, + expectSum: 0.03, // 0.01 + 0.02 + }, + { + name: "user-b, prio 100, rejected", + labels: prometheus.Labels{"fairness_id": "user-b", "priority": "100", "outcome": "RejectedCapacity"}, + expectCount: 1, + expectSum: 0.005, + }, + { + name: "user-a, prio 50, dispatched", + labels: prometheus.Labels{"fairness_id": "user-a", "priority": "50", "outcome": "Dispatched"}, + expectCount: 1, + expectSum: 0.1, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + labels := []string{tc.labels["fairness_id"], tc.labels["priority"], tc.labels["outcome"]} + hist, err := getHistogramVecLabelValues(t, flowControlRequestQueueDuration, labels...) + require.NoError(t, err, "Failed to get histogram for labels %v", tc.labels) + require.Equal(t, tc.expectCount, hist.GetSampleCount(), "Sample count mismatch for labels %v", tc.labels) + require.InDelta(t, tc.expectSum, hist.GetSampleSum(), 0.00001, "Sample sum mismatch for labels %v", tc.labels) + }) + } +} + +func TestFlowControlQueueSizeMetric(t *testing.T) { + Reset() + + // Basic Inc/Dec + IncFlowControlQueueSize("user-a", "100") + val, err := testutil.GetGaugeMetricValue(flowControlQueueSize.WithLabelValues("user-a", "100")) + require.NoError(t, err, "Failed to get gauge value for user-a/100 after Inc") + require.Equal(t, 1.0, val, "Gauge value should be 1 after Inc for user-a/100") + + DecFlowControlQueueSize("user-a", "100") + val, err = testutil.GetGaugeMetricValue(flowControlQueueSize.WithLabelValues("user-a", "100")) + require.NoError(t, err, "Failed to get gauge value for user-a/100 after Dec") + require.Equal(t, 0.0, val, "Gauge value should be 0 after Dec for user-a/100") + + // Multiple labels + IncFlowControlQueueSize("user-b", "200") + IncFlowControlQueueSize("user-b", "200") + val, err = testutil.GetGaugeMetricValue(flowControlQueueSize.WithLabelValues("user-b", "200")) + require.NoError(t, err, "Failed to get gauge value for user-b/200") + require.Equal(t, 2.0, val, "Gauge value should be 2 for user-b/200") + + DecFlowControlQueueSize("user-b", "200") + val, err = testutil.GetGaugeMetricValue(flowControlQueueSize.WithLabelValues("user-b", "200")) + require.NoError(t, err, "Failed to get gauge value for user-b/200 after one Dec") + require.Equal(t, 1.0, val, "Gauge value should be 1 for user-b/200 after one Dec") + + // Non-existent labels + val, err = testutil.GetGaugeMetricValue(flowControlQueueSize.WithLabelValues("user-c", "100")) + require.NoError(t, err, "Failed to get gauge value for non-existent user-c/100") + require.Equal(t, 0.0, val, "Gauge value for non-existent labels should be 0") +} diff --git a/site-src/guides/metrics-and-observability.md b/site-src/guides/metrics-and-observability.md index 5e1f02e49..8bcd95277 100644 --- a/site-src/guides/metrics-and-observability.md +++ b/site-src/guides/metrics-and-observability.md @@ -53,6 +53,15 @@ This guide describes the current state of exposed metrics and how to scrape them |:---------------------------|:-----------------|:-------------------------------------------------|:------------------------------------------|:------------| | lora_syncer_adapter_status | Gauge | Status of LoRA adapters (1=loaded, 0=not_loaded) | `adapter_name`=<adapter-id> | ALPHA | +### Flow Control Metrics (Experimental) + +These metrics provide insights into the experimental flow control layer within the EPP. + +| **Metric name** | **Metric Type** |
**Description**
|
**Labels**
| **Status** | +|:---|:---|:---|:---|:---| +| inference_extension_flow_control_request_queue_duration_seconds | Distribution | Distribution of the total time requests spend in the flow control layer. This is measured from the moment a request enters the `EnqueueAndWait` function until it reaches a final outcome (e.g., Dispatched, Rejected, Evicted). | `fairness_id`=<flow-id>
`priority`=<flow-priority>
`outcome`=<QueueOutcome> | ALPHA | +| inference_extension_flow_control_queue_size | Gauge | The current number of requests being actively managed by the flow control layer. This counts requests from the moment they enter the `EnqueueAndWait` function until they reach a final outcome. | `fairness_id`=<flow-id>
`priority`=<flow-priority> | ALPHA | + ## Scrape Metrics & Pprof profiles The metrics endpoints are exposed on different ports by default: From 369ae1f0cae7a15f03c20882541b0394d66fcf0d Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Tue, 14 Oct 2025 19:48:07 +0000 Subject: [PATCH 2/2] Rebase onto HEAd and resolve conflicts. --- pkg/epp/flowcontrol/controller/controller.go | 9 ++++++++- pkg/epp/flowcontrol/controller/internal/item.go | 6 ++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/pkg/epp/flowcontrol/controller/controller.go b/pkg/epp/flowcontrol/controller/controller.go index c1f9a3004..93d0330c7 100644 --- a/pkg/epp/flowcontrol/controller/controller.go +++ b/pkg/epp/flowcontrol/controller/controller.go @@ -28,6 +28,7 @@ import ( "errors" "fmt" "slices" + "strconv" "sync" "time" @@ -38,6 +39,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller/internal" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -209,6 +211,12 @@ func (fc *FlowController) EnqueueAndWait( return types.QueueOutcomeRejectedOther, errors.New("request cannot be nil") } + flowKey := req.FlowKey() + fairnessID := flowKey.ID + priority := strconv.Itoa(flowKey.Priority) + metrics.IncFlowControlQueueSize(fairnessID, priority) + defer metrics.DecFlowControlQueueSize(fairnessID, priority) + // 1. Create the derived context that governs this request's lifecycle (Parent Cancellation + TTL). reqCtx, cancel, enqueueTime := fc.createRequestContext(ctx, req) defer cancel() @@ -216,7 +224,6 @@ func (fc *FlowController) EnqueueAndWait( // 2. Enter the distribution loop to find a home for the request. // This loop is responsible for retrying on ErrShardDraining. for { - select { // Non-blocking check on controller lifecycle. case <-fc.parentCtx.Done(): return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, types.ErrFlowControllerNotRunning) diff --git a/pkg/epp/flowcontrol/controller/internal/item.go b/pkg/epp/flowcontrol/controller/internal/item.go index 31a28d473..f0d5d3286 100644 --- a/pkg/epp/flowcontrol/controller/internal/item.go +++ b/pkg/epp/flowcontrol/controller/internal/item.go @@ -20,11 +20,13 @@ import ( "context" "errors" "fmt" + "strconv" "sync" "sync/atomic" "time" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" ) // FinalState encapsulates the terminal outcome of a FlowItem's lifecycle. @@ -154,6 +156,10 @@ func (fi *FlowItem) finalizeInternal(outcome types.QueueOutcome, err error) { // Atomically store the pointer. This is the critical memory barrier that publishes the state safely. fi.finalState.Store(finalState) + duration := time.Since(fi.enqueueTime) + flowKey := fi.originalRequest.FlowKey() + metrics.RecordFlowControlRequestQueueDuration(flowKey.ID, strconv.Itoa(flowKey.Priority), outcome.String(), duration) + fi.done <- finalState close(fi.done) }