diff --git a/go.mod b/go.mod index 3ed7eebe..b56c7987 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/buaazp/fasthttprouter v0.1.1 github.com/go-logr/logr v1.4.2 github.com/google/uuid v1.6.0 - github.com/llm-d/llm-d-kv-cache-manager v0.2.1 + github.com/llm-d/llm-d-kv-cache-manager v0.3.0-rc1 github.com/onsi/ginkgo/v2 v2.23.4 github.com/onsi/gomega v1.37.0 github.com/openai/openai-go v0.1.0-beta.10 diff --git a/go.sum b/go.sum index 7878bc43..4db4fd05 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/alicebob/miniredis/v2 v2.35.0 h1:QwLphYqCEAo1eu1TqPRN2jgVMPBweeQcR21jeqDCONI= +github.com/alicebob/miniredis/v2 v2.35.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM= github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -66,8 +68,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= -github.com/llm-d/llm-d-kv-cache-manager v0.2.1 h1:PKIjJPUF9ILLFBNvZRa0QQ/liTQjBKwWChzcenEdM08= -github.com/llm-d/llm-d-kv-cache-manager v0.2.1/go.mod h1:s1xaE4ImkihWaLg2IQh4VN6L1PgN5RD1u1VarPey6dw= +github.com/llm-d/llm-d-kv-cache-manager v0.3.0-rc1 h1:SDLiNrcreDcA9m9wfXAumFARDHHXpjOjHTzshTiTGxk= +github.com/llm-d/llm-d-kv-cache-manager v0.3.0-rc1/go.mod h1:tN80/D0Faf6pE2ocwFgTNoCxKPsqdsa2XnjQUqOaZ8Q= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -143,6 +145,8 @@ github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZ github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= +github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= diff --git a/pkg/kv-cache/kv_cache.go b/pkg/kv-cache/kv_cache.go index ebb32a83..bb676ed9 100644 --- a/pkg/kv-cache/kv_cache.go +++ b/pkg/kv-cache/kv_cache.go @@ -93,10 +93,7 @@ func (h *KVCacheHelper) OnRequestStart(vllmReq openaiserverapi.CompletionRequest } nBlocksAlreadyInCache, err := h.blockCache.startRequest(requestID, blockHashes) - if err == nil { - vllmReq.SetNumberOfCachedPromptTokens(nBlocksAlreadyInCache * h.blockSize) - } - + vllmReq.SetNumberOfCachedPromptTokens(nBlocksAlreadyInCache * h.blockSize) return err } diff --git a/pkg/kv-cache/kv_cache_sender.go b/pkg/kv-cache/kv_cache_sender.go index 1c0c970a..b0ddac3f 100644 --- a/pkg/kv-cache/kv_cache_sender.go +++ b/pkg/kv-cache/kv_cache_sender.go @@ -33,11 +33,6 @@ const ( eventActionRemove ) -const ( - BlockStored = "BlockStored" - BlockRemoved = "BlockRemoved" -) - type EventData struct { action EventAction hashValues []uint64 @@ -98,9 +93,9 @@ func (s *KVEventSender) Run(ctx context.Context) error { switch eventData.action { case eventActionStore: - payload, err = msgpack.Marshal(storedToTaggedUnion(kvevents.BlockStored{BlockHashes: eventData.hashValues})) + payload, err = msgpack.Marshal(kvevents.BlockStored{BlockHashes: eventData.hashValues}.ToTaggedUnion()) case eventActionRemove: - payload, err = msgpack.Marshal(removedToTaggedUnion(kvevents.BlockRemoved{BlockHashes: eventData.hashValues})) + payload, err = msgpack.Marshal(kvevents.BlockRemoved{BlockHashes: eventData.hashValues}.ToTaggedUnion()) default: return fmt.Errorf("invalid event action %d", eventData.action) } @@ -135,24 +130,6 @@ func (s *KVEventSender) Run(ctx context.Context) error { } } -func storedToTaggedUnion(bs kvevents.BlockStored) []any { - return []any{ - BlockStored, - bs.BlockHashes, - bs.ParentBlockHash, - bs.TokenIds, - bs.BlockSize, - bs.LoraID, - } -} - -func removedToTaggedUnion(br kvevents.BlockRemoved) []any { - return []any{ - BlockRemoved, - br.BlockHashes, - } -} - // helper to publish collected batch if not empty func (s *KVEventSender) publishHelper(ctx context.Context) error { if len(s.batch) == 0 { diff --git a/pkg/kv-cache/kv_cache_test.go b/pkg/kv-cache/kv_cache_test.go index 1d1d1c82..f7e09e00 100644 --- a/pkg/kv-cache/kv_cache_test.go +++ b/pkg/kv-cache/kv_cache_test.go @@ -513,11 +513,11 @@ func parseEvent(parts [][]byte, expectedTopic string, expectedSeq uint64) ([]uin Expect(err).NotTo(HaveOccurred()) switch tag { - case BlockStored: + case kvevents.BlockStoredEventTag: var bs kvevents.BlockStored err = msgpack.Unmarshal(payloadBytes, &bs) stored = append(stored, bs.BlockHashes...) - case BlockRemoved: + case kvevents.BlockRemovedEventTag: var br kvevents.BlockRemoved err = msgpack.Unmarshal(payloadBytes, &br) removed = append(removed, br.BlockHashes...)