Skip to content

Commit 9e6bdeb

Browse files
committed
Non test code updates due to PodInfo rename
Signed-off-by: Shmuel Kallner <[email protected]>
1 parent f0158ac commit 9e6bdeb

File tree

12 files changed

+60
-60
lines changed

12 files changed

+60
-60
lines changed

pkg/epp/backend/metrics/pod_metrics.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,18 @@ type PodMetricsClient interface {
5353
}
5454

5555
func (pm *podMetrics) String() string {
56-
return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetPod(), pm.GetMetrics())
56+
return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetMetadata(), pm.GetMetrics())
5757
}
5858

59-
func (pm *podMetrics) GetPod() *backend.Pod {
59+
func (pm *podMetrics) GetMetadata() *backend.Pod {
6060
return pm.pod.Load()
6161
}
6262

6363
func (pm *podMetrics) GetMetrics() *MetricsState {
6464
return pm.metrics.Load()
6565
}
6666

67-
func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) {
67+
func (pm *podMetrics) UpdateMetadata(pod *datalayer.EndpointMetadata) {
6868
pm.pod.Store(pod)
6969
}
7070

@@ -73,7 +73,7 @@ func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) {
7373
func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
7474
pm.startOnce.Do(func() {
7575
go func() {
76-
pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetPod())
76+
pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetMetadata())
7777
ticker := time.NewTicker(pm.interval)
7878
defer ticker.Stop()
7979
for {
@@ -84,7 +84,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
8484
return
8585
case <-ticker.C: // refresh metrics periodically
8686
if err := pm.refreshMetrics(); err != nil {
87-
pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetPod())
87+
pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetMetadata())
8888
}
8989
}
9090
}
@@ -95,7 +95,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
9595
func (pm *podMetrics) refreshMetrics() error {
9696
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
9797
defer cancel()
98-
updated, err := pm.pmc.FetchMetrics(ctx, pm.GetPod(), pm.GetMetrics())
98+
updated, err := pm.pmc.FetchMetrics(ctx, pm.GetMetadata(), pm.GetMetrics())
9999
if err != nil {
100100
pm.logger.V(logutil.TRACE).Info("Failed to refreshed metrics:", "err", err)
101101
}
@@ -115,7 +115,7 @@ func (pm *podMetrics) refreshMetrics() error {
115115
}
116116

117117
func (pm *podMetrics) stopRefreshLoop() {
118-
pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetPod())
118+
pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetMetadata())
119119
pm.stopOnce.Do(func() {
120120
close(pm.done)
121121
})

pkg/epp/backend/metrics/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type PodMetricsFactory struct {
5252
refreshMetricsInterval time.Duration
5353
}
5454

