From 97e8f2f9b3cc428d007fcfbbad19762cb64ef363 Mon Sep 17 00:00:00 2001 From: Shmuel Kallner Date: Sun, 16 Nov 2025 12:25:03 +0200 Subject: [PATCH 1/5] Renamed PodInfo to EndpointMetadata Signed-off-by: Shmuel Kallner --- .../{podinfo.go => endpoint_metadata.go} | 40 +++++++++---------- ...info_test.go => endpoint_metadata_test.go} | 8 ++-- 2 files changed, 24 insertions(+), 24 deletions(-) rename pkg/epp/datalayer/{podinfo.go => endpoint_metadata.go} (63%) rename pkg/epp/datalayer/{podinfo_test.go => endpoint_metadata_test.go} (92%) diff --git a/pkg/epp/datalayer/podinfo.go b/pkg/epp/datalayer/endpoint_metadata.go similarity index 63% rename from pkg/epp/datalayer/podinfo.go rename to pkg/epp/datalayer/endpoint_metadata.go index 7cbd6d886..70c78e0df 100644 --- a/pkg/epp/datalayer/podinfo.go +++ b/pkg/epp/datalayer/endpoint_metadata.go @@ -30,8 +30,8 @@ type Addressable interface { GetNamespacedName() types.NamespacedName } -// PodInfo represents the relevant Kubernetes Pod state of an inference server. -type PodInfo struct { +// EndpointMetadata represents the relevant Kubernetes Pod state of an inference server. +type EndpointMetadata struct { NamespacedName types.NamespacedName PodName string Address string @@ -40,16 +40,16 @@ type PodInfo struct { Labels map[string]string } -// String returns a string representation of the pod. -func (p *PodInfo) String() string { - if p == nil { +// String returns a string representation of the endpoint. +func (e *EndpointMetadata) String() string { + if e == nil { return "" } - return fmt.Sprintf("%+v", *p) + return fmt.Sprintf("%+v", *e) } // Clone returns a full copy of the object. 
-func (p *PodInfo) Clone() *PodInfo { +func (p *EndpointMetadata) Clone() *EndpointMetadata { if p == nil { return nil } @@ -58,7 +58,7 @@ func (p *PodInfo) Clone() *PodInfo { for key, value := range p.Labels { clonedLabels[key] = value } - return &PodInfo{ + return &EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: p.NamespacedName.Name, Namespace: p.NamespacedName.Namespace, @@ -71,22 +71,22 @@ func (p *PodInfo) Clone() *PodInfo { } } -// GetNamespacedName gets the namespace name of the Pod. -func (p *PodInfo) GetNamespacedName() types.NamespacedName { - return p.NamespacedName +// GetNamespacedName gets the namespace name of the Endpoint. +func (e *EndpointMetadata) GetNamespacedName() types.NamespacedName { + return e.NamespacedName } -// GetIPAddress returns the Pod's IP address. -func (p *PodInfo) GetIPAddress() string { - return p.Address +// GetIPAddress returns the Endpoint's IP address. +func (e *EndpointMetadata) GetIPAddress() string { + return e.Address } -// GetPort returns the Pod's inference port. -func (p *PodInfo) GetPort() string { - return p.Port +// GetPort returns the Endpoint's inference port. 
+func (e *EndpointMetadata) GetPort() string { + return e.Port } -// GetMetricsHost returns the pod's metrics host (ip:port) -func (p *PodInfo) GetMetricsHost() string { - return p.MetricsHost +// GetMetricsHost returns the Endpoint's metrics host (ip:port) +func (e *EndpointMetadata) GetMetricsHost() string { + return e.MetricsHost } diff --git a/pkg/epp/datalayer/podinfo_test.go b/pkg/epp/datalayer/endpoint_metadata_test.go similarity index 92% rename from pkg/epp/datalayer/podinfo_test.go rename to pkg/epp/datalayer/endpoint_metadata_test.go index baf804a22..ad25a29f4 100644 --- a/pkg/epp/datalayer/podinfo_test.go +++ b/pkg/epp/datalayer/endpoint_metadata_test.go @@ -48,14 +48,14 @@ var ( PodIP: podip, }, } - expected = &PodInfo{ + expected = &EndpointMetadata{ NamespacedName: types.NamespacedName{Name: name, Namespace: namespace}, Address: podip, Labels: labels, } ) -func TestPodInfoClone(t *testing.T) { +func TestEndpointMetadataClone(t *testing.T) { clone := expected.Clone() assert.NotSame(t, expected, clone) if diff := cmp.Diff(expected, clone); diff != "" { @@ -67,7 +67,7 @@ func TestPodInfoClone(t *testing.T) { } func TestPodInfoString(t *testing.T) { - podinfo := PodInfo{ + endpointMetadata := EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: pod.Name, Namespace: pod.Namespace, @@ -79,7 +79,7 @@ func TestPodInfoString(t *testing.T) { Labels: labels, } - s := podinfo.String() + s := endpointMetadata.String() assert.Contains(t, s, name) assert.Contains(t, s, namespace) assert.Contains(t, s, podip) From 510a97f59fc84945951dfbfb3b40a5cb07f70ef5 Mon Sep 17 00:00:00 2001 From: Shmuel Kallner Date: Sun, 16 Nov 2025 12:26:54 +0200 Subject: [PATCH 2/5] Non test code updates due to PodInfo rename Signed-off-by: Shmuel Kallner --- pkg/epp/backend/metrics/pod_metrics.go | 14 +++--- pkg/epp/backend/metrics/types.go | 2 +- pkg/epp/backend/pod.go | 2 +- pkg/epp/datalayer/collector.go | 2 +- pkg/epp/datalayer/endpoint.go | 18 ++++---- 
pkg/epp/datalayer/factory.go | 10 ++--- pkg/epp/datalayer/metrics/datasource.go | 4 +- pkg/epp/datastore/datastore.go | 44 +++++++++---------- pkg/epp/metrics/collectors/inference_pool.go | 2 +- pkg/epp/requestcontrol/director.go | 8 ++-- .../saturationdetector/saturationdetector.go | 4 +- .../framework/plugins/multi/prefix/plugin.go | 12 ++--- 12 files changed, 61 insertions(+), 61 deletions(-) diff --git a/pkg/epp/backend/metrics/pod_metrics.go b/pkg/epp/backend/metrics/pod_metrics.go index 4d22ef18c..c5d1393cd 100644 --- a/pkg/epp/backend/metrics/pod_metrics.go +++ b/pkg/epp/backend/metrics/pod_metrics.go @@ -53,10 +53,10 @@ type PodMetricsClient interface { } func (pm *podMetrics) String() string { - return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetPod(), pm.GetMetrics()) + return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetMetadata(), pm.GetMetrics()) } -func (pm *podMetrics) GetPod() *backend.Pod { +func (pm *podMetrics) GetMetadata() *backend.Pod { return pm.pod.Load() } @@ -64,7 +64,7 @@ func (pm *podMetrics) GetMetrics() *MetricsState { return pm.metrics.Load() } -func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) { +func (pm *podMetrics) UpdateMetadata(pod *datalayer.EndpointMetadata) { pm.pod.Store(pod) } @@ -73,7 +73,7 @@ func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) { func (pm *podMetrics) startRefreshLoop(ctx context.Context) { pm.startOnce.Do(func() { go func() { - pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetPod()) + pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetMetadata()) ticker := time.NewTicker(pm.interval) defer ticker.Stop() for { @@ -84,7 +84,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) { return case <-ticker.C: // refresh metrics periodically if err := pm.refreshMetrics(); err != nil { - pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetPod()) + pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", 
pm.GetMetadata()) } } } @@ -95,7 +95,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) { func (pm *podMetrics) refreshMetrics() error { ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout) defer cancel() - updated, err := pm.pmc.FetchMetrics(ctx, pm.GetPod(), pm.GetMetrics()) + updated, err := pm.pmc.FetchMetrics(ctx, pm.GetMetadata(), pm.GetMetrics()) if err != nil { pm.logger.V(logutil.TRACE).Info("Failed to refreshed metrics:", "err", err) } @@ -115,7 +115,7 @@ func (pm *podMetrics) refreshMetrics() error { } func (pm *podMetrics) stopRefreshLoop() { - pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetPod()) + pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetMetadata()) pm.stopOnce.Do(func() { close(pm.done) }) diff --git a/pkg/epp/backend/metrics/types.go b/pkg/epp/backend/metrics/types.go index 99f15a20f..9c8e4d52c 100644 --- a/pkg/epp/backend/metrics/types.go +++ b/pkg/epp/backend/metrics/types.go @@ -52,7 +52,7 @@ type PodMetricsFactory struct { refreshMetricsInterval time.Duration } -func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.PodInfo, ds datalayer.PoolInfo) PodMetrics { +func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.EndpointMetadata, ds datalayer.PoolInfo) PodMetrics { pm := &podMetrics{ pmc: f.pmc, ds: ds, diff --git a/pkg/epp/backend/pod.go b/pkg/epp/backend/pod.go index 324a7479a..e24494042 100644 --- a/pkg/epp/backend/pod.go +++ b/pkg/epp/backend/pod.go @@ -20,4 +20,4 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" ) -type Pod = datalayer.PodInfo +type Pod = datalayer.EndpointMetadata diff --git a/pkg/epp/datalayer/collector.go b/pkg/epp/datalayer/collector.go index 86a8f7b4e..f7be8125b 100644 --- a/pkg/epp/datalayer/collector.go +++ b/pkg/epp/datalayer/collector.go @@ -88,7 +88,7 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc started := 
false c.startOnce.Do(func() { - logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress()) + logger := log.FromContext(ctx).WithValues("endpoint", ep.GetMetadata().GetIPAddress()) c.ctx, c.cancel = context.WithCancel(ctx) started = true ready = make(chan struct{}) diff --git a/pkg/epp/datalayer/endpoint.go b/pkg/epp/datalayer/endpoint.go index 2d262eb3a..60388890e 100644 --- a/pkg/epp/datalayer/endpoint.go +++ b/pkg/epp/datalayer/endpoint.go @@ -21,10 +21,10 @@ import ( "sync/atomic" ) -// EndpointPodState allows management of the Pod related attributes. -type EndpointPodState interface { - GetPod() *PodInfo - UpdatePod(*PodInfo) +// EndpointMetaState allows management of the EndpointMetadata related attributes. +type EndpointMetaState interface { + GetMetadata() *EndpointMetadata + UpdateMetadata(*EndpointMetadata) GetAttributes() *Attributes } @@ -37,14 +37,14 @@ type EndpointMetricsState interface { // Endpoint represents an inference serving endpoint and its related attributes. type Endpoint interface { fmt.Stringer - EndpointPodState + EndpointMetaState EndpointMetricsState AttributeMap } // ModelServer is an implementation of the Endpoint interface. type ModelServer struct { - pod atomic.Pointer[PodInfo] + pod atomic.Pointer[EndpointMetadata] metrics atomic.Pointer[Metrics] attributes *Attributes } @@ -68,14 +68,14 @@ func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer { // String returns a representation of the ModelServer. For brevity, only names of // extended attributes are returned and not their values. 
func (srv *ModelServer) String() string { - return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetPod(), srv.GetMetrics(), srv.Keys()) + return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetMetadata(), srv.GetMetrics(), srv.Keys()) } -func (srv *ModelServer) GetPod() *PodInfo { +func (srv *ModelServer) GetMetadata() *EndpointMetadata { return srv.pod.Load() } -func (srv *ModelServer) UpdatePod(pod *PodInfo) { +func (srv *ModelServer) UpdateMetadata(pod *EndpointMetadata) { srv.pod.Store(pod) } diff --git a/pkg/epp/datalayer/factory.go b/pkg/epp/datalayer/factory.go index 58da604a0..53493e0f0 100644 --- a/pkg/epp/datalayer/factory.go +++ b/pkg/epp/datalayer/factory.go @@ -48,7 +48,7 @@ type PoolInfo interface { // providing methods to allocate and retire endpoints. This can potentially be used for // pooled memory or other management chores in the implementation. type EndpointFactory interface { - NewEndpoint(parent context.Context, inpod *PodInfo, poolinfo PoolInfo) Endpoint + NewEndpoint(parent context.Context, inEnpointMetadata *EndpointMetadata, poolinfo PoolInfo) Endpoint ReleaseEndpoint(ep Endpoint) } @@ -73,8 +73,8 @@ func NewEndpointFactory(sources []DataSource, refreshMetricsInterval time.Durati // NewEndpoint implements EndpointFactory.NewEndpoint. // Creates a new endpoint and starts its associated collector with its own ticker. // Guards against multiple concurrent calls for the same endpoint. 
-func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo, _ PoolInfo) Endpoint { - key := types.NamespacedName{Namespace: inpod.GetNamespacedName().Namespace, Name: inpod.GetNamespacedName().Name} +func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inEndpointMetadata *EndpointMetadata, _ PoolInfo) Endpoint { + key := types.NamespacedName{Namespace: inEndpointMetadata.GetNamespacedName().Namespace, Name: inEndpointMetadata.GetNamespacedName().Name} logger := log.FromContext(parent).WithValues("pod", key) if _, ok := lc.collectors.Load(key); ok { @@ -82,7 +82,7 @@ func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo, return nil } - endpoint := NewEndpoint(inpod, nil) + endpoint := NewEndpoint(inEndpointMetadata, nil) collector := NewCollector() // TODO or full backward compatibility, set the logger and poolinfo if _, loaded := lc.collectors.LoadOrStore(key, collector); loaded { @@ -104,7 +104,7 @@ func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo, // ReleaseEndpoint implements EndpointFactory.ReleaseEndpoint // Stops the collector and cleans up resources for the endpoint func (lc *EndpointLifecycle) ReleaseEndpoint(ep Endpoint) { - key := ep.GetPod().GetNamespacedName() + key := ep.GetMetadata().GetNamespacedName() if value, ok := lc.collectors.LoadAndDelete(key); ok { collector := value.(*Collector) diff --git a/pkg/epp/datalayer/metrics/datasource.go b/pkg/epp/datalayer/metrics/datasource.go index 81723d4e0..6b64d1032 100644 --- a/pkg/epp/datalayer/metrics/datasource.go +++ b/pkg/epp/datalayer/metrics/datasource.go @@ -98,8 +98,8 @@ func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error { // Collect is triggered by the data layer framework to fetch potentially new // MSP metrics data for an endpoint. 
func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) error { - target := dataSrc.getMetricsEndpoint(ep.GetPod()) - families, err := dataSrc.client.Get(ctx, target, ep.GetPod()) + target := dataSrc.getMetricsEndpoint(ep.GetMetadata()) + families, err := dataSrc.client.Get(ctx, target, ep.GetMetadata()) if err != nil { return err diff --git a/pkg/epp/datastore/datastore.go b/pkg/epp/datastore/datastore.go index 5d6ff751b..36276a020 100644 --- a/pkg/epp/datastore/datastore.go +++ b/pkg/epp/datastore/datastore.go @@ -200,13 +200,13 @@ func (ds *datastore) ObjectiveGetAll() []*v1alpha2.InferenceObjective { // /// Pods/endpoints APIs /// // TODO: add a flag for callers to specify the staleness threshold for metrics. // ref: https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/1046#discussion_r2246351694 -func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics { - res := []backendmetrics.PodMetrics{} +func (ds *datastore) PodList(predicate func(datalayer.Endpoint) bool) []datalayer.Endpoint { + res := []datalayer.Endpoint{} ds.pods.Range(func(k, v any) bool { - pm := v.(backendmetrics.PodMetrics) - if predicate(pm) { - res = append(res, pm) + ep := v.(datalayer.Endpoint) + if predicate(ep) { + res = append(res, ep) } return true }) @@ -228,14 +228,14 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool { if len(ds.pool.Spec.TargetPorts) == 1 { modelServerMetricsPort = int(ds.modelServerMetricsPort) } - pods := []*datalayer.PodInfo{} + pods := []*datalayer.EndpointMetadata{} for idx, port := range ds.pool.Spec.TargetPorts { metricsPort := modelServerMetricsPort if metricsPort == 0 { metricsPort = int(port.Number) } pods = append(pods, - &datalayer.PodInfo{ + &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: pod.Name + "-rank-" + strconv.Itoa(idx), Namespace: pod.Namespace, @@ -249,28 +249,28 @@ func (ds *datastore) 
PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool { } result := true - for _, podInfo := range pods { - var pm backendmetrics.PodMetrics - existing, ok := ds.pods.Load(podInfo.NamespacedName) + for _, endpointMetadata := range pods { + var ep datalayer.Endpoint + existing, ok := ds.pods.Load(endpointMetadata.NamespacedName) if !ok { - pm = ds.epf.NewEndpoint(ds.parentCtx, podInfo, ds) - ds.pods.Store(podInfo.NamespacedName, pm) + ep = ds.epf.NewEndpoint(ds.parentCtx, endpointMetadata, ds) + ds.pods.Store(endpointMetadata.NamespacedName, ep) result = false } else { - pm = existing.(backendmetrics.PodMetrics) + ep = existing.(backendmetrics.PodMetrics) } - // Update pod properties if anything changed. - pm.UpdatePod(podInfo) + // Update endpoint properties if anything changed. + ep.UpdateMetadata(endpointMetadata) } return result } func (ds *datastore) PodDelete(podName string) { ds.pods.Range(func(k, v any) bool { - pm := v.(backendmetrics.PodMetrics) - if pm.GetPod().PodName == podName { + ep := v.(datalayer.Endpoint) + if ep.GetMetadata().PodName == podName { ds.pods.Delete(k) - ds.epf.ReleaseEndpoint(pm) + ds.epf.ReleaseEndpoint(ep) } return true }) @@ -302,10 +302,10 @@ func (ds *datastore) podResyncAll(ctx context.Context, reader client.Reader) err // Remove pods that don't belong to the pool or not ready any more. 
ds.pods.Range(func(k, v any) bool { - pm := v.(backendmetrics.PodMetrics) - if exist := activePods[pm.GetPod().PodName]; !exist { - logger.V(logutil.VERBOSE).Info("Removing pod", "pod", pm.GetPod()) - ds.PodDelete(pm.GetPod().PodName) + ep := v.(datalayer.Endpoint) + if exist := activePods[ep.GetMetadata().PodName]; !exist { + logger.V(logutil.VERBOSE).Info("Removing pod", "pod", ep.GetMetadata()) + ds.PodDelete(ep.GetMetadata().PodName) } return true }) diff --git a/pkg/epp/metrics/collectors/inference_pool.go b/pkg/epp/metrics/collectors/inference_pool.go index ec3def164..e20a37d73 100644 --- a/pkg/epp/metrics/collectors/inference_pool.go +++ b/pkg/epp/metrics/collectors/inference_pool.go @@ -74,7 +74,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) { prometheus.GaugeValue, float64(pod.GetMetrics().WaitingQueueSize), pool.Name, - pod.GetPod().NamespacedName.Name, + pod.GetMetadata().NamespacedName.Name, ) } } diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index ecd52d90c..885cbc9f8 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -235,7 +235,7 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet podTotalCount := 0 podFilteredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool { podTotalCount++ - if _, found := endpoints[pm.GetPod().GetIPAddress()]; found { + if _, found := endpoints[pm.GetMetadata().GetIPAddress()]; found { return true } return false @@ -279,9 +279,9 @@ func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []sch pm := make([]schedulingtypes.Pod, len(pods)) for i, pod := range pods { if pod.GetAttributes() != nil { - pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: pod.GetAttributes().Clone()} + pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetMetadata().Clone(), MetricsState: pod.GetMetrics().Clone(), 
AttributeMap: pod.GetAttributes().Clone()} } else { - pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: datalayer.NewAttributes()} + pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetMetadata().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: datalayer.NewAttributes()} } } @@ -338,7 +338,7 @@ func (d *Director) GetRandomPod() *backend.Pod { } number := rand.Intn(len(pods)) pod := pods[number] - return pod.GetPod() + return pod.GetMetadata() } func (d *Director) runPreRequestPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, diff --git a/pkg/epp/saturationdetector/saturationdetector.go b/pkg/epp/saturationdetector/saturationdetector.go index 46b94b22c..0891207e9 100644 --- a/pkg/epp/saturationdetector/saturationdetector.go +++ b/pkg/epp/saturationdetector/saturationdetector.go @@ -91,8 +91,8 @@ func (d *Detector) IsSaturated(ctx context.Context, candidatePods []backendmetri for _, podMetric := range candidatePods { metrics := podMetric.GetMetrics() podNn := "unknown-pod" - if podMetric.GetPod() != nil { - podNn = podMetric.GetPod().NamespacedName.String() + if podMetric.GetMetadata() != nil { + podNn = podMetric.GetMetadata().NamespacedName.String() } if metrics == nil { diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go index d30bc13f5..5576b6f8f 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go +++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go @@ -28,7 +28,7 @@ import ( k8stypes "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/log" - backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" 
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol" @@ -311,14 +311,14 @@ func (m *Plugin) CleanUpInactivePods(ctx context.Context, handle plugins.Handle) case <-ctx.Done(): return case <-ticker.C: - activePodMetrics := handle.PodList(func(_ backendmetrics.PodMetrics) bool { return true }) - activePods := make(map[ServerID]struct{}, len(activePodMetrics)) - for _, pm := range activePodMetrics { - activePods[ServerID(pm.GetPod().NamespacedName)] = struct{}{} + activeEndpoints := handle.PodList(func(_ datalayer.Endpoint) bool { return true }) + activeEndpointNames := make(map[ServerID]struct{}, len(activeEndpoints)) + for _, ep := range activeEndpoints { + activeEndpointNames[ServerID(ep.GetMetadata().NamespacedName)] = struct{}{} } for _, pod := range m.indexer.Pods() { - if _, ok := activePods[pod]; !ok { + if _, ok := activeEndpointNames[pod]; !ok { m.indexer.RemovePod(pod) logger.Info("Removed pod not in active set", "pod", pod) } From 6f41e8b8a5161b2d5fc880dae4544021311b7e9e Mon Sep 17 00:00:00 2001 From: Shmuel Kallner Date: Sun, 16 Nov 2025 12:27:44 +0200 Subject: [PATCH 3/5] Test updates due to PodInfo rename Signed-off-by: Shmuel Kallner --- pkg/epp/backend/metrics/fake.go | 6 ++-- pkg/epp/backend/metrics/pod_metrics_test.go | 2 +- .../inferencepool_reconciler_test.go | 28 +++++++++---------- pkg/epp/controller/pod_reconciler_test.go | 4 +-- pkg/epp/datalayer/collector_test.go | 4 +-- pkg/epp/datastore/datastore_test.go | 26 ++++++++--------- pkg/epp/requestcontrol/director_test.go | 2 +- 7 files changed, 36 insertions(+), 36 deletions(-) diff --git a/pkg/epp/backend/metrics/fake.go b/pkg/epp/backend/metrics/fake.go index 1c7a90528..018241ea8 100644 --- a/pkg/epp/backend/metrics/fake.go +++ b/pkg/epp/backend/metrics/fake.go @@ -38,10 +38,10 @@ type FakePodMetrics struct { } func (fpm *FakePodMetrics) String() string { - return fmt.Sprintf("Pod: %v; Metrics: %v", fpm.GetPod(), fpm.GetMetrics()) + return fmt.Sprintf("Pod: %v; Metrics: %v", 
fpm.GetMetadata(), fpm.GetMetrics()) } -func (fpm *FakePodMetrics) GetPod() *backend.Pod { +func (fpm *FakePodMetrics) GetMetadata() *backend.Pod { return fpm.Pod } @@ -49,7 +49,7 @@ func (fpm *FakePodMetrics) GetMetrics() *MetricsState { return fpm.Metrics } -func (fpm *FakePodMetrics) UpdatePod(pod *datalayer.PodInfo) { +func (fpm *FakePodMetrics) UpdateMetadata(pod *datalayer.EndpointMetadata) { fpm.Pod = pod } func (fpm *FakePodMetrics) GetAttributes() *datalayer.Attributes { diff --git a/pkg/epp/backend/metrics/pod_metrics_test.go b/pkg/epp/backend/metrics/pod_metrics_test.go index b0297cd1e..8cd3aef0e 100644 --- a/pkg/epp/backend/metrics/pod_metrics_test.go +++ b/pkg/epp/backend/metrics/pod_metrics_test.go @@ -30,7 +30,7 @@ import ( ) var ( - pod1Info = &datalayer.PodInfo{ + pod1Info = &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod1-rank-0", Namespace: "default", diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index a2bce1256..7428229cb 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -121,7 +121,7 @@ func TestInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := diffStore(ds, diffStoreParams{wantPool: pool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantPool: pool1, wantEndpoints: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -139,7 +139,7 @@ func TestInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := diffStore(ds, diffStoreParams{wantPool: newPool1, wantPods: []string{"pod5-rank-0"}}); 
diff != "" { + if diff := diffStore(ds, diffStoreParams{wantPool: newPool1, wantEndpoints: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -154,7 +154,7 @@ func TestInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := diffStore(ds, diffStoreParams{wantPool: newPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantPool: newPool1, wantEndpoints: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -168,14 +168,14 @@ func TestInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := diffStore(ds, diffStoreParams{wantPods: []string{}}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantEndpoints: []string{}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } } type diffStoreParams struct { wantPool *v1.InferencePool - wantPods []string + wantEndpoints []string wantObjectives []*v1alpha2.InferenceObjective } @@ -188,15 +188,15 @@ func diffStore(datastore datastore.Datastore, params diffStoreParams) string { } // Default wantPods if not set because PodGetAll returns an empty slice when empty. 
- if params.wantPods == nil { - params.wantPods = []string{} + if params.wantEndpoints == nil { + params.wantEndpoints = []string{} } - gotPods := []string{} - for _, pm := range datastore.PodList(backendmetrics.AllPodsPredicate) { - gotPods = append(gotPods, pm.GetPod().NamespacedName.Name) + gotEndpoints := []string{} + for _, em := range datastore.PodList(backendmetrics.AllPodsPredicate) { + gotEndpoints = append(gotEndpoints, em.GetMetadata().NamespacedName.Name) } - if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { - return "pods:" + diff + if diff := cmp.Diff(params.wantEndpoints, gotEndpoints, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { + return "endpoints:" + diff } // Default wantModels if not set because ModelGetAll returns an empty slice when empty. @@ -348,8 +348,8 @@ func xDiffStore(t *testing.T, datastore datastore.Datastore, params xDiffStorePa params.wantPods = []string{} } gotPods := []string{} - for _, pm := range datastore.PodList(backendmetrics.AllPodsPredicate) { - gotPods = append(gotPods, pm.GetPod().NamespacedName.Name) + for _, em := range datastore.PodList(backendmetrics.AllPodsPredicate) { + gotPods = append(gotPods, em.GetMetadata().NamespacedName.Name) } if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { return "pods:" + diff diff --git a/pkg/epp/controller/pod_reconciler_test.go b/pkg/epp/controller/pod_reconciler_test.go index 28f817310..07f5d66eb 100644 --- a/pkg/epp/controller/pod_reconciler_test.go +++ b/pkg/epp/controller/pod_reconciler_test.go @@ -212,8 +212,8 @@ func TestPodReconciler(t *testing.T) { } var gotPods []*corev1.Pod - for _, pm := range store.PodList(backendmetrics.AllPodsPredicate) { - pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().PodName, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: 
pm.GetPod().GetIPAddress()}} + for _, em := range store.PodList(backendmetrics.AllPodsPredicate) { + pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: em.GetMetadata().PodName, Namespace: em.GetMetadata().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: em.GetMetadata().GetIPAddress()}} gotPods = append(gotPods, pod) } if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b *corev1.Pod) bool { return a.Name < b.Name })) { diff --git a/pkg/epp/datalayer/collector_test.go b/pkg/epp/datalayer/collector_test.go index f0655a7c7..a9a609351 100644 --- a/pkg/epp/datalayer/collector_test.go +++ b/pkg/epp/datalayer/collector_test.go @@ -44,14 +44,14 @@ func (d *DummySource) Collect(ctx context.Context, ep Endpoint) error { } func defaultEndpoint() Endpoint { - pod := &PodInfo{ + meta := &EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod-name", Namespace: "default", }, Address: "1.2.3.4:5678", } - ms := NewEndpoint(pod, nil) + ms := NewEndpoint(meta, nil) return ms } diff --git a/pkg/epp/datastore/datastore_test.go b/pkg/epp/datastore/datastore_test.go index ee59071e6..9083a5479 100644 --- a/pkg/epp/datastore/datastore_test.go +++ b/pkg/epp/datastore/datastore_test.go @@ -407,7 +407,7 @@ func TestPods(t *testing.T) { test.op(ctx, ds) var gotPods []*corev1.Pod for _, pm := range ds.PodList(backendmetrics.AllPodsPredicate) { - pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().PodName, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().GetIPAddress()}} + pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetMetadata().PodName, Namespace: pm.GetMetadata().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetMetadata().GetIPAddress()}} gotPods = append(gotPods, pod) } if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b *corev1.Pod) bool { return a.Name < b.Name })) { @@ -419,16 +419,16 @@ func TestPods(t *testing.T) { func TestPodInfo(t 
*testing.T) { tests := []struct { - name string - op func(ctx context.Context, ds Datastore) - pool *v1.InferencePool - existingPods []*corev1.Pod - wantPodInfos []*datalayer.PodInfo + name string + op func(ctx context.Context, ds Datastore) + pool *v1.InferencePool + existingPods []*corev1.Pod + wantEndpointMetas []*datalayer.EndpointMetadata }{ { name: "Add new pod, no existing pods, should add", existingPods: []*corev1.Pod{}, - wantPodInfos: []*datalayer.PodInfo{ + wantEndpointMetas: []*datalayer.EndpointMetadata{ { NamespacedName: types.NamespacedName{ Name: pod1.Name + "-rank-0", @@ -450,7 +450,7 @@ func TestPodInfo(t *testing.T) { { name: "Add new pod, no existing pods, should add, multiple target ports", existingPods: []*corev1.Pod{}, - wantPodInfos: []*datalayer.PodInfo{ + wantEndpointMetas: []*datalayer.EndpointMetadata{ { NamespacedName: types.NamespacedName{ Name: pod1.Name + "-rank-0", @@ -484,7 +484,7 @@ func TestPodInfo(t *testing.T) { { name: "Add new pod, with existing pods, should add, multiple target ports", existingPods: []*corev1.Pod{pod1}, - wantPodInfos: []*datalayer.PodInfo{ + wantEndpointMetas: []*datalayer.EndpointMetadata{ { NamespacedName: types.NamespacedName{ Name: pod1.Name + "-rank-0", @@ -542,7 +542,7 @@ func TestPodInfo(t *testing.T) { { name: "Delete the pod, multiple target ports", existingPods: []*corev1.Pod{pod1, pod2}, - wantPodInfos: []*datalayer.PodInfo{ + wantEndpointMetas: []*datalayer.EndpointMetadata{ { NamespacedName: types.NamespacedName{ Name: pod1.Name + "-rank-0", @@ -589,11 +589,11 @@ func TestPodInfo(t *testing.T) { } test.op(ctx, ds) - var gotPodInfos []*datalayer.PodInfo + var gotEndpointMetas []*datalayer.EndpointMetadata for _, pm := range ds.PodList(backendmetrics.AllPodsPredicate) { - gotPodInfos = append(gotPodInfos, pm.GetPod()) + gotEndpointMetas = append(gotEndpointMetas, pm.GetMetadata()) } - if diff := cmp.Diff(test.wantPodInfos, gotPodInfos, cmpopts.SortSlices(func(a, b *datalayer.PodInfo) bool { 
return a.NamespacedName.Name < b.NamespacedName.Name })); diff != "" { + if diff := cmp.Diff(test.wantEndpointMetas, gotEndpointMetas, cmpopts.SortSlices(func(a, b *datalayer.EndpointMetadata) bool { return a.NamespacedName.Name < b.NamespacedName.Name })); diff != "" { t.Errorf("ConvertTo() mismatch (-want +got):\n%s", diff) } }) diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index e705beefd..762262296 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -698,7 +698,7 @@ func TestGetCandidatePodsForScheduling(t *testing.T) { got := director.getCandidatePodsForScheduling(context.Background(), test.metadata) diff := cmp.Diff(test.output, got, cmpopts.SortSlices(func(a, b backendmetrics.PodMetrics) bool { - return a.GetPod().NamespacedName.String() < b.GetPod().NamespacedName.String() + return a.GetMetadata().NamespacedName.String() < b.GetMetadata().NamespacedName.String() })) if diff != "" { t.Errorf("Unexpected output (-want +got): %v", diff) From 474bc16a10d5122238d906579b789463d931f862 Mon Sep 17 00:00:00 2001 From: Shmuel Kallner Date: Tue, 18 Nov 2025 13:24:09 +0200 Subject: [PATCH 4/5] Updated new code with new struct names Signed-off-by: Shmuel Kallner --- pkg/epp/datalayer/endpoint.go | 8 ++++---- pkg/epp/datalayer/factory_test.go | 4 ++-- pkg/epp/datalayer/metrics/datasource_test.go | 2 +- pkg/epp/datalayer/metrics/extractor.go | 2 +- pkg/epp/datalayer/metrics/logger_test.go | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/epp/datalayer/endpoint.go b/pkg/epp/datalayer/endpoint.go index 60388890e..8300771a2 100644 --- a/pkg/epp/datalayer/endpoint.go +++ b/pkg/epp/datalayer/endpoint.go @@ -50,9 +50,9 @@ type ModelServer struct { } // NewEndpoint returns a new ModelServer with the given PodInfo and Metrics. 
-func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer { - if pod == nil { - pod = &PodInfo{} +func NewEndpoint(meta *EndpointMetadata, metrics *Metrics) *ModelServer { + if meta == nil { + meta = &EndpointMetadata{} } if metrics == nil { metrics = NewMetrics() @@ -60,7 +60,7 @@ func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer { ep := &ModelServer{ attributes: NewAttributes(), } - ep.UpdatePod(pod) + ep.UpdateMetadata(meta) ep.UpdateMetrics(metrics) return ep } diff --git a/pkg/epp/datalayer/factory_test.go b/pkg/epp/datalayer/factory_test.go index 35cce888d..66b724e54 100644 --- a/pkg/epp/datalayer/factory_test.go +++ b/pkg/epp/datalayer/factory_test.go @@ -31,7 +31,7 @@ func TestFactory(t *testing.T) { source := &DummySource{} factory := NewEndpointFactory([]DataSource{source}, 100*time.Millisecond) - pod1 := &PodInfo{ + pod1 := &EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod1", Namespace: "default", @@ -44,7 +44,7 @@ func TestFactory(t *testing.T) { dup := factory.NewEndpoint(context.Background(), pod1, nil) assert.Nil(t, dup, "expected to fail to create a duplicate collector") - pod2 := &PodInfo{ + pod2 := &EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod2", Namespace: "default", diff --git a/pkg/epp/datalayer/metrics/datasource_test.go b/pkg/epp/datalayer/metrics/datasource_test.go index 7c293753f..d61380318 100644 --- a/pkg/epp/datalayer/metrics/datasource_test.go +++ b/pkg/epp/datalayer/metrics/datasource_test.go @@ -50,7 +50,7 @@ func TestDatasource(t *testing.T) { ctx := context.Background() factory := datalayer.NewEndpointFactory([]datalayer.DataSource{source}, 100*time.Millisecond) - pod := &datalayer.PodInfo{ + pod := &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod1", Namespace: "default", diff --git a/pkg/epp/datalayer/metrics/extractor.go b/pkg/epp/datalayer/metrics/extractor.go index 27b1e07cd..b2f2b6894 100644 --- a/pkg/epp/datalayer/metrics/extractor.go +++ 
b/pkg/epp/datalayer/metrics/extractor.go @@ -138,7 +138,7 @@ func (ext *Extractor) Extract(ctx context.Context, data any, ep datalayer.Endpoi } } - logger := log.FromContext(ctx).WithValues("pod", ep.GetPod().NamespacedName) + logger := log.FromContext(ctx).WithValues("pod", ep.GetMetadata().NamespacedName) if updated { clone.UpdateTime = time.Now() logger.V(logutil.TRACE).Info("Refreshed metrics", "updated", clone) diff --git a/pkg/epp/datalayer/metrics/logger_test.go b/pkg/epp/datalayer/metrics/logger_test.go index 3ba2e7e84..26420671b 100644 --- a/pkg/epp/datalayer/metrics/logger_test.go +++ b/pkg/epp/datalayer/metrics/logger_test.go @@ -78,14 +78,14 @@ func TestLogger(t *testing.T) { assert.Contains(t, logOutput, "\"Stale metrics\": \"[]\"") } -var pod1 = &datalayer.PodInfo{ +var pod1 = &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod1", Namespace: "default", }, Address: "1.2.3.4:5678", } -var pod2 = &datalayer.PodInfo{ +var pod2 = &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod2", Namespace: "default", From 37ce4dab7e6e86e1d8b35df1f36be293ec00fe69 Mon Sep 17 00:00:00 2001 From: Shmuel Kallner Date: Wed, 19 Nov 2025 14:16:47 +0200 Subject: [PATCH 5/5] Changed PodInfo to EndpointMetadata in comments and test function names Signed-off-by: Shmuel Kallner --- pkg/epp/datalayer/endpoint.go | 2 +- pkg/epp/datalayer/endpoint_metadata_test.go | 2 +- pkg/epp/datastore/datastore_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/epp/datalayer/endpoint.go b/pkg/epp/datalayer/endpoint.go index 8300771a2..1ba7f939a 100644 --- a/pkg/epp/datalayer/endpoint.go +++ b/pkg/epp/datalayer/endpoint.go @@ -49,7 +49,7 @@ type ModelServer struct { attributes *Attributes } -// NewEndpoint returns a new ModelServer with the given PodInfo and Metrics. +// NewEndpoint returns a new ModelServer with the given EndpointMetadata and Metrics. 
func NewEndpoint(meta *EndpointMetadata, metrics *Metrics) *ModelServer { if meta == nil { meta = &EndpointMetadata{} diff --git a/pkg/epp/datalayer/endpoint_metadata_test.go b/pkg/epp/datalayer/endpoint_metadata_test.go index ad25a29f4..fb9bc7a93 100644 --- a/pkg/epp/datalayer/endpoint_metadata_test.go +++ b/pkg/epp/datalayer/endpoint_metadata_test.go @@ -66,7 +66,7 @@ func TestEndpointMetadataClone(t *testing.T) { assert.Equal(t, "prod", expected.Labels["env"], "mutating clone should not affect original") } -func TestPodInfoString(t *testing.T) { +func TestEndpointMetadataString(t *testing.T) { endpointMetadata := EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: pod.Name, diff --git a/pkg/epp/datastore/datastore_test.go b/pkg/epp/datastore/datastore_test.go index 9083a5479..f5f574ae2 100644 --- a/pkg/epp/datastore/datastore_test.go +++ b/pkg/epp/datastore/datastore_test.go @@ -417,7 +417,7 @@ func TestPods(t *testing.T) { } } -func TestPodInfo(t *testing.T) { +func TestEndpointMetadata(t *testing.T) { tests := []struct { name string op func(ctx context.Context, ds Datastore)