Skip to content

Commit 45f9dfd

Browse files
committed
Non test code updates due to PodInfo rename
Signed-off-by: Shmuel Kallner <[email protected]>
1 parent b9603e9 commit 45f9dfd

File tree

12 files changed

+61
-61
lines changed

12 files changed

+61
-61
lines changed

pkg/epp/backend/metrics/pod_metrics.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,18 @@ type PodMetricsClient interface {
5353
}
5454

5555
func (pm *podMetrics) String() string {
56-
return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetPod(), pm.GetMetrics())
56+
return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetMetadata(), pm.GetMetrics())
5757
}
5858

59-
func (pm *podMetrics) GetPod() *backend.Pod {
59+
func (pm *podMetrics) GetMetadata() *backend.Pod {
6060
return pm.pod.Load()
6161
}
6262

6363
func (pm *podMetrics) GetMetrics() *MetricsState {
6464
return pm.metrics.Load()
6565
}
6666

67-
func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) {
67+
func (pm *podMetrics) UpdateMetadata(pod *datalayer.EndpointMetadata) {
6868
pm.pod.Store(pod)
6969
}
7070

@@ -73,7 +73,7 @@ func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) {
7373
func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
7474
pm.startOnce.Do(func() {
7575
go func() {
76-
pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetPod())
76+
pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetMetadata())
7777
ticker := time.NewTicker(pm.interval)
7878
defer ticker.Stop()
7979
for {
@@ -84,7 +84,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
8484
return
8585
case <-ticker.C: // refresh metrics periodically
8686
if err := pm.refreshMetrics(); err != nil {
87-
pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetPod())
87+
pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetMetadata())
8888
}
8989
}
9090
}
@@ -95,7 +95,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
9595
func (pm *podMetrics) refreshMetrics() error {
9696
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
9797
defer cancel()
98-
updated, err := pm.pmc.FetchMetrics(ctx, pm.GetPod(), pm.GetMetrics())
98+
updated, err := pm.pmc.FetchMetrics(ctx, pm.GetMetadata(), pm.GetMetrics())
9999
if err != nil {
100100
pm.logger.V(logutil.TRACE).Info("Failed to refreshed metrics:", "err", err)
101101
}
@@ -115,7 +115,7 @@ func (pm *podMetrics) refreshMetrics() error {
115115
}
116116

117117
func (pm *podMetrics) stopRefreshLoop() {
118-
pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetPod())
118+
pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetMetadata())
119119
pm.stopOnce.Do(func() {
120120
close(pm.done)
121121
})

pkg/epp/backend/metrics/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type PodMetricsFactory struct {
5252
refreshMetricsInterval time.Duration
5353
}
5454

