Skip to content

Commit d5a8ca8

Browse files
authored
- fixed kvevents and made api/example more intuitive (#88)
- dockerfile fix - deployment fix Signed-off-by: Maroon Ayoub <[email protected]>
1 parent 3bcc158 commit d5a8ca8

File tree

5 files changed

+59
-85
lines changed

5 files changed

+59
-85
lines changed

Dockerfile

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@ RUN dnf install -y 'https://dl.fedoraproject.org/pub/epel/epel-release-latest-8.
2626
dnf install -y gcc-c++ libstdc++ libstdc++-devel clang zeromq-devel pkgconfig && \
2727
dnf clean all
2828

29-
# Install Python 3.9+ for chat template functionality
30-
RUN dnf install -y python39 python39-pip python39-devel && \
31-
dnf clean all
32-
3329
# Copy the Go Modules manifests
3430
COPY go.mod go.mod
3531
COPY go.sum go.sum
@@ -41,9 +37,6 @@ RUN go mod download
4137
COPY examples/kv_events examples/kv_events
4238
COPY . .
4339

44-
# Install Python dependencies for chat template functionality
45-
RUN python3.9 -m pip install -r pkg/preprocessing/chat_completions_template/requirements.txt
46-
4740
# HuggingFace tokenizer bindings
4841
RUN mkdir -p lib
4942
ARG RELEASE_VERSION=v1.22.1
@@ -66,14 +59,9 @@ WORKDIR /
6659
# The final image is UBI9, so we need epel-release-9.
6760
USER root
6861
RUN dnf install -y 'https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch.rpm' && \
69-
dnf install -y zeromq python39 python39-pip
70-
71-
# Install Python dependencies in the final image
72-
COPY pkg/preprocessing/chat_completions_template/requirements.txt /tmp/requirements.txt
73-
RUN python3.9 -m pip install -r /tmp/requirements.txt
62+
dnf install -y zeromq
7463

7564
COPY --from=builder /workspace/bin/kv-cache-manager /app/kv-cache-manager
76-
COPY --from=builder /workspace/pkg/preprocessing/chat_completions_template/chat_template_wrapper.py /app/chat_template_wrapper.py
7765
USER 65532:65532
7866

7967
# Set the entrypoint to the kv-cache-manager binary

examples/kv_events/offline/main.go

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package main
1818

1919
import (
20-
"bytes"
2120
"context"
2221
_ "embed"
2322
"fmt"
@@ -166,22 +165,21 @@ func runEventsDemo(ctx context.Context, kvCacheIndexer *kvcache.Indexer, publish
166165
// Simulate vLLM engine publishing BlockStored events
167166
logger.Info("@@@ Simulating vLLM engine publishing BlockStored events...")
168167

169-
var blockStoredPayload bytes.Buffer
170-
enc := msgpack.NewEncoder(&blockStoredPayload)
171-
enc.UseArrayEncodedStructs(true)
168+
blockStoredEvent := kvevents.BlockStored{
169+
BlockHashes: testdata.PromptHashes,
170+
ParentBlockHash: nil,
171+
TokenIds: []uint32{1, 2, 3},
172+
BlockSize: 256,
173+
LoraID: nil,
174+
}
172175

173176
//nolint // won't fail
174-
enc.Encode(&kvevents.BlockStoredEvent{
175-
TypeField: "BlockStored",
176-
BlockStored: &kvevents.BlockStored{BlockHashes: testdata.PromptHashes},
177-
})
178-
179-
dpRank := 0
177+
blockStoredPayload, _ := msgpack.Marshal(blockStoredEvent.ToTaggedUnion())
180178

181179
eventBatch := kvevents.EventBatch{
182180
TS: float64(time.Now().UnixNano()) / 1e9,
183-
Events: []msgpack.RawMessage{blockStoredPayload.Bytes()},
184-
DataParallelRank: &dpRank,
181+
Events: []msgpack.RawMessage{blockStoredPayload},
182+
DataParallelRank: nil,
185183
}
186184

187185
topic := fmt.Sprintf("kv@vllm-pod1@%s", testdata.ModelName)
@@ -204,20 +202,17 @@ func runEventsDemo(ctx context.Context, kvCacheIndexer *kvcache.Indexer, publish
204202
// Simulate removing some blocks
205203
logger.Info("@@@ Simulating vLLM engine removing some blocks...")
206204

207-
var blockRemovedPayload bytes.Buffer
208-
enc = msgpack.NewEncoder(&blockRemovedPayload)
209-
enc.UseArrayEncodedStructs(true)
205+
blockRemovedEvent := kvevents.BlockRemoved{
206+
BlockHashes: testdata.PromptHashes[2:], // Remove last blocks
207+
}
210208

211209
//nolint // won't fail
212-
enc.Encode(&kvevents.BlockRemovedEvent{
213-
TypeField: "BlockRemoved",
214-
BlockRemoved: &kvevents.BlockRemoved{BlockHashes: testdata.PromptHashes[2:]},
215-
})
210+
blockRemovedPayload, _ := msgpack.Marshal(blockRemovedEvent.ToTaggedUnion())
216211

217212
removeEventBatch := kvevents.EventBatch{
218213
TS: float64(time.Now().UnixNano()) / 1e9,
219-
Events: []msgpack.RawMessage{blockRemovedPayload.Bytes()},
220-
DataParallelRank: &dpRank,
214+
Events: []msgpack.RawMessage{blockRemovedPayload},
215+
DataParallelRank: nil,
221216
}
222217

223218
if err := publisher.PublishEvent(ctx, topic, removeEventBatch); err != nil {

pkg/kvcache/kvevents/events.go

Lines changed: 34 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
11
package kvevents
22

33
import (
4-
"fmt"
5-
64
"github.com/vmihailenco/msgpack/v5"
75
)
86

7+
const (
8+
// BlockStoredEventTag is the tag for BlockStored events.
9+
BlockStoredEventTag = "BlockStored"
10+
// BlockRemovedEventTag is the tag for BlockRemoved events.
11+
BlockRemovedEventTag = "BlockRemoved"
12+
// AllBlocksClearedEventTag is the tag for AllBlocksCleared events.
13+
AllBlocksClearedEventTag = "AllBlocksCleared"
14+
)
15+
916
// event is a marker interface for KV-cache events.
1017
type event interface {
1118
isEvent()
19+
ToTaggedUnion() []any
1220
}
1321

1422
// EventBatch represents a batch of events.
@@ -20,37 +28,6 @@ type EventBatch struct {
2028
DataParallelRank *int `msgpack:",omitempty"`
2129
}
2230

23-
// DecodeMsgpack allows 2- or 3-element array-encoded batches.
24-
func (e *EventBatch) DecodeMsgpack(decoder *msgpack.Decoder) error {
25-
length, err := decoder.DecodeArrayLen()
26-
if err != nil {
27-
return err
28-
}
29-
if length < 2 {
30-
return fmt.Errorf("EventBatch: expected at least 2 fields, got %d", length)
31-
}
32-
if e.TS, err = decoder.DecodeFloat64(); err != nil {
33-
return err
34-
}
35-
if err := decoder.Decode(&e.Events); err != nil {
36-
return err
37-
}
38-
if length > 2 {
39-
var rank int
40-
if err := decoder.Decode(&rank); err != nil {
41-
return err
42-
}
43-
e.DataParallelRank = &rank
44-
}
45-
// skip any extra
46-
for i := 3; i < length; i++ {
47-
if err := decoder.Skip(); err != nil {
48-
return err
49-
}
50-
}
51-
return nil
52-
}
53-
5431
// BlockStored event.
5532
type BlockStored struct {
5633
_ struct{} `msgpack:",array"`
@@ -61,31 +38,43 @@ type BlockStored struct {
6138
LoraID *int
6239
}
6340

64-
func (BlockStored) isEvent() {}
65-
66-
type BlockStoredEvent struct {
67-
_ struct{} `msgpack:",array"`
68-
TypeField string
69-
*BlockStored
41+
func (bs BlockStored) ToTaggedUnion() []any {
42+
return []any{
43+
BlockStoredEventTag,
44+
bs.BlockHashes,
45+
bs.ParentBlockHash,
46+
bs.TokenIds,
47+
bs.BlockSize,
48+
bs.LoraID,
49+
}
7050
}
7151

52+
func (BlockStored) isEvent() {}
53+
7254
// BlockRemoved event.
7355
type BlockRemoved struct {
7456
_ struct{} `msgpack:",array"`
7557
BlockHashes []uint64
7658
}
7759

78-
func (BlockRemoved) isEvent() {}
79-
80-
type BlockRemovedEvent struct {
81-
_ struct{} `msgpack:",array"`
82-
TypeField string
83-
*BlockRemoved
60+
func (br BlockRemoved) ToTaggedUnion() []any {
61+
return []any{
62+
BlockRemovedEventTag,
63+
br.BlockHashes,
64+
}
8465
}
8566

67+
func (BlockRemoved) isEvent() {}
68+
8669
// AllBlocksCleared event.
8770
type AllBlocksCleared struct {
8871
_ struct{} `msgpack:",array"`
8972
}
9073

74+
func (ac AllBlocksCleared) ToTaggedUnion() []any {
75+
return []any{
76+
AllBlocksClearedEventTag,
77+
}
78+
}
79+
9180
func (AllBlocksCleared) isEvent() {}

pkg/kvcache/kvevents/pool.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -193,13 +193,13 @@ func (p *Pool) processEvent(ctx context.Context, msg *Message) {
193193
var unmarshalErr error
194194
switch tag {
195195
case "BlockStored":
196-
var bs BlockStoredEvent
197-
unmarshalErr = msgpack.Unmarshal(rawEvent, &bs)
198-
event = bs.BlockStored
196+
var bs BlockStored
197+
unmarshalErr = msgpack.Unmarshal(payloadBytes, &bs)
198+
event = bs
199199
case "BlockRemoved":
200-
var br BlockRemovedEvent
201-
unmarshalErr = msgpack.Unmarshal(rawEvent, &br)
202-
event = br.BlockRemoved
200+
var br BlockRemoved
201+
unmarshalErr = msgpack.Unmarshal(payloadBytes, &br)
202+
event = br
203203
case "AllBlocksCleared":
204204
var ac AllBlocksCleared
205205
unmarshalErr = msgpack.Unmarshal(payloadBytes, &ac)

vllm-setup-helm/templates/deployment.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ spec:
9191
startupProbe:
9292
{{- toYaml .Values.vllm.startupProbe | nindent 12 }}
9393
env:
94+
- name: HOME
95+
value: {{ .Values.persistence.mountPath }}
9496
- name: HF_HOME
9597
value: {{ .Values.persistence.mountPath }}
9698
- name: POD_IP

0 commit comments

Comments
 (0)