55-
func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.PodInfo, ds datalayer.PoolInfo) PodMetrics {
55+
func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.EndpointMetadata, ds datalayer.PoolInfo) PodMetrics {
5656
pm := &podMetrics{
5757
pmc: f.pmc,
5858
ds: ds,

pkg/epp/backend/pod.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@ import (
2020
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
2121
)
2222

23-
type Pod = datalayer.PodInfo
23+
type Pod = datalayer.EndpointMetadata

pkg/epp/datalayer/collector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc
8888
started := false
8989

9090
c.startOnce.Do(func() {
91-
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress())
91+
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetMetadata().GetIPAddress())
9292
c.ctx, c.cancel = context.WithCancel(ctx)
9393
started = true
9494
ready = make(chan struct{})

pkg/epp/datalayer/endpoint.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ import (
2121
"sync/atomic"
2222
)
2323

24-
// EndpointPodState allows management of the Pod related attributes.
25-
type EndpointPodState interface {
26-
GetPod() *PodInfo
27-
UpdatePod(*PodInfo)
24+
// EndpointMetaState allows management of the EndpointMetadata related attributes.
25+
type EndpointMetaState interface {
26+
GetMetadata() *EndpointMetadata
27+
UpdateMetadata(*EndpointMetadata)
2828
}
2929

3030
// EndpointMetricsState allows management of the Metrics related attributes.
@@ -36,14 +36,14 @@ type EndpointMetricsState interface {
3636
// Endpoint represents an inference serving endpoint and its related attributes.
3737
type Endpoint interface {
3838
fmt.Stringer
39-
EndpointPodState
39+
EndpointMetaState
4040
EndpointMetricsState
4141
AttributeMap
4242
}
4343

4444
// ModelServer is an implementation of the Endpoint interface.
4545
type ModelServer struct {
46-
pod atomic.Pointer[PodInfo]
46+
pod atomic.Pointer[EndpointMetadata]
4747
metrics atomic.Pointer[Metrics]
4848
attributes *Attributes
4949
}
@@ -58,14 +58,14 @@ func NewEndpoint() *ModelServer {
5858
// String returns a representation of the ModelServer. For brevity, only names of
5959
// extended attributes are returned and not their values.
6060
func (srv *ModelServer) String() string {
61-
return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetPod(), srv.GetMetrics(), srv.Keys())
61+
return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetMetadata(), srv.GetMetrics(), srv.Keys())
6262
}
6363

64-
func (srv *ModelServer) GetPod() *PodInfo {
64+
func (srv *ModelServer) GetMetadata() *EndpointMetadata {
6565
return srv.pod.Load()
6666
}
6767

68-
func (srv *ModelServer) UpdatePod(pod *PodInfo) {
68+
func (srv *ModelServer) UpdateMetadata(pod *EndpointMetadata) {
6969
srv.pod.Store(pod)
7070
}
7171

pkg/epp/datalayer/factory.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type PoolInfo interface {
4444
// providing methods to allocate and retire endpoints. This can potentially be used for
4545
// pooled memory or other management chores in the implementation.
4646
type EndpointFactory interface {
47-
NewEndpoint(parent context.Context, inpod *PodInfo, poolinfo PoolInfo) Endpoint
47+
NewEndpoint(parent context.Context, inEnpointMetadata *EndpointMetadata, poolinfo PoolInfo) Endpoint
4848
ReleaseEndpoint(ep Endpoint)
4949
}
5050

@@ -69,8 +69,8 @@ func NewEndpointFactory(sources []DataSource, refreshMetricsInterval time.Durati
6969
// NewEndpoint implements EndpointFactory.NewEndpoint.
7070
// Creates a new endpoint and starts its associated collector with its own ticker.
7171
// Guards against multiple concurrent calls for the same endpoint.
72-
func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo, _ PoolInfo) Endpoint {
73-
key := types.NamespacedName{Namespace: inpod.GetNamespacedName().Namespace, Name: inpod.GetNamespacedName().Name}
72+
func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inEnpointMetadata *EndpointMetadata, _ PoolInfo) Endpoint {
73+
key := types.NamespacedName{Namespace: inEnpointMetadata.GetNamespacedName().Namespace, Name: inEnpointMetadata.GetNamespacedName().Name}
7474
logger := log.FromContext(parent).WithValues("pod", key)
7575

7676
if _, ok := lc.collectors.Load(key); ok {
@@ -79,7 +79,7 @@ func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo,
7979
}
8080

8181
endpoint := NewEndpoint()
82-
endpoint.UpdatePod(inpod)
82+
endpoint.UpdateMetadata(inEnpointMetadata)
8383
collector := NewCollector() // for full backward compatibility, set the logger and poolinfo
8484

8585
if _, loaded := lc.collectors.LoadOrStore(key, collector); loaded {
@@ -101,7 +101,7 @@ func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo,
101101
// ReleaseEndpoint implements EndpointFactory.ReleaseEndpoint
102102
// Stops the collector and cleans up resources for the endpoint
103103
func (lc *EndpointLifecycle) ReleaseEndpoint(ep Endpoint) {
104-
key := ep.GetPod().GetNamespacedName()
104+
key := ep.GetMetadata().GetNamespacedName()
105105

106106
if value, ok := lc.collectors.LoadAndDelete(key); ok {
107107
collector := value.(*Collector)

pkg/epp/datalayer/metrics/datasource.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error {
8686
// Collect is triggered by the data layer framework to fetch potentially new
8787
// MSP metrics data for an endpoint.
8888
func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) error {
89-
target := dataSrc.getMetricsEndpoint(ep.GetPod())
90-
families, err := dataSrc.client.Get(ctx, target, ep.GetPod())
89+
target := dataSrc.getMetricsEndpoint(ep.GetMetadata())
90+
families, err := dataSrc.client.Get(ctx, target, ep.GetMetadata())
9191

9292
if err != nil {
9393
return err

pkg/epp/datastore/datastore.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -200,13 +200,13 @@ func (ds *datastore) ObjectiveGetAll() []*v1alpha2.InferenceObjective {
200200
// /// Pods/endpoints APIs ///
201201
// TODO: add a flag for callers to specify the staleness threshold for metrics.
202202
// ref: https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/1046#discussion_r2246351694
203-
func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {
204-
res := []backendmetrics.PodMetrics{}
203+
func (ds *datastore) PodList(predicate func(datalayer.Endpoint) bool) []datalayer.Endpoint {
204+
res := []datalayer.Endpoint{}
205205

206206
ds.pods.Range(func(k, v any) bool {
207-
pm := v.(backendmetrics.PodMetrics)
208-
if predicate(pm) {
209-
res = append(res, pm)
207+
ep := v.(datalayer.Endpoint)
208+
if predicate(ep) {
209+
res = append(res, ep)
210210
}
211211
return true
212212
})
@@ -228,14 +228,14 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
228228
if len(ds.pool.Spec.TargetPorts) == 1 {
229229
modelServerMetricsPort = int(ds.modelServerMetricsPort)
230230
}
231-
pods := []*datalayer.PodInfo{}
231+
pods := []*datalayer.EndpointMetadata{}
232232
for idx, port := range ds.pool.Spec.TargetPorts {
233233
metricsPort := modelServerMetricsPort
234234
if metricsPort == 0 {
235235
metricsPort = int(port.Number)
236236
}
237237
pods = append(pods,
238-
&datalayer.PodInfo{
238+
&datalayer.EndpointMetadata{
239239
NamespacedName: types.NamespacedName{
240240
Name: pod.Name + "-rank-" + strconv.Itoa(idx),
241241
Namespace: pod.Namespace,
@@ -249,28 +249,28 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
249249
}
250250

251251
result := true
252-
for _, podInfo := range pods {
253-
var pm backendmetrics.PodMetrics
254-
existing, ok := ds.pods.Load(podInfo.NamespacedName)
252+
for _, endpointMetadata := range pods {
253+
var ep datalayer.Endpoint
254+
existing, ok := ds.pods.Load(endpointMetadata.NamespacedName)
255255
if !ok {
256-
pm = ds.epf.NewEndpoint(ds.parentCtx, podInfo, ds)
257-
ds.pods.Store(podInfo.NamespacedName, pm)
256+
ep = ds.epf.NewEndpoint(ds.parentCtx, endpointMetadata, ds)
257+
ds.pods.Store(endpointMetadata.NamespacedName, ep)
258258
result = false
259259
} else {
260-
pm = existing.(backendmetrics.PodMetrics)
260+
ep = existing.(backendmetrics.PodMetrics)
261261
}
262-
// Update pod properties if anything changed.
263-
pm.UpdatePod(podInfo)
262+
// Update endpoint properties if anything changed.
263+
ep.UpdateMetadata(endpointMetadata)
264264
}
265265
return result
266266
}
267267

268268
func (ds *datastore) PodDelete(podName string) {
269269
ds.pods.Range(func(k, v any) bool {
270-
pm := v.(backendmetrics.PodMetrics)
271-
if pm.GetPod().PodName == podName {
270+
ep := v.(datalayer.Endpoint)
271+
if ep.GetMetadata().PodName == podName {
272272
ds.pods.Delete(k)
273-
ds.epf.ReleaseEndpoint(pm)
273+
ds.epf.ReleaseEndpoint(ep)
274274
}
275275
return true
276276
})
@@ -302,10 +302,10 @@ func (ds *datastore) podResyncAll(ctx context.Context, reader client.Reader) err
302302

303303
// Remove pods that don't belong to the pool or not ready any more.
304304
ds.pods.Range(func(k, v any) bool {
305-
pm := v.(backendmetrics.PodMetrics)
306-
if exist := activePods[pm.GetPod().PodName]; !exist {
307-
logger.V(logutil.VERBOSE).Info("Removing pod", "pod", pm.GetPod())
308-
ds.PodDelete(pm.GetPod().PodName)
305+
ep := v.(datalayer.Endpoint)
306+
if exist := activePods[ep.GetMetadata().PodName]; !exist {
307+
logger.V(logutil.VERBOSE).Info("Removing pod", "pod", ep.GetMetadata())
308+
ds.PodDelete(ep.GetMetadata().PodName)
309309
}
310310
return true
311311
})

pkg/epp/metrics/collectors/inference_pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
7474
prometheus.GaugeValue,
7575
float64(pod.GetMetrics().WaitingQueueSize),
7676
pool.Name,
77-
pod.GetPod().NamespacedName.Name,
77+
pod.GetMetadata().NamespacedName.Name,
7878
)
7979
}
8080
}

pkg/epp/requestcontrol/director.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet
201201
podTotalCount := 0
202202
podFilteredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool {
203203
podTotalCount++
204-
if _, found := endpoints[pm.GetPod().GetIPAddress()]; found {
204+
if _, found := endpoints[pm.GetMetadata().GetIPAddress()]; found {
205205
return true
206206
}
207207
return false
@@ -244,7 +244,7 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC
244244
func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []schedulingtypes.Pod {
245245
pm := make([]schedulingtypes.Pod, len(pods))
246246
for i, pod := range pods {
247-
pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone()}
247+
pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetMetadata().Clone(), MetricsState: pod.GetMetrics().Clone()}
248248
}
249249

250250
return pm
@@ -300,7 +300,7 @@ func (d *Director) GetRandomPod() *backend.Pod {
300300
}
301301
number := rand.Intn(len(pods))
302302
pod := pods[number]
303-
return pod.GetPod()
303+
return pod.GetMetadata()
304304
}
305305

306306
func (d *Director) runPreRequestPlugins(ctx context.Context, request *schedulingtypes.LLMRequest,

0 commit comments

Comments
 (0)