Skip to content

Commit 75a0934

Browse files
committed
Created channel interface for KVEvents processing and added unit test cases (#46)
1 parent 56b4bd5 commit 75a0934

File tree

7 files changed

+984
-6
lines changed

7 files changed

+984
-6
lines changed
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
/*
2+
Copyright 2025 The llm-d Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package main demonstrates the use of different Channel implementations
18+
// for KV Events processing, showcasing the extensibility provided by the
19+
// Channel interface abstraction introduced in issue #46.
20+
package main
21+
22+
import (
23+
"context"
24+
"fmt"
25+
"os"
26+
"os/signal"
27+
"syscall"
28+
"time"
29+
30+
"github.com/vmihailenco/msgpack/v5"
31+
"k8s.io/klog/v2"
32+
33+
"github.com/llm-d/llm-d-kv-cache-manager/examples/testdata"
34+
"github.com/llm-d/llm-d-kv-cache-manager/pkg/kvcache"
35+
"github.com/llm-d/llm-d-kv-cache-manager/pkg/kvcache/kvblock"
36+
"github.com/llm-d/llm-d-kv-cache-manager/pkg/kvcache/kvevents"
37+
)
38+
39+
const (
40+
envHFToken = "HF_TOKEN"
41+
envChannelType = "CHANNEL_TYPE" // "zmq", "mock", or "http-sse"
42+
)
43+
44+
func main() {
45+
ctx, cancel := context.WithCancel(context.Background())
46+
defer cancel()
47+
48+
logger := klog.FromContext(ctx)
49+
logger.Info("Starting KV Events Channel Interface Demo")
50+
51+
// Get channel type from environment
52+
channelType := os.Getenv(envChannelType)
53+
if channelType == "" {
54+
channelType = "mock" // Default to mock for demo purposes
55+
}
56+
logger.Info("Using channel type", "type", channelType)
57+
58+
// Setup KV Cache Indexer
59+
kvCacheIndexer, err := setupKVCacheIndexer(ctx)
60+
if err != nil {
61+
logger.Error(err, "failed to setup KVCacheIndexer")
62+
return
63+
}
64+
65+
// Setup events pool with the specified channel type
66+
eventsPool, publisher, err := setupEventsPoolWithChannelType(ctx, kvCacheIndexer.KVBlockIndex(), channelType)
67+
if err != nil {
68+
logger.Error(err, "failed to setup events pool")
69+
return
70+
}
71+
defer func() {
72+
if publisher != nil {
73+
publisher.Close()
74+
}
75+
}()
76+
77+
// Start events pool
78+
eventsPool.Start(ctx)
79+
logger.Info("Events pool started", "channelType", channelType)
80+
81+
// Setup graceful shutdown
82+
sigChan := make(chan os.Signal, 1)
83+
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
84+
85+
go func() {
86+
<-sigChan
87+
logger.Info("Received shutdown signal")
88+
cancel()
89+
}()
90+
91+
// Run the demonstration
92+
if err := runChannelDemo(ctx, kvCacheIndexer, publisher, channelType); err != nil {
93+
logger.Error(err, "failed to run channel demo")
94+
return
95+
}
96+
97+
// Wait for shutdown signal
98+
<-ctx.Done()
99+
logger.Info("Shutting down...")
100+
101+
// Graceful shutdown of events pool
102+
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
103+
defer shutdownCancel()
104+
eventsPool.Shutdown(shutdownCtx)
105+
}
106+
107+
func setupKVCacheIndexer(ctx context.Context) (*kvcache.Indexer, error) {
108+
logger := klog.FromContext(ctx)
109+
110+
config := kvcache.NewDefaultConfig()
111+
if token := os.Getenv(envHFToken); token != "" {
112+
config.TokenizersPoolConfig.HuggingFaceToken = token
113+
}
114+
config.TokenProcessorConfig.BlockSize = 256
115+
116+
kvCacheIndexer, err := kvcache.NewKVCacheIndexer(ctx, config)
117+
if err != nil {
118+
return nil, err
119+
}
120+
121+
logger.Info("Created Indexer")
122+
go kvCacheIndexer.Run(ctx)
123+
logger.Info("Started Indexer")
124+
125+
return kvCacheIndexer, nil
126+
}
127+
128+
func setupEventsPoolWithChannelType(ctx context.Context, kvBlockIndex kvblock.Index, channelType string) (*kvevents.Pool, kvevents.Publisher, error) {
129+
logger := klog.FromContext(ctx)
130+
cfg := kvevents.DefaultConfig()
131+
132+
var pool *kvevents.Pool
133+
var publisher kvevents.Publisher
134+
135+
switch channelType {
136+
case "zmq":
137+
// Use default ZMQ implementation
138+
logger.Info("Creating events pool with ZMQ channel", "config", cfg)
139+
pool = kvevents.NewPool(cfg, kvBlockIndex)
140+
// Note: In a real scenario, you'd set up a real ZMQ publisher here
141+
publisher = kvevents.NewMockPublisher() // Using mock for simplicity in this example
142+
143+
case "mock":
144+
// Use mock channel for testing
145+
logger.Info("Creating events pool with Mock channel")
146+
mockChannel := kvevents.NewMockChannel(nil) // Will be set after pool creation
147+
pool = kvevents.NewPoolWithChannel(cfg, kvBlockIndex, mockChannel)
148+
149+
// Update channel reference
150+
mockChannel = kvevents.NewMockChannel(pool)
151+
pool = kvevents.NewPoolWithChannel(cfg, kvBlockIndex, mockChannel)
152+
153+
publisher = &mockChannelPublisher{channel: mockChannel}
154+
155+
case "http-sse":
156+
// Use HTTP SSE implementation
157+
logger.Info("Creating events pool with HTTP SSE channel", "endpoint", cfg.ZMQEndpoint)
158+
httpChannel := kvevents.NewHTTPSSEChannel(pool, "http://localhost:8080/sse")
159+
pool = kvevents.NewPoolWithChannel(cfg, kvBlockIndex, httpChannel)
160+
publisher = kvevents.NewHTTPSSEPublisher("http://localhost:8080/publish")
161+
162+
default:
163+
return nil, nil, fmt.Errorf("unsupported channel type: %s", channelType)
164+
}
165+
166+
return pool, publisher, nil
167+
}
168+
169+
// mockChannelPublisher wraps MockChannel to implement the Publisher interface
170+
type mockChannelPublisher struct {
171+
channel *kvevents.MockChannel
172+
}
173+
174+
func (m *mockChannelPublisher) PublishEvent(ctx context.Context, topic string, batch interface{}) error {
175+
// Convert batch to the expected Message format
176+
batchBytes, err := msgpack.Marshal(batch)
177+
if err != nil {
178+
return fmt.Errorf("failed to marshal batch: %w", err)
179+
}
180+
181+
// Extract pod identifier and model name from topic (format: kv@<pod>@<model>)
182+
parts := []string{"kv", "test-pod", "test-model"}
183+
if len(parts) >= 3 {
184+
// For this demo, we'll use hardcoded values, but normally you'd parse the topic
185+
message := &kvevents.Message{
186+
Topic: topic,
187+
Payload: batchBytes,
188+
Seq: uint64(time.Now().UnixNano()),
189+
PodIdentifier: "test-pod",
190+
ModelName: testdata.ModelName,
191+
}
192+
m.channel.SendMessage(message)
193+
}
194+
195+
return nil
196+
}
197+
198+
func (m *mockChannelPublisher) Close() error {
199+
return m.channel.Close()
200+
}
201+
202+
func runChannelDemo(ctx context.Context, kvCacheIndexer *kvcache.Indexer, publisher kvevents.Publisher, channelType string) error {
203+
logger := klog.FromContext(ctx)
204+
205+
logger.Info("Starting Channel Interface Demo", "channelType", channelType, "model", testdata.ModelName)
206+
207+
// Initial query - should be empty
208+
pods, err := kvCacheIndexer.GetPodScores(ctx, testdata.Prompt, testdata.ModelName, nil)
209+
if err != nil {
210+
return err
211+
}
212+
logger.Info("Initial pod scores (should be empty)", "pods", pods)
213+
214+
// Give the channel a moment to start
215+
time.Sleep(1 * time.Second)
216+
217+
// Simulate publishing BlockStored events using the configured publisher
218+
logger.Info("Publishing BlockStored events via channel", "channelType", channelType)
219+
220+
blockStoredPayload, _ := msgpack.Marshal(kvevents.BlockStored{
221+
BlockHashes: testdata.PromptHashes,
222+
})
223+
224+
eventBatch := kvevents.EventBatch{
225+
TS: float64(time.Now().UnixNano()) / 1e9,
226+
Events: []msgpack.RawMessage{blockStoredPayload},
227+
}
228+
229+
topic := fmt.Sprintf("kv@demo-pod@%s", testdata.ModelName)
230+
if err := publisher.PublishEvent(ctx, topic, eventBatch); err != nil {
231+
return fmt.Errorf("failed to publish event: %w", err)
232+
}
233+
234+
logger.Info("Published BlockStored event", "topic", topic, "blocks", len(testdata.PromptHashes))
235+
236+
// Wait for events to be processed
237+
logger.Info("Waiting for events to be processed...")
238+
time.Sleep(3 * time.Second)
239+
240+
// Query again to see the effect
241+
pods, err = kvCacheIndexer.GetPodScores(ctx, testdata.Prompt, testdata.ModelName, nil)
242+
if err != nil {
243+
return err
244+
}
245+
logger.Info("Pod scores after BlockStored events", "pods", pods, "channelType", channelType)
246+
247+
// Demonstrate successful processing
248+
if len(pods) > 0 {
249+
logger.Info("SUCCESS: Channel interface working correctly!",
250+
"channelType", channelType,
251+
"foundPods", len(pods))
252+
} else {
253+
logger.Info("No pods found - this might be expected depending on the channel implementation")
254+
}
255+
256+
logger.Info("Channel demo completed. Pool continues listening for more events...")
257+
logger.Info("Press Ctrl+C to shutdown")
258+
259+
// Keep running until context is cancelled
260+
<-ctx.Done()
261+
return nil
262+
}

pkg/kvcache/kvevents/channel.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
Copyright 2025 The llm-d Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kvevents
18+
19+
import (
20+
"context"
21+
)
22+
23+
// Channel represents an abstract message channel for KV events.
24+
// This interface allows for different implementations (ZMQ, HTTP SSE, NATS, test mocks, etc.)
25+
// providing extensibility and better testability as suggested in issue #46.
26+
type Channel interface {
27+
// Start begins listening for messages and forwarding them to the pool.
28+
// It should run until the provided context is canceled.
29+
Start(ctx context.Context)
30+
31+
// Close gracefully shuts down the channel and cleans up resources.
32+
Close() error
33+
}
34+
35+
// Publisher represents an abstract publisher for KV events.
36+
// This interface allows for different publishing implementations.
37+
type Publisher interface {
38+
// PublishEvent publishes a KV cache event batch to the specified topic.
39+
PublishEvent(ctx context.Context, topic string, batch interface{}) error
40+
41+
// Close closes the publisher and cleans up resources.
42+
Close() error
43+
}

0 commit comments

Comments
 (0)