Skip to content

Commit 09c67f9

Browse files
authored
Fix kv_events offline example (#82)
* Added new structs for events to contain TagField, changed marshaling to encoding as array Signed-off-by: Ira <[email protected]> * Fixed the sent hashes, changed publisher endpoint, fixed readme Signed-off-by: Ira <[email protected]> --------- Signed-off-by: Ira <[email protected]>
1 parent 9a135ac commit 09c67f9

File tree

5 files changed

+48
-18
lines changed

5 files changed

+48
-18
lines changed

examples/kv_events/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ Download tokenizer bindings:
1818

1919
### Running the Example
2020
```
21-
go run -ldflags="-extldflags '-L$(pwd)/lib'" examples/kv-events/offline/main.go examples/kv-events/offline/publisher.go
21+
go run -ldflags="-extldflags '-L$(pwd)/lib'" examples/kv_events/offline/main.go examples/kv_events/offline/publisher.go
2222
```
2323

2424
The example will start the KV-Cache Manager (indexer) and a dummy publisher that simulates KV-Events.

examples/kv_events/offline/main.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package main
1818

1919
import (
20+
"bytes"
2021
"context"
2122
_ "embed"
2223
"fmt"
@@ -135,11 +136,10 @@ func setupEventsPool(ctx context.Context, kvBlockIndex kvblock.Index) *kvevents.
135136
func setupPublisher(ctx context.Context) (*Publisher, error) {
136137
logger := klog.FromContext(ctx)
137138

138-
cfg := kvevents.DefaultConfig()
139-
140-
logger.Info("Creating ZMQ publisher (simulating vLLM engines)", "endpoint", cfg.ZMQEndpoint)
139+
endpoint := "tcp://localhost:5557"
140+
logger.Info("Creating ZMQ publisher (simulating vLLM engines)", "endpoint", endpoint)
141141

142-
publisher, err := NewPublisher(cfg.ZMQEndpoint)
142+
publisher, err := NewPublisher(endpoint)
143143
if err != nil {
144144
return nil, fmt.Errorf("failed to create ZMQ publisher: %w", err)
145145
}
@@ -166,13 +166,21 @@ func runEventsDemo(ctx context.Context, kvCacheIndexer *kvcache.Indexer, publish
166166
// Simulate vLLM engine publishing BlockStored events
167167
logger.Info("@@@ Simulating vLLM engine publishing BlockStored events...")
168168

169+
var blockStoredPayload bytes.Buffer
170+
enc := msgpack.NewEncoder(&blockStoredPayload)
171+
enc.UseArrayEncodedStructs(true)
172+
169173
//nolint // won't fail
170-
blockStoredPayloadBytes, _ := msgpack.Marshal(kvevents.BlockStored{BlockHashes: testdata.PromptHashes})
174+
enc.Encode(&kvevents.BlockStoredEvent{
175+
TypeField: "BlockStored",
176+
BlockStored: &kvevents.BlockStored{BlockHashes: testdata.PromptHashes},
177+
})
178+
171179
dpRank := 0
172180

173181
eventBatch := kvevents.EventBatch{
174182
TS: float64(time.Now().UnixNano()) / 1e9,
175-
Events: []msgpack.RawMessage{blockStoredPayloadBytes},
183+
Events: []msgpack.RawMessage{blockStoredPayload.Bytes()},
176184
DataParallelRank: &dpRank,
177185
}
178186

@@ -196,14 +204,19 @@ func runEventsDemo(ctx context.Context, kvCacheIndexer *kvcache.Indexer, publish
196204
// Simulate removing some blocks
197205
logger.Info("@@@ Simulating vLLM engine removing some blocks...")
198206

207+
var blockRemovedPayload bytes.Buffer
208+
enc = msgpack.NewEncoder(&blockRemovedPayload)
209+
enc.UseArrayEncodedStructs(true)
210+
199211
//nolint // won't fail
200-
blockRemovedPayloadBytes, _ := msgpack.Marshal(kvevents.BlockRemoved{
201-
BlockHashes: testdata.PromptHashes[:2],
212+
enc.Encode(&kvevents.BlockRemovedEvent{
213+
TypeField: "BlockRemoved",
214+
BlockRemoved: &kvevents.BlockRemoved{BlockHashes: testdata.PromptHashes[2:]},
202215
})
203216

204217
removeEventBatch := kvevents.EventBatch{
205218
TS: float64(time.Now().UnixNano()) / 1e9,
206-
Events: []msgpack.RawMessage{blockRemovedPayloadBytes},
219+
Events: []msgpack.RawMessage{blockRemovedPayload.Bytes()},
207220
DataParallelRank: &dpRank,
208221
}
209222

examples/kv_events/offline/publisher.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package main
1818

1919
import (
20+
"bytes"
2021
"context"
2122
"encoding/binary"
2223
"fmt"
@@ -58,7 +59,11 @@ func NewPublisher(endpoint string) (*Publisher, error) {
5859
func (p *Publisher) PublishEvent(ctx context.Context, topic string, batch interface{}) error {
5960
logger := klog.FromContext(ctx).V(0)
6061

61-
payload, err := msgpack.Marshal(batch)
62+
// Use an encoder configured for struct as array
63+
var payload bytes.Buffer
64+
enc := msgpack.NewEncoder(&payload)
65+
enc.UseArrayEncodedStructs(true)
66+
err := enc.Encode(batch)
6267
if err != nil {
6368
return fmt.Errorf("failed to marshal event batch: %w", err)
6469
}
@@ -69,7 +74,7 @@ func (p *Publisher) PublishEvent(ctx context.Context, topic string, batch interf
6974
binary.BigEndian.PutUint64(seqBytes, seq)
7075

7176
// send topic, sequence, payload
72-
if _, err := p.socket.SendMessage(topic, seqBytes, payload); err != nil {
77+
if _, err := p.socket.SendMessage(topic, seqBytes, payload.Bytes()); err != nil {
7378
return fmt.Errorf("failed to send message to topic %s: %w", topic, err)
7479
}
7580

pkg/kvcache/kvevents/events.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ type BlockStored struct {
6363

6464
func (BlockStored) isEvent() {}
6565

66+
type BlockStoredEvent struct {
67+
_ struct{} `msgpack:",array"`
68+
TypeField string
69+
*BlockStored
70+
}
71+
6672
// BlockRemoved event.
6773
type BlockRemoved struct {
6874
_ struct{} `msgpack:",array"`
@@ -71,6 +77,12 @@ type BlockRemoved struct {
7177

7278
func (BlockRemoved) isEvent() {}
7379

80+
type BlockRemovedEvent struct {
81+
_ struct{} `msgpack:",array"`
82+
TypeField string
83+
*BlockRemoved
84+
}
85+
7486
// AllBlocksCleared event.
7587
type AllBlocksCleared struct {
7688
_ struct{} `msgpack:",array"`

pkg/kvcache/kvevents/pool.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -193,13 +193,13 @@ func (p *Pool) processEvent(ctx context.Context, msg *Message) {
193193
var unmarshalErr error
194194
switch tag {
195195
case "BlockStored":
196-
var bs BlockStored
197-
unmarshalErr = msgpack.Unmarshal(payloadBytes, &bs)
198-
event = bs
196+
var bs BlockStoredEvent
197+
unmarshalErr = msgpack.Unmarshal(rawEvent, &bs)
198+
event = bs.BlockStored
199199
case "BlockRemoved":
200-
var br BlockRemoved
201-
unmarshalErr = msgpack.Unmarshal(payloadBytes, &br)
202-
event = br
200+
var br BlockRemovedEvent
201+
unmarshalErr = msgpack.Unmarshal(rawEvent, &br)
202+
event = br.BlockRemoved
203203
case "AllBlocksCleared":
204204
var ac AllBlocksCleared
205205
unmarshalErr = msgpack.Unmarshal(payloadBytes, &ac)

0 commit comments

Comments
 (0)