66 "errors"
77 "fmt"
88 "os"
9+ "time"
910
11+ "github.com/jellydator/ttlcache/v3"
1012 "github.com/llm-d/llm-d-kv-cache/pkg/kvcache"
1113 "github.com/llm-d/llm-d-kv-cache/pkg/kvcache/kvblock"
1214 "github.com/llm-d/llm-d-kv-cache/pkg/kvevents"
@@ -46,7 +48,6 @@ var _ framework.Scorer = &PrecisePrefixCacheScorer{}
4648// a new instance of the PrefixCacheTrackingPlugin.
4749func PrecisePrefixCachePluginFactory (name string , rawParameters json.RawMessage ,
4850 handle plugins.Handle ) (plugins.Plugin , error ) {
49-
5051 indexerConfig , err := kvcache .NewDefaultConfig ()
5152 if err != nil {
5253 return nil , fmt .Errorf ("failed to initialize indexer config: %w" , err )
@@ -113,9 +114,39 @@ func New(ctx context.Context, config PrecisePrefixCachePluginConfig) (*PrecisePr
113114 pool := kvevents .NewPool (config .KVEventsConfig , kvCacheIndexer .KVBlockIndex (), tokenProcessor )
114115 pool .Start (ctx )
115116
117+ subscribersManager := kvevents .NewSubscriberManager (pool )
118+ var subscribersCache * ttlcache.Cache [string , struct {}]
119+
120+ // initialize the subscribers cache only if pod discovery is enabled
121+ if config .KVEventsConfig .DiscoverPods {
122+ // initialize the subscribers TTL cache
123+ subscriptionTimeout := 10 * time .Minute
124+ subscribersCache = ttlcache.New [string , struct {}](
125+ ttlcache.WithTTL [string , struct {}](subscriptionTimeout ),
126+ )
127+ subscribersCache .OnEviction (func (ctx context.Context , reason ttlcache.EvictionReason ,
128+ item * ttlcache.Item [string , struct {}],
129+ ) {
130+ if reason == ttlcache .EvictionReasonExpired {
131+ subscribersManager .RemoveSubscriber (ctx , item .Key ())
132+ }
133+ })
134+ go cleanCachePeriodically (ctx , subscribersCache , subscriptionTimeout )
135+ }
136+ if config .KVEventsConfig .ZMQEndpoint != "" {
137+ // setup local subscriber to support global socket mode
138+ if err := subscribersManager .EnsureSubscriber (ctx , "local-subscriber" ,
139+ config .KVEventsConfig .ZMQEndpoint , config .KVEventsConfig .TopicFilter , false ); err != nil {
140+ return nil , fmt .Errorf ("failed to create local subscriber for global socket mode: %w" , err )
141+ }
142+ }
143+
116144 return & PrecisePrefixCacheScorer {
117- typedName : plugins.TypedName {Type : PrecisePrefixCachePluginType },
118- kvCacheIndexer : kvCacheIndexer ,
145+ typedName : plugins.TypedName {Type : PrecisePrefixCachePluginType },
146+ kvCacheIndexer : kvCacheIndexer ,
147+ subscribersCache : subscribersCache ,
148+ subscribersManager : subscribersManager ,
149+ kvEventsConfig : config .KVEventsConfig ,
119150 }, nil
120151}
121152
@@ -127,6 +158,15 @@ func New(ctx context.Context, config PrecisePrefixCachePluginConfig) (*PrecisePr
127158type PrecisePrefixCacheScorer struct {
128159 typedName plugins.TypedName
129160 kvCacheIndexer * kvcache.Indexer
161+
162+ // Until the IGW data-layer can deliver endpoint events, pods are
163+ // discovered through the scoring process and tracked in a TTL cache.
164+ // A pod that is absent from the endpoints list seen during scoring
165+ // for longer than the cache TTL is considered gone, and its KV-events
166+ // subscription is stopped.
167+ subscribersCache * ttlcache.Cache [string , struct {}] // left nil by New when DiscoverPods is false
168+ subscribersManager * kvevents.SubscriberManager
169+ kvEventsConfig * kvevents.Config // read by Score for DiscoverPods, the discovery socket port and the topic filter
130170}
131171
132172// TypedName returns the typed name of the plugin.
@@ -146,6 +186,26 @@ func (s *PrecisePrefixCacheScorer) Score(ctx context.Context, cycleState *types.
146186 logger := log .FromContext (ctx ).WithName (s .typedName .String ())
147187 debugLogger := logger .V (logutil .DEBUG )
148188
189+ if s .kvEventsConfig .DiscoverPods {
190+ // update subscribers here temporarily
191+ for _ , pod := range pods {
192+ podObj := pod .GetPod ()
193+ if podObj == nil {
194+ continue
195+ }
196+ podKey := podObj .NamespacedName .String ()
197+ s .subscribersCache .Set (podKey , struct {}{}, 0 ) // use default TTL
198+
199+ if err := s .subscribersManager .EnsureSubscriber (context .Background (), podKey , // dont use request ctx
200+ fmt .Sprintf ("tcp://%s:%d" , podObj .Address , s .kvEventsConfig .PodDiscoveryConfig .SocketPort ),
201+ s .kvEventsConfig .TopicFilter , true ); err != nil {
202+ logger .Error (err , "Failed to ensure KV-events subscriber for pod" , "pod" , podKey ,
203+ "endpoint" , podObj .Address )
204+ continue
205+ }
206+ }
207+ }
208+
149209 if request == nil {
150210 debugLogger .Info ("Request is nil, skipping scoring" )
151211 return nil
0 commit comments