Automatic vLLM pod discovery + ZMQ subscription mgmt for KVEvents #212
Conversation
Pull request overview
This PR implements automatic vLLM pod discovery with dynamic ZMQ subscriber management for KV-cache events. The implementation adds a Kubernetes pod reconciler controller that watches vLLM pods and automatically creates/removes ZMQ subscribers based on pod lifecycle. The kvevents package has been moved from pkg/kvcache/kvevents to pkg/kvevents to reflect its independent status as a top-level component.
Key changes:
- Pod reconciler controller for automatic per-pod ZMQ subscriber management
- Thread-safe SubscriberManager for managing multiple concurrent ZMQ connections
- Configuration support for both global socket mode and pod discovery mode
Reviewed changes
Copilot reviewed 11 out of 15 changed files in this pull request and generated 15 comments.
| File | Description |
|---|---|
| pkg/kvevents/zmq_subscriber.go | New ZMQ subscriber implementation for connecting to individual vLLM pods |
| pkg/kvevents/subscriber_manager.go | Thread-safe manager for multiple ZMQ subscribers with lifecycle management |
| pkg/kvevents/controller/pod_reconciler.go | Kubernetes controller that watches pods and manages subscribers |
| pkg/kvevents/events.go | Event type definitions moved from kvcache package |
| pkg/kvevents/pool.go | Updated pool configuration with pod discovery support, removed global subscriber |
| pkg/kvevents/doc.go | Package documentation for the kvevents system |
| tests/integration/kv_events_test.go | Integration tests for Pool and SubscriberManager |
| pkg/kvevents/subscriber_manager_test.go | Unit tests for subscriber manager functionality |
| examples/kv_events/pod_reconciler/main.go | Example demonstrating pod reconciler usage |
| examples/kv_events/online/main.go | Updated import path for kvevents package |
| examples/kv_cache_index_service/server/server.go | Updated import path for kvevents package |
| examples/helper/events.go | Updated import path for kvevents package |
| docs/configuration.md | Comprehensive documentation for pod discovery configuration |
| go.mod | Added k8s.io/api dependency for pod reconciler |
| go.sum | Updated dependency checksums |
Comments suppressed due to low confidence (2)
pkg/kvevents/pool.go:293
Inconsistent casing in the device tier default. The constant "defaultEventSourceDeviceTier" has the value "GPU" (uppercase), but when Medium is provided from vLLM it is converted to lowercase with strings.ToLower. The default tier will therefore be "GPU" while custom tiers will be lowercase like "cpu" or "disk", creating an inconsistency. Either the default should be "gpu" (lowercase) or the custom values should not be lowercased.
pkg/kvevents/pool.go:345
The same casing inconsistency applies at this second location.
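A minimal sketch of one way to resolve the casing mismatch, assuming the intended fix is to normalize both the default and the vLLM-provided values through a single helper (the helper name is illustrative, not from this PR):

```go
package kvevents

import "strings"

// defaultEventSourceDeviceTier is the tier assumed when vLLM omits Medium.
// Keeping it lowercase keeps it consistent with the lowered custom values ("cpu", "disk", ...).
const defaultEventSourceDeviceTier = "gpu"

// normalizeDeviceTier maps a raw Medium value (possibly empty) to a consistent lowercase tier.
func normalizeDeviceTier(medium string) string {
	if medium == "" {
		return defaultEventSourceDeviceTier
	}
	return strings.ToLower(medium)
}
```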
```go
subCtx, cancel := context.WithCancel(ctx)
go subscriber.Start(subCtx)
```
Copilot (AI) commented on Dec 14, 2025:
Potential context leak: The subscriber is started with context.WithCancel(ctx) where ctx is the request context passed to EnsureSubscriber. If this is a reconciler request context, it may be canceled before the subscriber should stop. The subscriber goroutines should likely use a longer-lived context (like a manager context) rather than the request context, with the cancel func used only for explicit cleanup.
Copilot's review here seems like an important catch, my understanding is that the controller-runtime cancels the Reconcile context immediately after the reconcile function returns. If that's the case, won't the subscriber goroutine receive that cancellation and shut down almost immediately, and we'll lose the connection?
Should we be using a detached context (like context.Background()) here since the subscription needs to outlive the reconciliation request?
Copilot hallucinated this being the request context. The context passed here, as can be seen in the scorer reference, is indeed context.Background(). It's just up to the caller to set the context.
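To make the lifetime expectation concrete, here is a rough sketch of the pattern under discussion; all names are hypothetical and this is not the PR's actual SubscriberManager. The point is that subscriber goroutines derive from a manager-scoped context, with the per-entry cancel kept only for explicit teardown:

```go
package kvevents

import "context"

// subscriber stands in for the real ZMQ subscriber type.
type subscriber interface {
	Start(ctx context.Context)
}

// subscriberManagerSketch illustrates the lifetime split: baseCtx outlives any
// single Reconcile call, while the stored cancel funcs are used only for
// explicit per-pod teardown.
type subscriberManagerSketch struct {
	baseCtx context.Context // long-lived, e.g. context.Background() or an app-level context
	cancels map[string]context.CancelFunc
}

func (sm *subscriberManagerSketch) ensure(podKey string, sub subscriber) {
	if _, ok := sm.cancels[podKey]; ok {
		return // already running
	}
	// Derive from the manager context, not from a reconcile request context,
	// so the subscription survives the end of the Reconcile call.
	subCtx, cancel := context.WithCancel(sm.baseCtx)
	sm.cancels[podKey] = cancel
	go sub.Start(subCtx)
}
```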
```go
entry.cancel()
delete(sm.subscribers, podIdentifier)
```
Copilot (AI) commented on Dec 14, 2025:
Potential resource leak: when a subscriber's context is canceled (either by endpoint change on line 70 or during shutdown on line 121), the goroutine running subscriber.Start will exit, but the ZMQ socket cleanup happens in runSubscriber's defer. However, there's no guarantee that the socket is properly closed before the subscriberEntry is removed from the map. Consider adding a sync mechanism or wait group to ensure the subscriber goroutine has fully cleaned up before removing the entry.
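A sketch of the suggested synchronization, with hypothetical names: the Start goroutine signals a done channel after its deferred socket cleanup, and removal waits on that signal so the ZMQ socket is fully closed before the entry is considered gone:

```go
package kvevents

import (
	"context"
	"sync"
)

type subscriberEntrySketch struct {
	cancel context.CancelFunc
	done   chan struct{} // closed by the Start goroutine after its deferred socket cleanup
}

type subscriberMapSketch struct {
	mu          sync.Mutex
	subscribers map[string]*subscriberEntrySketch
}

func (m *subscriberMapSketch) remove(podIdentifier string) {
	m.mu.Lock()
	entry, ok := m.subscribers[podIdentifier]
	if ok {
		delete(m.subscribers, podIdentifier)
	}
	m.mu.Unlock()
	if !ok {
		return
	}
	entry.cancel()
	<-entry.done // wait until the ZMQ socket has actually been closed
}
```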
Tested with the following configuration, with vLLM (port 5556 exposed):

```yaml
- type: precise-prefix-cache-scorer
  parameters:
    tokenProcessorConfig:
      blockSize: 64
      hashSeed: "42"
    indexerConfig:
      tokenizersPoolConfig:
        modelName: "Qwen/Qwen3-32B"
        hf:
          tokenizersCacheDir: "/tmp/tokenizers"
    kvEventsConfig:
      topicFilter: "kv@"
      concurrency: 8
      discoverPods: true
      podDiscoveryConfig:
        socketPort: 5556
```
```go
typedName      plugins.TypedName
kvCacheIndexer *kvcache.Indexer

// until the IGW data-layer is ready to provide endpoint events,
```
@elevran this will be PR'd into the scheduler. Note that it includes other updates required when syncing with the upcoming v0.5 release.
sagearc left a comment:
Great work @vMaroon! I’ve published part of the review and will follow up with the rest shortly.
```diff
 ### KV-Event Pool Configuration (`Config`)

-Configures the ZMQ event processing pool for handling KV cache events.
+Configures the ZMQ event processing pool for handling KV cache events. The pool supports two modes:
```
The current naming (GlobalSocket vs PodReconciler) reflects implementation details. I suggest renaming these to reflect the intent rather than the mechanism, for instance:
- `Global Socket Mode` -> `Static Endpoint Mode` (implies a fixed target)
- `Pod Reconciler Mode` -> `Auto-Discovery Mode` (implies dynamic finding of targets)
```go
/*
Copyright 2025 The llm-d Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
```
Is this intentional?
Fixed
```go
// initialize the subscribers cache only if pod discovery is enabled
if config.KVEventsConfig.DiscoverPods == true {
	// initialize the subscribers TTL cache
	subscriptionTimeout := 10 * time.Minute
```
Why not keep the connection alive (assuming a specific pod might actually not receive incoming requests for that period)?
As long as any request to any pod comes in, a live pod gets refreshed. If the cluster goes static (no requests at all), or if a pod disappears from the serving list, this timer starts counting. Ten minutes in this state means the pod is likely gone.
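A sketch of that refresh-and-expire flow, assuming the jellydator ttlcache v3 package is the cache in question; the eviction hook standing in for subscriber teardown is illustrative, not this repository's code:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/jellydator/ttlcache/v3"
)

func main() {
	const subscriptionTimeout = 10 * time.Minute

	cache := ttlcache.New[string, struct{}](
		ttlcache.WithTTL[string, struct{}](subscriptionTimeout),
	)

	// When an entry expires (no request touched that pod for 10 minutes),
	// tear down its subscriber here (illustrative).
	cache.OnEviction(func(_ context.Context, reason ttlcache.EvictionReason, item *ttlcache.Item[string, struct{}]) {
		if reason == ttlcache.EvictionReasonExpired {
			fmt.Println("removing stale subscriber for", item.Key())
		}
	})
	go cache.Start() // runs the expiration loop

	// Every scored request refreshes the pod's entry, resetting the 10-minute timer.
	cache.Set("default/vllm-pod-0", struct{}{}, ttlcache.DefaultTTL)
}
```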
```go
if s.kvEventsConfig.DiscoverPods == true {
	// update subscribers here temporarily
	for _, pod := range pods {
		podObj := pod.GetPod()
		if podObj == nil {
			continue
		}
		podKey := podObj.NamespacedName.String()
		s.subscribersCache.Set(podKey, struct{}{}, 0) // use default TTL

		if err := s.subscribersManager.EnsureSubscriber(context.Background(), podKey, // dont use request ctx
			fmt.Sprintf("tcp://%s:%d", podObj.Address, s.kvEventsConfig.PodDiscoveryConfig.SocketPort),
			s.kvEventsConfig.TopicFilter, true); err != nil {
			logger.Error(err, "Failed to ensure KV-events subscriber for pod", "pod", podKey,
				"endpoint", podObj.Address)
			continue
		}
	}
}
```
This implementation mixes network discovery into the critical Score() path.
While a full refactor to an asynchronous mechanism is out of scope for this release, I believe we should document this as technical debt. We can add a TODO noting that this discovery logic belongs in a separate async loop (similar to events.Pool), not in the synchronous scoring path.
What do you think?
There is a TODO above the subscribersCache. This update mechanism is short-lived state; it does not belong here at all, but it is the minimum we can do right now to ship the feature for trial. The llm-d precise prefix-cache scheduling guide won't switch to discovery; it will only get a sub-guide for active-active HA for now.
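For the record, a rough sketch of what the decoupled loop mentioned in the TODO could look like; the listPods/ensure callbacks and the interval are placeholders, not this repository's API:

```go
package kvevents

import (
	"context"
	"time"
)

// runDiscoveryLoop periodically reconciles subscribers outside the scoring path.
// listPods and ensure are placeholders for the real pod lister and the
// SubscriberManager.EnsureSubscriber wiring.
func runDiscoveryLoop(
	ctx context.Context,
	interval time.Duration,
	listPods func(context.Context) ([]string, error),
	ensure func(ctx context.Context, podKey string) error,
) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			pods, err := listPods(ctx)
			if err != nil {
				continue // a real implementation would log the error
			}
			for _, podKey := range pods {
				_ = ensure(ctx, podKey) // errors would be logged, never fail scoring
			}
		}
	}
}
```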
```go
func (s *PrecisePrefixCacheScorer) getScores(ctx context.Context, request *types.LLMRequest) (map[string]float64, error) {
	logger := log.FromContext(ctx).WithName(s.typedName.String())
	traceLogger := logger.V(logutil.TRACE)

	traceLogger.Info("Getting scores",
		"isChatCompletions", request.Body != nil && request.Body.ChatCompletions != nil,
		"isCompletions", request.Body != nil && request.Body.Completions != nil)

	// The upstream parser guarantees exactly one body is populated, but we defensively prioritize chat completions.
	// If an unexpected dual payload slips through (parser regression/new client), log it and use chat semantics.
	if request.Body != nil && request.Body.ChatCompletions != nil {
		if request.Body.Completions != nil {
			traceLogger.Info("Both chat/completions and completions present; defaulting to chat/completions")
		}

		renderReq := &preprocessing.ApplyChatTemplateRequest{
			Conversation:              make([][]preprocessing.Conversation, 0),
			Tools:                     request.Body.ChatCompletions.Tools,
			Documents:                 request.Body.ChatCompletions.Documents,
			ChatTemplate:              request.Body.ChatCompletions.ChatTemplate,
			ReturnAssistantTokensMask: request.Body.ChatCompletions.ReturnAssistantTokensMask,
			ContinueFinalMessage:      request.Body.ChatCompletions.ContinueFinalMessage,
			AddGenerationPrompt:       request.Body.ChatCompletions.AddGenerationPrompt,
			ChatTemplateKWArgs:        request.Body.ChatCompletions.ChatTemplateKWArgs,
		}

		// Convert messages to the format expected by the renderer
		for _, msg := range request.Body.ChatCompletions.Messages {
			renderReq.Conversation = append(renderReq.Conversation, []preprocessing.Conversation{{
				Role:    msg.Role,
				Content: msg.Content.Raw,
			}})
		}

		traceLogger.Info("Processing chat completion request",
			"conversationCount", len(renderReq.Conversation),
			"toolsCount", len(renderReq.Tools),
			"documentsCount", len(renderReq.Documents))

		scores, err := s.kvCacheIndexer.GetPodScores(ctx, renderReq, "", request.TargetModel, nil)
		if err != nil {
			return nil, fmt.Errorf("failed to get pod scores for chat/completions: %w", err)
		}
		return scores, nil
	}

	// For regular completions, use the prompt directly
	if request.Body != nil && request.Body.Completions != nil {
		prompt := request.Body.Completions.Prompt
		traceLogger.Info("Using completion prompt directly", "promptLength", len(prompt))

		scores, err := s.kvCacheIndexer.GetPodScores(ctx, nil, prompt, request.TargetModel, nil)
		if err != nil {
			return nil, fmt.Errorf("failed to get pod scores for completions: %w", err)
		}
		return scores, nil
	}

	return nil, errors.New("no valid input found in request")
}
```
Later we should decouple the chat-template rendering and move it into the tokenizer.
Thanks @sagearc, addressed.
sagearc left a comment:
The implementation looks good to me. My only concern is the context handling in the Reconcile loop, which will likely cause subscribers to drop immediately.
/lgtm

/approve
```go
var subscribersCache *ttlcache.Cache[string, struct{}]

// initialize the subscribers cache only if pod discovery is enabled
if config.KVEventsConfig.DiscoverPods == true {
```
```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Setup graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
	<-sigChan
	cancel()
}()
```
We could simplify this signal-handling logic by using ctrl.SetupSignalHandler(), which does the same thing internally.
```go
logger.Info("Topic filter", "filter", poolConfig.TopicFilter)

// Start the manager (this will start the reconciler)
mgrCtx, mgrCancel := context.WithCancel(ctx)
```
Is there a specific reason for creating a separate mgrCtx here instead of passing the ctx received?
If we apply the ctrl.SetupSignalHandler() suggestion above, since ctx would already handle signal cancellation, it seems like we could simplify this to just mgr.Start(ctx).
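Combining both suggestions, a sketch using the standard controller-runtime entrypoints (ctrl.SetupSignalHandler, ctrl.NewManager); the empty Options and panic-based error handling are just for brevity:

```go
package main

import (
	ctrl "sigs.k8s.io/controller-runtime"
)

func main() {
	// SetupSignalHandler returns a context canceled on SIGINT/SIGTERM,
	// replacing the manual signal.Notify plumbing shown above.
	ctx := ctrl.SetupSignalHandler()

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		panic(err)
	}

	// No separate mgrCtx needed: the signal-aware ctx is passed straight through.
	if err := mgr.Start(ctx); err != nil {
		panic(err)
	}
}
```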
Once this PR is merged, will the scheduler need to add a reconciler similar to the one in this example to enable the pod discovery feature?
| "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins" | ||
| "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" | ||
| "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" | ||
| "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix" |
It looks like the ttlcache package is being used below but hasn't been imported yet.
Summary

Implemented a pod reconciler controller that manages per-pod ZMQ subscribers for KVEvents processing, along with the required logic. Also moved kvevents to the same level as the kvcache library, as it should have been.

Components:
- kvevents.Pool - now works with SubscriberManager instead of a global socket

Added integration + unit tests for all new functionality, and updated documentation + examples.

Related Issues