55-
func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.PodInfo, ds datalayer.PoolInfo) PodMetrics {
55+
func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.EndpointMetadata, ds datalayer.PoolInfo) PodMetrics {
5656
pm := &podMetrics{
5757
pmc: f.pmc,
5858
ds: ds,

pkg/epp/backend/pod.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@ import (
2020
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
2121
)
2222

23-
type Pod = datalayer.PodInfo
23+
type Pod = datalayer.EndpointMetadata

pkg/epp/datalayer/collector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc
8888
started := false
8989

9090
c.startOnce.Do(func() {
91-
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress())
91+
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetMetadata().GetIPAddress())
9292
c.ctx, c.cancel = context.WithCancel(ctx)
9393
started = true
9494
ready = make(chan struct{})

pkg/epp/datalayer/endpoint.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ import (
2121
"sync/atomic"
2222
)
2323

24-
// EndpointPodState allows management of the Pod related attributes.
25-
type EndpointPodState interface {
26-
GetPod() *PodInfo
27-
UpdatePod(*PodInfo)
24+
// EndpointMetaState allows management of the EndpointMetadata related attributes.
25+
type EndpointMetaState interface {
26+
GetMetadata() *EndpointMetadata
27+
UpdateMetadata(*EndpointMetadata)
2828
GetAttributes() *Attributes
2929
}
3030

@@ -37,14 +37,14 @@ type EndpointMetricsState interface {
3737
// Endpoint represents an inference serving endpoint and its related attributes.
3838
type Endpoint interface {
3939
fmt.Stringer
40-
EndpointPodState
40+
EndpointMetaState
4141
EndpointMetricsState
4242
AttributeMap
4343
}
4444

4545
// ModelServer is an implementation of the Endpoint interface.
4646
type ModelServer struct {
47-
pod atomic.Pointer[PodInfo]
47+
pod atomic.Pointer[EndpointMetadata]
4848
metrics atomic.Pointer[Metrics]
4949
attributes *Attributes
5050
}
@@ -68,14 +68,14 @@ func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer {
6868
// String returns a representation of the ModelServer. For brevity, only names of
6969
// extended attributes are returned and not their values.
7070
func (srv *ModelServer) String() string {
71-
return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetPod(), srv.GetMetrics(), srv.Keys())
71+
return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetMetadata(), srv.GetMetrics(), srv.Keys())
7272
}
7373

74-
func (srv *ModelServer) GetPod() *PodInfo {
74+
func (srv *ModelServer) GetMetadata() *EndpointMetadata {
7575
return srv.pod.Load()
7676
}
7777

78-
func (srv *ModelServer) UpdatePod(pod *PodInfo) {
78+
func (srv *ModelServer) UpdateMetadata(pod *EndpointMetadata) {
7979
srv.pod.Store(pod)
8080
}
8181

pkg/epp/datalayer/factory.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type PoolInfo interface {
4444
// providing methods to allocate and retire endpoints. This can potentially be used for
4545
// pooled memory or other management chores in the implementation.
4646
type EndpointFactory interface {
47-
NewEndpoint(parent context.Context, inpod *PodInfo, poolinfo PoolInfo) Endpoint
47+
NewEndpoint(parent context.Context, inEnpointMetadata *EndpointMetadata, poolinfo PoolInfo) Endpoint
4848
ReleaseEndpoint(ep Endpoint)
4949
}
5050

@@ -69,16 +69,16 @@ func NewEndpointFactory(sources []DataSource, refreshMetricsInterval time.Durati
6969
// NewEndpoint implements EndpointFactory.NewEndpoint.
7070
// Creates a new endpoint and starts its associated collector with its own ticker.
7171
// Guards against multiple concurrent calls for the same endpoint.
72-
func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo, _ PoolInfo) Endpoint {
73-
key := types.NamespacedName{Namespace: inpod.GetNamespacedName().Namespace, Name: inpod.GetNamespacedName().Name}
72+
func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inEndpointMetadata *EndpointMetadata, _ PoolInfo) Endpoint {
73+
key := types.NamespacedName{Namespace: inEndpointMetadata.GetNamespacedName().Namespace, Name: inEndpointMetadata.GetNamespacedName().Name}
7474
logger := log.FromContext(parent).WithValues("pod", key)
7575

7676
if _, ok := lc.collectors.Load(key); ok {
7777
logger.Info("collector already running for endpoint", "endpoint", key)
7878
return nil
7979
}
8080

81-
endpoint := NewEndpoint(inpod, nil)
81+
endpoint := NewEndpoint(inEndpointMetadata, nil)
8282
collector := NewCollector() // TODO or full backward compatibility, set the logger and poolinfo
8383

8484
if _, loaded := lc.collectors.LoadOrStore(key, collector); loaded {
@@ -100,7 +100,7 @@ func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo,
100100
// ReleaseEndpoint implements EndpointFactory.ReleaseEndpoint
101101
// Stops the collector and cleans up resources for the endpoint
102102
func (lc *EndpointLifecycle) ReleaseEndpoint(ep Endpoint) {
103-
key := ep.GetPod().GetNamespacedName()
103+
key := ep.GetMetadata().GetNamespacedName()
104104

105105
if value, ok := lc.collectors.LoadAndDelete(key); ok {
106106
collector := value.(*Collector)

pkg/epp/datalayer/metrics/datasource.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error {
9898
// Collect is triggered by the data layer framework to fetch potentially new
9999
// MSP metrics data for an endpoint.
100100
func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) error {
101-
target := dataSrc.getMetricsEndpoint(ep.GetPod())
102-
families, err := dataSrc.client.Get(ctx, target, ep.GetPod())
101+
target := dataSrc.getMetricsEndpoint(ep.GetMetadata())
102+
families, err := dataSrc.client.Get(ctx, target, ep.GetMetadata())
103103

104104
if err != nil {
105105
return err

pkg/epp/datastore/datastore.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -200,13 +200,13 @@ func (ds *datastore) ObjectiveGetAll() []*v1alpha2.InferenceObjective {
200200
// /// Pods/endpoints APIs ///
201201
// TODO: add a flag for callers to specify the staleness threshold for metrics.
202202
// ref: https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/1046#discussion_r2246351694
203-
func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {
204-
res := []backendmetrics.PodMetrics{}
203+
func (ds *datastore) PodList(predicate func(datalayer.Endpoint) bool) []datalayer.Endpoint {
204+
res := []datalayer.Endpoint{}
205205

206206
ds.pods.Range(func(k, v any) bool {
207-
pm := v.(backendmetrics.PodMetrics)
208-
if predicate(pm) {
209-
res = append(res, pm)
207+
ep := v.(datalayer.Endpoint)
208+
if predicate(ep) {
209+
res = append(res, ep)
210210
}
211211
return true
212212
})
@@ -228,14 +228,14 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
228228
if len(ds.pool.Spec.TargetPorts) == 1 {
229229
modelServerMetricsPort = int(ds.modelServerMetricsPort)
230230
}
231-
pods := []*datalayer.PodInfo{}
231+
pods := []*datalayer.EndpointMetadata{}
232232
for idx, port := range ds.pool.Spec.TargetPorts {
233233
metricsPort := modelServerMetricsPort
234234
if metricsPort == 0 {
235235
metricsPort = int(port.Number)
236236
}
237237
pods = append(pods,
238-
&datalayer.PodInfo{
238+
&datalayer.EndpointMetadata{
239239
NamespacedName: types.NamespacedName{
240240
Name: pod.Name + "-rank-" + strconv.Itoa(idx),
241241
Namespace: pod.Namespace,
@@ -249,28 +249,28 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
249249
}
250250

251251
result := true
252-
for _, podInfo := range pods {
253-
var pm backendmetrics.PodMetrics
254-
existing, ok := ds.pods.Load(podInfo.NamespacedName)
252+
for _, endpointMetadata := range pods {
253+
var ep datalayer.Endpoint
254+
existing, ok := ds.pods.Load(endpointMetadata.NamespacedName)
255255
if !ok {
256-
pm = ds.epf.NewEndpoint(ds.parentCtx, podInfo, ds)
257-
ds.pods.Store(podInfo.NamespacedName, pm)
256+
ep = ds.epf.NewEndpoint(ds.parentCtx, endpointMetadata, ds)
257+
ds.pods.Store(endpointMetadata.NamespacedName, ep)
258258
result = false
259259
} else {
260-
pm = existing.(backendmetrics.PodMetrics)
260+
ep = existing.(backendmetrics.PodMetrics)
261261
}
262-
// Update pod properties if anything changed.
263-
pm.UpdatePod(podInfo)
262+
// Update endpoint properties if anything changed.
263+
ep.UpdateMetadata(endpointMetadata)
264264
}
265265
return result
266266
}
267267

268268
func (ds *datastore) PodDelete(podName string) {
269269
ds.pods.Range(func(k, v any) bool {
270-
pm := v.(backendmetrics.PodMetrics)
271-
if pm.GetPod().PodName == podName {
270+
ep := v.(datalayer.Endpoint)
271+
if ep.GetMetadata().PodName == podName {
272272
ds.pods.Delete(k)
273-
ds.epf.ReleaseEndpoint(pm)
273+
ds.epf.ReleaseEndpoint(ep)
274274
}
275275
return true
276276
})
@@ -302,10 +302,10 @@ func (ds *datastore) podResyncAll(ctx context.Context, reader client.Reader) err
302302

303303
// Remove pods that don't belong to the pool or not ready any more.
304304
ds.pods.Range(func(k, v any) bool {
305-
pm := v.(backendmetrics.PodMetrics)
306-
if exist := activePods[pm.GetPod().PodName]; !exist {
307-
logger.V(logutil.VERBOSE).Info("Removing pod", "pod", pm.GetPod())
308-
ds.PodDelete(pm.GetPod().PodName)
305+
ep := v.(datalayer.Endpoint)
306+
if exist := activePods[ep.GetMetadata().PodName]; !exist {
307+
logger.V(logutil.VERBOSE).Info("Removing pod", "pod", ep.GetMetadata())
308+
ds.PodDelete(ep.GetMetadata().PodName)
309309
}
310310
return true
311311
})

pkg/epp/metrics/collectors/inference_pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
7474
prometheus.GaugeValue,
7575
float64(pod.GetMetrics().WaitingQueueSize),
7676
pool.Name,
77-
pod.GetPod().NamespacedName.Name,
77+
pod.GetMetadata().NamespacedName.Name,
7878
)
7979
}
8080
}

pkg/epp/requestcontrol/director.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet
235235
podTotalCount := 0
236236
podFilteredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool {
237237
podTotalCount++
238-
if _, found := endpoints[pm.GetPod().GetIPAddress()]; found {
238+
if _, found := endpoints[pm.GetMetadata().GetIPAddress()]; found {
239239
return true
240240
}
241241
return false
@@ -279,9 +279,9 @@ func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []sch
279279
pm := make([]schedulingtypes.Pod, len(pods))
280280
for i, pod := range pods {
281281
if pod.GetAttributes() != nil {
282-
pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: pod.GetAttributes().Clone()}
282+
pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetMetadata().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: pod.GetAttributes().Clone()}
283283
} else {
284-
pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: datalayer.NewAttributes()}
284+
pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetMetadata().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: datalayer.NewAttributes()}
285285
}
286286
}
287287

@@ -338,7 +338,7 @@ func (d *Director) GetRandomPod() *backend.Pod {
338338
}
339339
number := rand.Intn(len(pods))
340340
pod := pods[number]
341-
return pod.GetPod()
341+
return pod.GetMetadata()
342342
}
343343

344344
func (d *Director) runPreRequestPlugins(ctx context.Context, request *schedulingtypes.LLMRequest,

0 commit comments

Comments
 (0)