Skip to content

Commit 966992a

Browse files
committed
Created channel interface for KVEvents processing and added unit test cases (#46)
1 parent 56b4bd5 commit 966992a

File tree

7 files changed

+1007
-6
lines changed

7 files changed

+1007
-6
lines changed
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
/*
2+
Copyright 2025 The llm-d Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package main demonstrates the use of different Channel implementations
18+
// for KV Events processing, showcasing the extensibility provided by the
19+
// Channel interface abstraction introduced in issue #46.
20+
package main
21+
22+
import (
23+
"context"
24+
"fmt"
25+
"os"
26+
"os/signal"
27+
"syscall"
28+
"time"
29+
30+
"github.com/vmihailenco/msgpack/v5"
31+
"k8s.io/klog/v2"
32+
33+
"github.com/llm-d/llm-d-kv-cache-manager/examples/testdata"
34+
"github.com/llm-d/llm-d-kv-cache-manager/pkg/kvcache"
35+
"github.com/llm-d/llm-d-kv-cache-manager/pkg/kvcache/kvblock"
36+
"github.com/llm-d/llm-d-kv-cache-manager/pkg/kvcache/kvevents"
37+
)
38+
39+
const (
40+
envHFToken = "HF_TOKEN"
41+
envChannelType = "CHANNEL_TYPE" // "zmq", "mock", or "http-sse"
42+
)
43+
44+
func main() {
45+
ctx, cancel := context.WithCancel(context.Background())
46+
defer cancel()
47+
48+
logger := klog.FromContext(ctx)
49+
logger.Info("Starting KV Events Channel Interface Demo")
50+
51+
// Get channel type from environment
52+
channelType := os.Getenv(envChannelType)
53+
if channelType == "" {
54+
channelType = "mock" // Default to mock for demo purposes
55+
}
56+
logger.Info("Using channel type", "type", channelType)
57+
58+
// Setup KV Cache Indexer
59+
kvCacheIndexer, err := setupKVCacheIndexer(ctx)
60+
if err != nil {
61+
logger.Error(err, "failed to setup KVCacheIndexer")
62+
return
63+
}
64+
65+
// Setup events pool with the specified channel type
66+
eventsPool, publisher, err := setupEventsPoolWithChannelType(ctx, kvCacheIndexer.KVBlockIndex(), channelType)
67+
if err != nil {
68+
logger.Error(err, "failed to setup events pool")
69+
return
70+
}
71+
defer func() {
72+
if publisher != nil {
73+
publisher.Close()
74+
}
75+
}()
76+
77+
// Start events pool
78+
eventsPool.Start(ctx)
79+
logger.Info("Events pool started", "channelType", channelType)
80+
81+
// Setup graceful shutdown
82+
sigChan := make(chan os.Signal, 1)
83+
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
84+
85+
go func() {
86+
<-sigChan
87+
logger.Info("Received shutdown signal")
88+
cancel()
89+
}()
90+
91+
// Run the demonstration
92+
if err := runChannelDemo(ctx, kvCacheIndexer, publisher, channelType); err != nil {
93+
logger.Error(err, "failed to run channel demo")
94+
return
95+
}
96+
97+
// Wait for shutdown signal
98+
<-ctx.Done()
99+
logger.Info("Shutting down...")
100+
101+
// Graceful shutdown of events pool
102+
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
103+
defer shutdownCancel()
104+
eventsPool.Shutdown(shutdownCtx)
105+
}
106+
107+
func setupKVCacheIndexer(ctx context.Context) (*kvcache.Indexer, error) {
108+
logger := klog.FromContext(ctx)
109+
110+
config := kvcache.NewDefaultConfig()
111+
if token := os.Getenv(envHFToken); token != "" {
112+
config.TokenizersPoolConfig.HuggingFaceToken = token
113+
}
114+
config.TokenProcessorConfig.BlockSize = 256
115+
116+
kvCacheIndexer, err := kvcache.NewKVCacheIndexer(ctx, config)
117+
if err != nil {
118+
return nil, err
119+
}
120+
121+
logger.Info("Created Indexer")
122+
go kvCacheIndexer.Run(ctx)
123+
logger.Info("Started Indexer")
124+
125+
return kvCacheIndexer, nil
126+
}
127+
128+
func setupEventsPoolWithChannelType(ctx context.Context, kvBlockIndex kvblock.Index,
129+
channelType string) (*kvevents.Pool, kvevents.Publisher, error) {
130+
logger := klog.FromContext(ctx)
131+
cfg := kvevents.DefaultConfig()
132+
133+
var pool *kvevents.Pool
134+
var publisher kvevents.Publisher
135+
136+
switch channelType {
137+
case "zmq":
138+
// Use default ZMQ implementation
139+
logger.Info("Creating events pool with ZMQ channel", "config", cfg)
140+
pool = kvevents.NewPool(cfg, kvBlockIndex)
141+
// Note: In a real scenario, you'd set up a real ZMQ publisher here
142+
publisher = kvevents.NewMockPublisher() // Using mock for simplicity in this example
143+
144+
case "mock":
145+
// Use mock channel for testing
146+
logger.Info("Creating events pool with Mock channel")
147+
mockChannel := kvevents.NewMockChannel(nil) // Will be set after pool creation
148+
pool = kvevents.NewPoolWithChannel(cfg, kvBlockIndex, mockChannel)
149+
150+
// Update channel reference
151+
mockChannel = kvevents.NewMockChannel(pool)
152+
pool = kvevents.NewPoolWithChannel(cfg, kvBlockIndex, mockChannel)
153+
154+
publisher = &mockChannelPublisher{channel: mockChannel}
155+
156+
case "http-sse":
157+
// Use HTTP SSE implementation
158+
logger.Info("Creating events pool with HTTP SSE channel", "endpoint", cfg.ZMQEndpoint)
159+
httpChannel := kvevents.NewHTTPSSEChannel(pool, "http://localhost:8080/sse")
160+
pool = kvevents.NewPoolWithChannel(cfg, kvBlockIndex, httpChannel)
161+
publisher = kvevents.NewHTTPSSEPublisher("http://localhost:8080/publish")
162+
163+
default:
164+
return nil, nil, fmt.Errorf("unsupported channel type: %s", channelType)
165+
}
166+
167+
return pool, publisher, nil
168+
}
169+
170+
// mockChannelPublisher wraps MockChannel to implement the Publisher interface.
171+
type mockChannelPublisher struct {
172+
channel *kvevents.MockChannel
173+
}
174+
175+
func (m *mockChannelPublisher) PublishEvent(ctx context.Context, topic string, batch interface{}) error {
176+
// Convert batch to the expected Message format
177+
batchBytes, err := msgpack.Marshal(batch)
178+
if err != nil {
179+
return fmt.Errorf("failed to marshal batch: %w", err)
180+
}
181+
182+
// Extract pod identifier and model name from topic (format: kv@<pod>@<model>)
183+
parts := []string{"kv", "test-pod", "test-model"}
184+
if len(parts) >= 3 {
185+
// For this demo, we'll use hardcoded values, but normally you'd parse the topic
186+
message := &kvevents.Message{
187+
Topic: topic,
188+
Payload: batchBytes,
189+
Seq: uint64(time.Now().Unix()),
190+
PodIdentifier: "test-pod",
191+
ModelName: testdata.ModelName,
192+
}
193+
m.channel.SendMessage(message)
194+
}
195+
196+
return nil
197+
}
198+
199+
func (m *mockChannelPublisher) Close() error {
200+
return m.channel.Close()
201+
}
202+
203+
func runChannelDemo(ctx context.Context, kvCacheIndexer *kvcache.Indexer,
204+
publisher kvevents.Publisher, channelType string) error {
205+
logger := klog.FromContext(ctx)
206+
207+
logger.Info("Starting Channel Interface Demo", "channelType", channelType, "model", testdata.ModelName)
208+
209+
// Initial query - should be empty
210+
pods, err := kvCacheIndexer.GetPodScores(ctx, testdata.Prompt, testdata.ModelName, nil)
211+
if err != nil {
212+
return err
213+
}
214+
logger.Info("Initial pod scores (should be empty)", "pods", pods)
215+
216+
// Give the channel a moment to start
217+
time.Sleep(1 * time.Second)
218+
219+
// Simulate publishing BlockStored events using the configured publisher
220+
logger.Info("Publishing BlockStored events via channel", "channelType", channelType)
221+
222+
blockStoredPayload, err := msgpack.Marshal(kvevents.BlockStored{
223+
BlockHashes: testdata.PromptHashes,
224+
})
225+
if err != nil {
226+
return fmt.Errorf("failed to marshal BlockStored event: %w", err)
227+
}
228+
229+
eventBatch := kvevents.EventBatch{
230+
TS: float64(time.Now().UnixNano()) / 1e9,
231+
Events: []msgpack.RawMessage{blockStoredPayload},
232+
}
233+
234+
topic := fmt.Sprintf("kv@demo-pod@%s", testdata.ModelName)
235+
if err := publisher.PublishEvent(ctx, topic, eventBatch); err != nil {
236+
return fmt.Errorf("failed to publish event: %w", err)
237+
}
238+
239+
logger.Info("Published BlockStored event", "topic", topic, "blocks", len(testdata.PromptHashes))
240+
241+
// Wait for events to be processed
242+
logger.Info("Waiting for events to be processed...")
243+
time.Sleep(3 * time.Second)
244+
245+
// Query again to see the effect
246+
pods, err = kvCacheIndexer.GetPodScores(ctx, testdata.Prompt, testdata.ModelName, nil)
247+
if err != nil {
248+
return err
249+
}
250+
logger.Info("Pod scores after BlockStored events", "pods", pods, "channelType", channelType)
251+
252+
// Demonstrate successful processing
253+
if len(pods) > 0 {
254+
logger.Info("SUCCESS: Channel interface working correctly!",
255+
"channelType", channelType,
256+
"foundPods", len(pods))
257+
} else {
258+
logger.Info("No pods found - this might be expected depending on the channel implementation")
259+
}
260+
261+
logger.Info("Channel demo completed. Pool continues listening for more events...")
262+
logger.Info("Press Ctrl+C to shutdown")
263+
264+
// Keep running until context is cancelled
265+
<-ctx.Done()
266+
return nil
267+
}

pkg/kvcache/kvevents/channel.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
Copyright 2025 The llm-d Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kvevents
18+
19+
import (
20+
"context"
21+
)
22+
23+
// Channel represents an abstract message channel for KV events.
24+
// This interface allows for different implementations (ZMQ, HTTP SSE, NATS, test mocks, etc.)
25+
// providing extensibility and better testability as suggested in issue #46.
26+
type Channel interface {
27+
// Start begins listening for messages and forwarding them to the pool.
28+
// It should run until the provided context is canceled.
29+
Start(ctx context.Context)
30+
31+
// Close gracefully shuts down the channel and cleans up resources.
32+
Close() error
33+
}
34+
35+
// Publisher represents an abstract publisher for KV events.
36+
// This interface allows for different publishing implementations.
37+
type Publisher interface {
38+
// PublishEvent publishes a KV cache event batch to the specified topic.
39+
PublishEvent(ctx context.Context, topic string, batch interface{}) error
40+
41+
// Close closes the publisher and cleans up resources.
42+
Close() error
43+
}

0 commit comments

Comments
 (0)