Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pkg/epp/flowcontrol/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"errors"
"fmt"
"slices"
"strconv"
"sync"
"time"

Expand All @@ -38,6 +39,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller/internal"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

Expand Down Expand Up @@ -209,14 +211,19 @@ func (fc *FlowController) EnqueueAndWait(
return types.QueueOutcomeRejectedOther, errors.New("request cannot be nil")
}

flowKey := req.FlowKey()
fairnessID := flowKey.ID
priority := strconv.Itoa(flowKey.Priority)
metrics.IncFlowControlQueueSize(fairnessID, priority)
defer metrics.DecFlowControlQueueSize(fairnessID, priority)

// 1. Create the derived context that governs this request's lifecycle (Parent Cancellation + TTL).
reqCtx, cancel, enqueueTime := fc.createRequestContext(ctx, req)
defer cancel()

// 2. Enter the distribution loop to find a home for the request.
// This loop is responsible for retrying on ErrShardDraining.
for {

select { // Non-blocking check on controller lifecycle.
case <-fc.parentCtx.Done():
return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, types.ErrFlowControllerNotRunning)
Expand Down
6 changes: 6 additions & 0 deletions pkg/epp/flowcontrol/controller/internal/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
)

// FinalState encapsulates the terminal outcome of a FlowItem's lifecycle.
Expand Down Expand Up @@ -154,6 +156,10 @@ func (fi *FlowItem) finalizeInternal(outcome types.QueueOutcome, err error) {
// Atomically store the pointer. This is the critical memory barrier that publishes the state safely.
fi.finalState.Store(finalState)

duration := time.Since(fi.enqueueTime)
flowKey := fi.originalRequest.FlowKey()
metrics.RecordFlowControlRequestQueueDuration(flowKey.ID, strconv.Itoa(flowKey.Priority), outcome.String(), duration)

fi.done <- finalState
close(fi.done)
}
Expand Down
41 changes: 41 additions & 0 deletions pkg/epp/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,28 @@ var (
},
[]string{"commit", "build_ref"},
)

// Flow Control Metrics
flowControlRequestQueueDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: InferenceExtension,
Name: "flow_control_request_queue_duration_seconds",
Help: metricsutil.HelpMsgWithStability("Distribution of the total time requests spend in the EPP flow control layer, measured from the start of the EnqueueAndWait call until a final outcome is reached.", compbasemetrics.ALPHA),
Buckets: []float64{
0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0,
},
},
[]string{"fairness_id", "priority", "outcome"},
)

flowControlQueueSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: InferenceExtension,
Name: "flow_control_queue_size",
Help: metricsutil.HelpMsgWithStability("Current number of requests being actively managed by the EPP flow control layer, from the start of the EnqueueAndWait call until a final outcome is reached.", compbasemetrics.ALPHA),
},
[]string{"fairness_id", "priority"},
)
)

var registerMetrics sync.Once
Expand All @@ -260,6 +282,8 @@ func Register(customCollectors ...prometheus.Collector) {
metrics.Registry.MustRegister(PrefixCacheSize)
metrics.Registry.MustRegister(PrefixCacheHitRatio)
metrics.Registry.MustRegister(PrefixCacheHitLength)
metrics.Registry.MustRegister(flowControlRequestQueueDuration)
metrics.Registry.MustRegister(flowControlQueueSize)
for _, collector := range customCollectors {
metrics.Registry.MustRegister(collector)
}
Expand All @@ -286,6 +310,8 @@ func Reset() {
PrefixCacheSize.Reset()
PrefixCacheHitRatio.Reset()
PrefixCacheHitLength.Reset()
flowControlRequestQueueDuration.Reset()
flowControlQueueSize.Reset()
}

// RecordRequstCounter records the number of requests.
Expand Down Expand Up @@ -414,3 +440,18 @@ func RecordPrefixCacheMatch(matchedLength, totalLength int) {
func RecordInferenceExtensionInfo(commitSha, buildRef string) {
InferenceExtensionInfo.WithLabelValues(commitSha, buildRef).Set(1)
}

// RecordFlowControlRequestQueueDuration records the duration a request spent in the Flow Control layer.
func RecordFlowControlRequestQueueDuration(fairnessID, priority, outcome string, duration time.Duration) {
	// Resolve the labeled observer once, then report the elapsed time in seconds.
	observer := flowControlRequestQueueDuration.WithLabelValues(fairnessID, priority, outcome)
	observer.Observe(duration.Seconds())
}

// IncFlowControlQueueSize increments the Flow Control queue size gauge.
func IncFlowControlQueueSize(fairnessID, priority string) {
	// Look up the per-flow gauge child and bump it by one.
	gauge := flowControlQueueSize.WithLabelValues(fairnessID, priority)
	gauge.Inc()
}

// DecFlowControlQueueSize decrements the Flow Control queue size gauge.
func DecFlowControlQueueSize(fairnessID, priority string) {
	// Look up the per-flow gauge child and drop it by one.
	gauge := flowControlQueueSize.WithLabelValues(fairnessID, priority)
	gauge.Dec()
}
Loading