Commit 3a375de

Add request enqueue latency metric
1 parent: ecd30fa

4 files changed: 245 additions, 1 deletion

pkg/epp/flowcontrol/controller/internal/processor.go

Lines changed: 12 additions & 1 deletion
@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -208,14 +209,24 @@ func (sp *ShardProcessor) Run(ctx context.Context) {
 // enqueue processes an item received from the enqueueChan.
 // It handles capacity checks, checks for external finalization, and either admits the item to a queue or rejects it.
 func (sp *ShardProcessor) enqueue(item *FlowItem) {
+
+	outcome := item.FinalState()
 	req := item.OriginalRequest()
 	key := req.FlowKey()
+	priorityStr := strconv.Itoa(key.Priority)
+
+	startTime := time.Now()
+
+	defer func() {
+		metrics.RecordFlowControlRequestEnqueueDuration(priorityStr, outcome.Outcome.String(), time.Since(startTime))
+
+	}()
 
 	// --- Optimistic External Finalization Check ---
 	// Check if the item was finalized by the Controller (due to TTL/cancellation) while it was buffered in enqueueChan.
 	// This is an optimistic check to avoid unnecessary processing on items already considered dead.
 	// The ultimate guarantee of cleanup for any races is the runCleanupSweep mechanism.
-	if finalState := item.FinalState(); finalState != nil {
+	if finalState := outcome; finalState != nil {
 		sp.logger.V(logutil.TRACE).Info("Item finalized externally before processing, discarding.",
 			"outcome", finalState.Outcome, "err", finalState.Err, "flowKey", key, "reqID", req.ID())
 		return
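
The instrumentation wraps the whole enqueue path: a timestamp is taken on entry and the observation is deferred, so the recorded latency covers every exit path and carries the request's priority and outcome as labels. Below is a generic sketch of that defer-based timing idiom, assuming a hypothetical recordEnqueueDuration helper; it is not the commit's exact logic (which derives the outcome label from item.FinalState()).

package main

import (
	"fmt"
	"time"
)

// recordEnqueueDuration is a hypothetical stand-in for the new
// metrics.RecordFlowControlRequestEnqueueDuration helper.
func recordEnqueueDuration(priority, outcome string, d time.Duration) {
	fmt.Printf("priority=%s outcome=%s duration=%v\n", priority, outcome, d)
}

// enqueue shows the idiom: take a timestamp on entry and defer the metric
// observation so every return path, including early rejections, is timed.
func enqueue(priority string) {
	outcome := "RejectedCapacity" // assume rejection until the item is admitted
	startTime := time.Now()
	defer func() {
		// The deferred closure reads outcome when enqueue returns, so the
		// label reflects whatever the function decided before returning.
		recordEnqueueDuration(priority, outcome, time.Since(startTime))
	}()

	// ... capacity checks and queue admission would happen here ...
	outcome = "Dispatched"
}

func main() {
	enqueue("1")
}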

pkg/epp/metrics/metrics.go

Lines changed: 24 additions & 0 deletions
@@ -399,6 +399,18 @@
 		append([]string{"fairness_id", "priority", "outcome", "inference_pool"}, ModelLabels...),
 	)
 
+	flowControlRequestEnqueueDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Subsystem: InferenceExtension,
+			Name:      "flow_control_request_enqueue_duration_seconds",
+			Help:      metricsutil.HelpMsgWithStability("Distribution of the time taken to enqueue requests by the EPP flow control layer.", compbasemetrics.ALPHA),
+			Buckets: []float64{
+				5, 10, 25, 50, 100, 250, 500, 1000,
+			},
+		},
+		[]string{"priority", "outcome"},
+	)
+
 	flowControlQueueSize = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Subsystem: InferenceExtension,
@@ -461,6 +473,7 @@ func Register(customCollectors ...prometheus.Collector) {
 	metrics.Registry.MustRegister(PrefixCacheHitLength)
 	metrics.Registry.MustRegister(flowControlRequestQueueDuration)
 	metrics.Registry.MustRegister(flowControlQueueSize)
+	metrics.Registry.MustRegister(flowControlRequestEnqueueDuration)
 	metrics.Registry.MustRegister(inferenceModelRewriteDecisionsTotal)
 	for _, collector := range customCollectors {
 		metrics.Registry.MustRegister(collector)
@@ -507,6 +520,7 @@ func Reset() {
 	PrefixCacheHitLength.Reset()
 	flowControlRequestQueueDuration.Reset()
 	flowControlQueueSize.Reset()
+	flowControlRequestEnqueueDuration.Reset()
 	inferenceModelRewriteDecisionsTotal.Reset()
 }
 
@@ -787,6 +801,16 @@ func RecordFlowControlRequestQueueDuration(
 	).Observe(duration.Seconds())
 }
 
+// RecordFlowControlRequestEnqueueDuration records the duration a request spent in the enqueue path of the Flow Control layer.
+func RecordFlowControlRequestEnqueueDuration(
+	priority string, outcome string,
+	duration time.Duration,
+) {
+	flowControlRequestEnqueueDuration.WithLabelValues(
+		priority, outcome,
+	).Observe(duration.Seconds())
+}
+
 // IncFlowControlQueueSize increments the Flow Control queue size gauge.
 func IncFlowControlQueueSize(fairnessID, priority, inferencePool, modelName, targetModelName string) {
 	flowControlQueueSize.WithLabelValues(fairnessID, priority, inferencePool, modelName, targetModelName).Inc()
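
The new histogram follows the same lifecycle as the existing flow-control metrics: defined with priority and outcome labels, registered in Register(), cleared in Reset(), and written through a small recording helper that converts the duration to seconds. The sketch below is a minimal, self-contained analogue using plain client_golang; the subsystem, stability-annotated help text, custom buckets, and the shared registry used by the repository are replaced with illustrative placeholders.

package main

import (
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// requestEnqueueDuration is a self-contained analogue of the
// flowControlRequestEnqueueDuration histogram added in this commit.
var requestEnqueueDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "flow_control_request_enqueue_duration_seconds",
		Help:    "Distribution of the time taken to enqueue requests.",
		Buckets: prometheus.DefBuckets, // placeholder buckets for this sketch
	},
	[]string{"priority", "outcome"},
)

// recordEnqueueDuration mirrors the shape of the new helper: the duration is
// converted to seconds before it is observed.
func recordEnqueueDuration(priority, outcome string, d time.Duration) {
	requestEnqueueDuration.WithLabelValues(priority, outcome).Observe(d.Seconds())
}

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(requestEnqueueDuration)

	recordEnqueueDuration("1", "Dispatched", 1500*time.Microsecond)

	// Serve the registry so the histogram can be scraped at /metrics.
	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":9090", nil)
}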

pkg/epp/metrics/metrics_test.go

Lines changed: 67 additions & 0 deletions
@@ -685,6 +685,73 @@ func TestSchedulerE2ELatency(t *testing.T) {
 	}
 }
 
+func TestFlowControlEnqueueDurationMetric(t *testing.T) {
+	Reset()
+
+	scenarios := []struct {
+		name       string
+		priorities []string
+		outcomes   []string
+		durations  []time.Duration
+	}{
+		{
+			name: "multiple enqueue latencies",
+			priorities: []string{
+				"1", "1", "1", "1", "1", "1", "1",
+				"2", "2", "2", "2", "2", "2", "2",
+			},
+			outcomes: []string{
+				"Dispatched", "NotYetFinalized", "RejectedCapacity", "RejectedOther", "EvictedTTL", "EvictedContextCancelled", "EvictedOther",
+				"Dispatched", "NotYetFinalized", "RejectedCapacity", "RejectedOther", "EvictedTTL", "EvictedContextCancelled", "EvictedOther",
+			},
+			durations: []time.Duration{
+				200 * time.Nanosecond,
+				800 * time.Nanosecond,
+				1500 * time.Nanosecond,
+				3 * time.Nanosecond,
+				8 * time.Nanosecond,
+				15 * time.Nanosecond,
+				30 * time.Nanosecond,
+				75 * time.Nanosecond,
+				150 * time.Nanosecond,
+				200 * time.Nanosecond,
+				800 * time.Nanosecond,
+				1500 * time.Nanosecond,
+				3 * time.Nanosecond,
+				15 * time.Nanosecond,
+			},
+		},
+	}
+
+	for _, scenario := range scenarios {
+		t.Run(scenario.name, func(t *testing.T) {
+			// FIX: Loop through the indices to record one event at a time
+			for i := range scenario.priorities {
+				RecordFlowControlRequestEnqueueDuration(
+					scenario.priorities[i],
+					scenario.outcomes[i],
+					scenario.durations[i],
+				)
+			}
+
+			// Validate results
+			func() {
+				wantEnqueueLatency, err := os.Open("testdata/flow_control_enqueue_duration_seconds_metric")
+				if err != nil {
+					t.Fatal(err)
+				}
+				defer wantEnqueueLatency.Close()
+
+				if err := testutil.GatherAndCompare(metrics.Registry, wantEnqueueLatency, "inference_extension_flow_control_enqueue_duration_seconds"); err != nil {
+					t.Error(err)
+				}
+			}()
+		})
+	}
+}
+
+// TODO (7028): Research histogram bins using real-world data to ensure they are optimal.
+
 func TestSchedulerAttemptsTotal(t *testing.T) {
 
 	scenarios := []struct {
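
The test records one observation per (priority, outcome) pair and then compares the gathered output against a golden file, presumably the remaining file in this commit (testdata/flow_control_enqueue_duration_seconds_metric), which is not shown in the diff above. For a feel of the mechanics without a golden file, here is a minimal, self-contained sketch that exercises a labeled histogram and checks it with the client_golang testutil package; the bucket values and names are illustrative assumptions, not the repository's code.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// enqueueDuration is a hypothetical stand-in for the histogram under test.
	enqueueDuration := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "flow_control_request_enqueue_duration_seconds",
			Help:    "Distribution of the time taken to enqueue requests.",
			Buckets: []float64{0.001, 0.005, 0.01},
		},
		[]string{"priority", "outcome"},
	)

	// Record one observation per (priority, outcome) pair, mirroring the
	// table-driven test above.
	enqueueDuration.WithLabelValues("1", "Dispatched").Observe((200 * time.Nanosecond).Seconds())
	enqueueDuration.WithLabelValues("2", "RejectedCapacity").Observe((1500 * time.Nanosecond).Seconds())

	// Two distinct label combinations were observed, so two child series exist.
	fmt.Println(testutil.CollectAndCount(enqueueDuration)) // prints 2
}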
