11package backend
22
33import (
4- "context"
54 "errors"
65 "math/rand"
76 "sync"
8- "time"
97
10- "github.com/google/go-cmp/cmp"
118 "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
129 logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1310 corev1 "k8s.io/api/core/v1"
14- v1 "k8s.io/api/core/v1"
15- "k8s.io/apimachinery/pkg/api/meta"
16- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17- "k8s.io/apimachinery/pkg/labels"
18- "k8s.io/client-go/informers"
19- informersv1 "k8s.io/client-go/informers/core/v1"
20- "k8s.io/client-go/kubernetes"
21- clientset "k8s.io/client-go/kubernetes"
22- listersv1 "k8s.io/client-go/listers/core/v1"
23- "k8s.io/client-go/tools/cache"
2411 "k8s.io/klog/v2"
2512)
2613
2714func NewK8sDataStore (options ... K8sDatastoreOption ) * K8sDatastore {
2815 store := & K8sDatastore {
2916 poolMu : sync.RWMutex {},
3017 InferenceModels : & sync.Map {},
18+ pods : & sync.Map {},
3119 }
32-
33- store .podListerFactory = store .createPodLister
3420 for _ , opt := range options {
3521 opt (store )
3622 }
@@ -39,68 +25,29 @@ func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore {
3925
4026// The datastore is a local cache of relevant data for the given InferencePool (currently all pulled from k8s-api)
4127type K8sDatastore struct {
42- client kubernetes.Interface
4328 // poolMu is used to synchronize access to the inferencePool.
44- poolMu sync.RWMutex
45- inferencePool * v1alpha1.InferencePool
46- podListerFactory PodListerFactory
47- podLister * PodLister
48- InferenceModels * sync.Map
29+ poolMu sync.RWMutex
30+ inferencePool * v1alpha1.InferencePool
31+ InferenceModels * sync.Map
32+ pods * sync.Map
4933}
5034
5135type K8sDatastoreOption func (* K8sDatastore )
52- type PodListerFactory func (* v1alpha1.InferencePool ) * PodLister
5336
5437// WithPods can be used in tests to override the pods.
55- func WithPodListerFactory ( factory PodListerFactory ) K8sDatastoreOption {
38+ func WithPods ( pods [] * PodMetrics ) K8sDatastoreOption {
5639 return func (store * K8sDatastore ) {
57- store .podListerFactory = factory
40+ store .pods = & sync.Map {}
41+ for _ , pod := range pods {
42+ store .pods .Store (pod .Pod , true )
43+ }
5844 }
5945}
6046
61- type PodLister struct {
62- Lister listersv1.PodLister
63- sharedInformer informers.SharedInformerFactory
64- }
65-
66- func (l * PodLister ) listEverything () ([]* corev1.Pod , error ) {
67- return l .Lister .List (labels .Everything ())
68-
69- }
70-
71- func (ds * K8sDatastore ) SetClient (client kubernetes.Interface ) {
72- ds .client = client
73- }
74-
7547func (ds * K8sDatastore ) setInferencePool (pool * v1alpha1.InferencePool ) {
7648 ds .poolMu .Lock ()
7749 defer ds .poolMu .Unlock ()
78-
79- if ds .inferencePool != nil && cmp .Equal (ds .inferencePool .Spec .Selector , pool .Spec .Selector ) {
80- // Pool updated, but the selector stayed the same, so no need to change the informer.
81- ds .inferencePool = pool
82- return
83- }
84-
85- // New pool or selector updated.
8650 ds .inferencePool = pool
87-
88- if ds .podLister != nil && ds .podLister .sharedInformer != nil {
89- // Shutdown the old informer async since this takes a few seconds.
90- go func () {
91- ds .podLister .sharedInformer .Shutdown ()
92- }()
93- }
94-
95- if ds .podListerFactory != nil {
96- // Create a new informer with the new selector.
97- ds .podLister = ds .podListerFactory (ds .inferencePool )
98- if ds .podLister != nil && ds .podLister .sharedInformer != nil {
99- ctx := context .Background ()
100- ds .podLister .sharedInformer .Start (ctx .Done ())
101- ds .podLister .sharedInformer .WaitForCacheSync (ctx .Done ())
102- }
103- }
10451}
10552
10653func (ds * K8sDatastore ) getInferencePool () (* v1alpha1.InferencePool , error ) {
@@ -112,58 +59,13 @@ func (ds *K8sDatastore) getInferencePool() (*v1alpha1.InferencePool, error) {
11259 return ds .inferencePool , nil
11360}
11461
115- func (ds * K8sDatastore ) createPodLister (pool * v1alpha1.InferencePool ) * PodLister {
116- if ds .client == nil {
117- return nil
118- }
119- klog .V (logutil .DEFAULT ).Infof ("Creating informer for pool %v" , pool .Name )
120- selectorSet := make (map [string ]string )
121- for k , v := range pool .Spec .Selector {
122- selectorSet [string (k )] = string (v )
123- }
124-
125- newPodInformer := func (cs clientset.Interface , resyncPeriod time.Duration ) cache.SharedIndexInformer {
126- informer := informersv1 .NewFilteredPodInformer (cs , pool .Namespace , resyncPeriod , cache.Indexers {}, func (options * metav1.ListOptions ) {
127- options .LabelSelector = labels .SelectorFromSet (selectorSet ).String ()
128- })
129- err := informer .SetTransform (func (obj interface {}) (interface {}, error ) {
130- // Remove unnecessary fields to improve memory footprint.
131- if accessor , err := meta .Accessor (obj ); err == nil {
132- if accessor .GetManagedFields () != nil {
133- accessor .SetManagedFields (nil )
134- }
135- }
136- return obj , nil
137- })
138- if err != nil {
139- klog .Errorf ("Failed to set pod transformer: %v" , err )
140- }
141- return informer
142- }
143- // 0 means we disable resyncing, it is not really useful to resync every hour (the controller-runtime default),
144- // if things go wrong in the watch, no one will wait for an hour for things to get fixed.
145- // As precedence, kube-scheduler also disables this since it is expensive to list all pods from the api-server regularly.
146- resyncPeriod := time .Duration (0 )
147- sharedInformer := informers .NewSharedInformerFactory (ds .client , resyncPeriod )
148- sharedInformer .InformerFor (& v1.Pod {}, newPodInformer )
149-
150- return & PodLister {
151- Lister : sharedInformer .Core ().V1 ().Pods ().Lister (),
152- sharedInformer : sharedInformer ,
153- }
154- }
155-
156- func (ds * K8sDatastore ) getPods () ([]* corev1.Pod , error ) {
157- ds .poolMu .RLock ()
158- defer ds .poolMu .RUnlock ()
159- if ! ds .HasSynced () {
160- return nil , errors .New ("InferencePool is not initialized in datastore" )
161- }
162- pods , err := ds .podLister .listEverything ()
163- if err != nil {
164- return nil , err
165- }
166- return pods , nil
62+ func (ds * K8sDatastore ) GetPodIPs () []string {
63+ var ips []string
64+ ds .pods .Range (func (name , pod any ) bool {
65+ ips = append (ips , pod .(* corev1.Pod ).Status .PodIP )
66+ return true
67+ })
68+ return ips
16769}
16870
16971func (s * K8sDatastore ) FetchModelData (modelName string ) (returnModel * v1alpha1.InferenceModel ) {
0 commit comments