Skip to content
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
5945cfb
partial draft
capri-xiyue Oct 31, 2025
370f1a3
refactor
capri-xiyue Nov 2, 2025
85e4622
fixed some ut
capri-xiyue Nov 6, 2025
579b17e
make epp controller ut pass
capri-xiyue Nov 6, 2025
e9d704d
make ut pass
capri-xiyue Nov 6, 2025
42cb218
fixed build
capri-xiyue Nov 7, 2025
943c53d
fixed build
capri-xiyue Nov 7, 2025
1e38821
fixed build failure
capri-xiyue Nov 10, 2025
30fd667
fixed lint
capri-xiyue Nov 10, 2025
9e85377
fix format
capri-xiyue Nov 10, 2025
43c87fa
merge conflicts
capri-xiyue Nov 14, 2025
0267569
fixed import format
capri-xiyue Nov 14, 2025
200dbf4
rename and refactor
capri-xiyue Nov 17, 2025
9d1514a
added epp name in env
capri-xiyue Nov 17, 2025
ba89d24
rename to endpointPool
capri-xiyue Nov 18, 2025
568e9ee
merge conflict
capri-xiyue Nov 18, 2025
ba90213
refactor in ut
capri-xiyue Nov 18, 2025
bc6a4c6
fixed format
capri-xiyue Nov 18, 2025
93ab791
fixed format
capri-xiyue Nov 18, 2025
c3ac6f2
changed error message
capri-xiyue Nov 18, 2025
c4b8c32
changed error message
capri-xiyue Nov 18, 2025
513590d
debug
capri-xiyue Nov 18, 2025
84b2275
remove debug logging
capri-xiyue Nov 18, 2025
0bd692f
fixed format
capri-xiyue Nov 18, 2025
9b7656d
merge conflicts
capri-xiyue Nov 18, 2025
72bb590
fixed import
capri-xiyue Nov 18, 2025
71f9fe5
updated to use epp name instead of pod name
capri-xiyue Nov 18, 2025
770f96c
fixed compiler
capri-xiyue Nov 18, 2025
a96edb0
verify
capri-xiyue Nov 18, 2025
1f15673
don't set endpointpool in datastore for inferencepool at start
capri-xiyue Nov 18, 2025
d079423
rename endpoints to endpointsmeta
capri-xiyue Nov 18, 2025
76b4eaa
rename import package
capri-xiyue Nov 18, 2025
2c67bf8
rename test utility
capri-xiyue Nov 18, 2025
7798ebd
added logging info
capri-xiyue Nov 19, 2025
1167364
resolve merge conflicts
capri-xiyue Nov 19, 2025
e33233b
change endpointpool struct
capri-xiyue Nov 20, 2025
86ac1f8
fixed variable naming
capri-xiyue Nov 20, 2025
11a8a68
fixed linter
capri-xiyue Nov 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 153 additions & 36 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import (
"net/http"
"net/http/pprof"
"os"
"regexp"
"runtime"
"strconv"
"strings"
"sync/atomic"

"github.com/go-logr/logr"
Expand All @@ -34,16 +37,18 @@ import (
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
healthPb "google.golang.org/grpc/health/grpc_health_v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/rest"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

configapi "sigs.k8s.io/gateway-api-inference-extension/apix/config/v1alpha1"
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
Expand Down Expand Up @@ -111,6 +116,8 @@ var (
poolName = flag.String("pool-name", runserver.DefaultPoolName, "Name of the InferencePool this Endpoint Picker is associated with.")
poolGroup = flag.String("pool-group", runserver.DefaultPoolGroup, "group of the InferencePool this Endpoint Picker is associated with.")
poolNamespace = flag.String("pool-namespace", "", "Namespace of the InferencePool this Endpoint Picker is associated with.")
endpointSelector = flag.String("endpoint-selector", "", "selector to filter model server pods on, only key=value paris is supported. Format: a comma-separated list of key value paris, e.g., 'app=vllm-llama3-8b-instruct,env=prod'.")
endpointTargetPorts = flag.String("endpoint-target-ports", "", "target ports of model server pods. Format: a comma-separated list of numbers, e.g., '3000,3001,3002'")
logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity")
secureServing = flag.Bool("secure-serving", runserver.DefaultSecureServing, "Enables secure serving. Defaults to true.")
healthChecking = flag.Bool("health-checking", runserver.DefaultHealthChecking, "Enables health checking")
Expand Down Expand Up @@ -231,16 +238,25 @@ func (r *Runner) Run(ctx context.Context) error {
if err != nil {
return err
}
datastore := datastore.NewDatastore(ctx, epf, int32(*modelServerMetricsPort))

eppConfig, err := r.parseConfigurationPhaseTwo(ctx, rawConfig, datastore)
gknn, err := extractGKNN()
if err != nil {
setupLog.Error(err, "Failed to extract GKNN")
return err
}
ds, err := setupDataStore(setupLog, ctx, epf, int32(*modelServerMetricsPort), *gknn)
if err != nil {
setupLog.Error(err, "Failed to setup datastore")
return err
}
eppConfig, err := r.parseConfigurationPhaseTwo(ctx, rawConfig, ds)
if err != nil {
setupLog.Error(err, "Failed to parse configuration")
return err
}

// --- Setup Metrics Server ---
r.customCollectors = append(r.customCollectors, collectors.NewInferencePoolMetricsCollector(datastore))
r.customCollectors = append(r.customCollectors, collectors.NewInferencePoolMetricsCollector(ds))
metrics.Register(r.customCollectors...)
metrics.RecordInferenceExtensionInfo(version.CommitSHA, version.BuildRef)
// Register metrics handler.
Expand All @@ -259,34 +275,10 @@ func (r *Runner) Run(ctx context.Context) error {
}(),
}

// Determine pool namespace: if --pool-namespace is non-empty, use it; else NAMESPACE env var; else default
resolvePoolNamespace := func() string {
if *poolNamespace != "" {
return *poolNamespace
}
if nsEnv := os.Getenv("NAMESPACE"); nsEnv != "" {
return nsEnv
}
return runserver.DefaultPoolNamespace
}
resolvedPoolNamespace := resolvePoolNamespace()
poolNamespacedName := types.NamespacedName{
Name: *poolName,
Namespace: resolvedPoolNamespace,
}
poolGroupKind := schema.GroupKind{
Group: *poolGroup,
Kind: "InferencePool",
}
poolGKNN := common.GKNN{
NamespacedName: poolNamespacedName,
GroupKind: poolGroupKind,
}

isLeader := &atomic.Bool{}
isLeader.Store(false)

mgr, err := runserver.NewDefaultManager(poolGKNN, cfg, metricsServerOptions, *haEnableLeaderElection)
mgr, err := runserver.NewDefaultManager(*gknn, cfg, metricsServerOptions, *haEnableLeaderElection)
if err != nil {
setupLog.Error(err, "Failed to create controller manager")
return err
Expand Down Expand Up @@ -353,14 +345,17 @@ func (r *Runner) Run(ctx context.Context) error {
admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector)
}

director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, admissionController, r.requestControlConfig)
director := requestcontrol.NewDirectorWithConfig(
ds,
scheduler,
admissionController,
r.requestControlConfig)

// --- Setup ExtProc Server Runner ---
serverRunner := &runserver.ExtProcServerRunner{
GrpcPort: *grpcPort,
PoolNamespacedName: poolNamespacedName,
PoolGKNN: poolGKNN,
Datastore: datastore,
GKNN: *gknn,
Datastore: ds,
SecureServing: *secureServing,
HealthChecking: *healthChecking,
CertPath: *certPath,
Expand All @@ -377,7 +372,7 @@ func (r *Runner) Run(ctx context.Context) error {

// --- Add Runnables to Manager ---
// Register health server.
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort, isLeader, *haEnableLeaderElection); err != nil {
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), ds, *grpcHealthPort, isLeader, *haEnableLeaderElection); err != nil {
return err
}

Expand All @@ -397,6 +392,31 @@ func (r *Runner) Run(ctx context.Context) error {
return nil
}

func setupDataStore(setupLog logr.Logger, ctx context.Context, epFactory datalayer.EndpointFactory, modelServerMetricsPort int32, gknn common.GKNN) (datastore.Datastore, error) {
if gknn.Kind == "InferencePool" {
return datastore.NewDatastore(ctx, epFactory, modelServerMetricsPort), nil
}
if gknn.Kind == "Deployment" {
endpointPool := datalayer.NewEndpointPool(true, gknn)
labelsMap, err := labels.ConvertSelectorToLabelsMap(*endpointSelector)
if err != nil {
setupLog.Error(err, "Failed to parse flag %q with error: %w", "endpoint-selector", err)
return nil, err
}
endpointPool.EndpointMeta.Selector = labelsMap
endpointPool.EndpointMeta.TargetPorts, err = strToUniqueIntSlice(*endpointTargetPorts)
if err != nil {
setupLog.Error(err, "Failed to parse flag %q with error: %w", "endpoint-target-ports", err)
return nil, err
}

endpointPoolOption := datastore.WithEndpointPool(endpointPool)
return datastore.NewDatastore(ctx, epFactory, modelServerMetricsPort, endpointPoolOption), nil
}
return nil, fmt.Errorf("invalid gknn kind %s", gknn.Kind)

}

// registerInTreePlugins registers the factory functions of all known plugins
func (r *Runner) registerInTreePlugins() {
plugins.Register(prefix.PrefixCachePluginType, prefix.PrefixCachePluginFactory)
Expand Down Expand Up @@ -635,9 +655,19 @@ func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.
}

func validateFlags() error {
if *poolName == "" {
return fmt.Errorf("required %q flag not set", "poolName")
if (*poolName != "" && *endpointSelector != "") || (*poolName == "" && *endpointSelector == "") {
return errors.New("either poolName or endpoint-selector must be set")
}
if *endpointSelector != "" {
targetPortsList, err := strToUniqueIntSlice(*endpointTargetPorts)
if err != nil {
return fmt.Errorf("unexpected value for %q flag with error %w", "endpoint-target-ports", err)
}
if len(targetPortsList) == 0 || len(targetPortsList) > 8 {
return fmt.Errorf("flag %q should have length from 1 to 8", "endpoint-target-ports")
}
}

if *configText != "" && *configFile != "" {
return fmt.Errorf("both the %q and %q flags can not be set at the same time", "configText", "configFile")
}
Expand All @@ -648,6 +678,34 @@ func validateFlags() error {
return nil
}

func strToUniqueIntSlice(s string) ([]int, error) {
seen := sets.NewInt()
var intList []int

if s == "" {
return intList, nil
}

strList := strings.Split(s, ",")

for _, str := range strList {
trimmedStr := strings.TrimSpace(str)
if trimmedStr == "" {
continue
}
portInt, err := strconv.Atoi(trimmedStr)
if err != nil {
return nil, fmt.Errorf("invalid number: '%s' is not an integer", trimmedStr)
}

if _, ok := seen[portInt]; !ok {
seen[portInt] = struct{}{}
intList = append(intList, portInt)
}
}
return intList, nil
}

func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logger) {
if mapping.TotalQueuedRequests == nil {
logger.Info("Not scraping metric: TotalQueuedRequests")
Expand Down Expand Up @@ -683,3 +741,62 @@ func setupPprofHandlers(mgr ctrl.Manager) error {
}
return nil
}

func extractDeploymentName(podName string) (string, error) {
regex := regexp.MustCompile(`^(.+)-[a-z0-9]+-[a-z0-9]+$`)

matches := regex.FindStringSubmatch(podName)
if len(matches) == 2 {
return matches[1], nil
}
return "", fmt.Errorf("failed to parse deployment name from pod name %s", podName)
}

func extractGKNN() (*common.GKNN, error) {
if *poolName != "" {
// Determine pool namespace: if --pool-namespace is non-empty, use it; else NAMESPACE env var; else default
resolvedPoolNamespace := resolvePoolNamespace()
poolNamespacedName := types.NamespacedName{
Name: *poolName,
Namespace: resolvedPoolNamespace,
}
poolGroupKind := schema.GroupKind{
Group: *poolGroup,
Kind: "InferencePool",
}
return &common.GKNN{
NamespacedName: poolNamespacedName,
GroupKind: poolGroupKind,
}, nil
}

if *endpointSelector != "" {
// Determine EPP namespace: NAMESPACE env var; else default
resolvedPoolNamespace := resolvePoolNamespace()
// Determine EPP name: POD_NAME env var
eppPodNameEnv := os.Getenv("POD_NAME")
if eppPodNameEnv == "" {
return nil, errors.New("failed to get environment variable POD_NAME")

}
eppName, err := extractDeploymentName(eppPodNameEnv)
if err != nil {
return nil, err
}
return &common.GKNN{
NamespacedName: types.NamespacedName{Namespace: resolvedPoolNamespace, Name: eppName},
GroupKind: schema.GroupKind{Kind: "Deployment", Group: "apps"},
}, nil
}
return nil, errors.New("can't construct gknn as both pool-name and endpoint-selector are missing")
}

func resolvePoolNamespace() string {
if *poolNamespace != "" {
return *poolNamespace
}
if nsEnv := os.Getenv("NAMESPACE"); nsEnv != "" {
return nsEnv
}
return runserver.DefaultPoolNamespace
}
4 changes: 4 additions & 0 deletions config/charts/inferencepool/templates/epp-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
{{- if .Values.inferenceExtension.tracing.enabled }}
- name: OTEL_SERVICE_NAME
value: "gateway-api-inference-extension"
Expand Down
6 changes: 3 additions & 3 deletions pkg/epp/backend/metrics/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func refreshPrometheusMetrics(logger logr.Logger, datastore datalayer.PoolInfo,
}

podTotalCount := len(podMetrics)
metrics.RecordInferencePoolAvgKVCache(pool.Name, kvCacheTotal/float64(podTotalCount))
metrics.RecordInferencePoolAvgQueueSize(pool.Name, float64(queueTotal/podTotalCount))
metrics.RecordInferencePoolReadyPods(pool.Name, float64(podTotalCount))
metrics.RecordInferencePoolAvgKVCache(pool.GKNN.Name, kvCacheTotal/float64(podTotalCount))
metrics.RecordInferencePoolAvgQueueSize(pool.GKNN.Name, float64(queueTotal/podTotalCount))
metrics.RecordInferencePoolReadyPods(pool.GKNN.Name, float64(podTotalCount))
}
5 changes: 2 additions & 3 deletions pkg/epp/backend/metrics/pod_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/types"

v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
)

Expand Down Expand Up @@ -86,8 +85,8 @@ func TestMetricsRefresh(t *testing.T) {

type fakeDataStore struct{}

func (f *fakeDataStore) PoolGet() (*v1.InferencePool, error) {
return &v1.InferencePool{Spec: v1.InferencePoolSpec{TargetPorts: []v1.Port{{Number: 8000}}}}, nil
func (f *fakeDataStore) PoolGet() (*datalayer.EndpointPool, error) {
return &datalayer.EndpointPool{}, nil
}

func (f *fakeDataStore) PodList(func(PodMetrics) bool) []PodMetrics {
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/controller/inferenceobjective_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import (
"fmt"

"k8s.io/apimachinery/pkg/api/errors"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
Expand Down Expand Up @@ -55,7 +55,7 @@ func (c *InferenceObjectiveReconciler) Reconcile(ctx context.Context, req ctrl.R
}

if notFound || !infObjective.DeletionTimestamp.IsZero() || infObjective.Spec.PoolRef.Name != v1alpha2.ObjectName(c.PoolGKNN.Name) || infObjective.Spec.PoolRef.Group != v1alpha2.Group(c.PoolGKNN.Group) {
// InferenceObjective object got deleted or changed the referenced pool.
// InferenceObjective object got deleted or changed the referenced inferencePool.
c.Datastore.ObjectiveDelete(req.NamespacedName)
return ctrl.Result{}, nil
}
Expand Down
Loading