Skip to content

Commit f99a38b

Browse files
irar2smarunich
authored andcommitted
Publish kv-cache events (#126)
* Publish kv-cache events Signed-off-by: Ira <[email protected]> * Fix lint errors Signed-off-by: Ira <[email protected]> * Review fixes Signed-off-by: Ira <[email protected]> * Sleep to allow prevous sub to close Signed-off-by: Ira <[email protected]> --------- Signed-off-by: Ira <[email protected]> Signed-off-by: Sergey Marunich <[email protected]>
1 parent 9ab5879 commit f99a38b

File tree

12 files changed

+614
-389
lines changed

12 files changed

+614
-389
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ $(TOKENIZER_LIB):
4646
## Download the HuggingFace tokenizer bindings.
4747
@echo "Downloading HuggingFace tokenizer bindings..."
4848
mkdir -p lib
49-
curl -L https://github.com/daulet/tokenizers/releases/download/v1.20.2/libtokenizers.$(TARGETOS)-$(TARGETARCH).tar.gz | tar -xz -C lib
49+
curl -L https://github.com/daulet/tokenizers/releases/download/v1.22.1/libtokenizers.$(TARGETOS)-$(TARGETARCH).tar.gz | tar -xz -C lib
5050
ranlib lib/*.a
5151

5252
##@ Development

go.mod

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ require (
88
github.com/buaazp/fasthttprouter v0.1.1
99
github.com/go-logr/logr v1.4.2
1010
github.com/google/uuid v1.6.0
11-
github.com/llm-d/llm-d-kv-cache-manager v0.2.0
11+
github.com/llm-d/llm-d-kv-cache-manager v0.2.2-0.20250810103202-0adf0940f60a
1212
github.com/onsi/ginkgo/v2 v2.23.4
1313
github.com/onsi/gomega v1.37.0
1414
github.com/openai/openai-go v0.1.0-beta.10
@@ -17,7 +17,6 @@ require (
1717
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
1818
github.com/spf13/pflag v1.0.6
1919
github.com/valyala/fasthttp v1.59.0
20-
github.com/vmihailenco/msgpack v4.0.4+incompatible
2120
github.com/vmihailenco/msgpack/v5 v5.4.1
2221
gopkg.in/yaml.v3 v3.0.1
2322
k8s.io/klog/v2 v2.130.1
@@ -27,7 +26,7 @@ require (
2726
github.com/andybalholm/brotli v1.1.1 // indirect
2827
github.com/beorn7/perks v1.0.1 // indirect
2928
github.com/cespare/xxhash/v2 v2.3.0 // indirect
30-
github.com/daulet/tokenizers v1.20.2 // indirect
29+
github.com/daulet/tokenizers v1.22.1 // indirect
3130
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
3231
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
3332
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
@@ -37,7 +36,6 @@ require (
3736
github.com/go-openapi/swag v0.23.0 // indirect
3837
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
3938
github.com/gogo/protobuf v1.3.2 // indirect
40-
github.com/golang/protobuf v1.5.2 // indirect
4139
github.com/google/gnostic-models v0.6.9 // indirect
4240
github.com/google/go-cmp v0.7.0 // indirect
4341
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect
@@ -69,7 +67,6 @@ require (
6967
golang.org/x/text v0.23.0 // indirect
7068
golang.org/x/time v0.9.0 // indirect
7169
golang.org/x/tools v0.31.0 // indirect
72-
google.golang.org/appengine v1.6.8 // indirect
7370
google.golang.org/protobuf v1.36.5 // indirect
7471
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
7572
gopkg.in/inf.v0 v0.9.1 // indirect

go.sum

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
1313
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
1414
github.com/daulet/tokenizers v1.20.2 h1:tlq/vIOiBTKDPets3596aFvmJYLn3XI6LFKq4q9LKhQ=
1515
github.com/daulet/tokenizers v1.20.2/go.mod h1:tGnMdZthXdcWY6DGD07IygpwJqiPvG85FQUnhs/wSCs=
16+
github.com/daulet/tokenizers v1.22.1 h1:3wzAFIxfgRuqGKka8xdkeTbctDmmqOOs12GofqdorpM=
17+
github.com/daulet/tokenizers v1.22.1/go.mod h1:tGnMdZthXdcWY6DGD07IygpwJqiPvG85FQUnhs/wSCs=
1618
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1719
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1820
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
@@ -37,12 +39,8 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
3739
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
3840
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
3941
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
40-
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
41-
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
42-
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
4342
github.com/google/gnostic-models v0.6.9 h1:MU/8wDLif2qCXZmzncUQ/BOfxWfthHi63KqpoNbWqVw=
4443
github.com/google/gnostic-models v0.6.9/go.mod h1:CiWsm0s6BSQd1hRn8/QmxqB6BesYcbSZxsz9b0KuDBw=
45-
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
4644
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
4745
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
4846
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
@@ -72,6 +70,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0
7270
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
7371
github.com/llm-d/llm-d-kv-cache-manager v0.2.0 h1:7MXFPjy3P8nZ7HbB1LWhhVLHvNTLbZglkD/ZcT7UU1k=
7472
github.com/llm-d/llm-d-kv-cache-manager v0.2.0/go.mod h1:ZTqwsnIVC6R5YuTUrYofPIUnCeZ9RvXn1UQAdxLYl1Y=
73+
github.com/llm-d/llm-d-kv-cache-manager v0.2.2-0.20250810103202-0adf0940f60a h1:PXR37HLgYYfolzWQA2uQOEiJlj3IV9YSvgaEFqCRSa8=
74+
github.com/llm-d/llm-d-kv-cache-manager v0.2.2-0.20250810103202-0adf0940f60a/go.mod h1:g2UlYKNJ4S860SAQ/QoRnytAFfnp8f1luW4IuZSMwCE=
7575
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
7676
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
7777
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@@ -137,8 +137,6 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
137137
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
138138
github.com/valyala/fasthttp v1.59.0 h1:Qu0qYHfXvPk1mSLNqcFtEk6DpxgA26hy6bmydotDpRI=
139139
github.com/valyala/fasthttp v1.59.0/go.mod h1:GTxNb9Bc6r2a9D0TWNSPwDz78UxnTGBViY3xZNEqyYU=
140-
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
141-
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
142140
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
143141
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
144142
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
@@ -149,49 +147,35 @@ github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZ
149147
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
150148
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
151149
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
152-
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
153150
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
154151
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
155152
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
156153
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
157154
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
158155
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
159156
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
160-
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
161157
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
162158
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
163-
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
164159
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
165160
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
166161
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
167162
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
168-
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
169-
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
170163
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
171164
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
172165
golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M=
173166
golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8=
174167
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
175168
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
176169
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
177-
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
178170
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
179171
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
180172
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
181-
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
182-
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
183-
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
184-
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
185173
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
186174
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
187-
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
188-
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
189175
golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y=
190176
golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g=
191177
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
192178
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
193-
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
194-
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
195179
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
196180
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
197181
golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
@@ -200,17 +184,12 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
200184
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
201185
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
202186
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
203-
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
204187
golang.org/x/tools v0.31.0 h1:0EedkvKDbh+qistFTd0Bcwe/YLh4vHwWEkiI0toFIBU=
205188
golang.org/x/tools v0.31.0/go.mod h1:naFTU+Cev749tSJRXJlna0T3WxKvb1kWEx15xA4SdmQ=
206189
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
207190
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
208191
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
209192
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
210-
google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM=
211-
google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds=
212-
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
213-
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
214193
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
215194
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
216195
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

pkg/common/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ type Configuration struct {
125125

126126
// ZMQEndpoint is the ZMQ address to publish events, the default value is tcp://localhost:5557
127127
ZMQEndpoint string `yaml:"zmq-endpoint"`
128+
// EventBatchSize is the maximum number of kv-cache events to be sent together, defaults to 16
129+
EventBatchSize int `yaml:"event-batch-size"`
128130
}
129131

130132
type LoraModule struct {
@@ -183,6 +185,7 @@ func newConfig() *Configuration {
183185
KVCacheSize: 1024,
184186
TokenBlockSize: 16,
185187
ZMQEndpoint: "tcp://localhost:5557",
188+
EventBatchSize: 16,
186189
}
187190
}
188191

@@ -293,6 +296,9 @@ func (c *Configuration) validate() error {
293296
if c.KVCacheSize < 0 {
294297
return errors.New("KV cache size cannot be negative")
295298
}
299+
if c.EventBatchSize < 1 {
300+
return errors.New("event batch size cannot less than 1")
301+
}
296302
return nil
297303
}
298304

@@ -344,6 +350,7 @@ func ParseCommandParamsAndLoadConfig() (*Configuration, error) {
344350
f.StringVar(&config.TokenizersCacheDir, "tokenizers-cache-dir", config.TokenizersCacheDir, "Directory for caching tokenizers")
345351
f.StringVar(&config.HashSeed, "hash-seed", config.HashSeed, "Seed for hash generation (if not set, is read from PYTHONHASHSEED environment variable)")
346352
f.StringVar(&config.ZMQEndpoint, "zmq-endpoint", config.ZMQEndpoint, "ZMQ address to publish events")
353+
f.IntVar(&config.EventBatchSize, "event-batch-size", config.EventBatchSize, "Maximum number of kv-cache events to be sent together")
347354

348355
// These values were manually parsed above in getParamValueFromArgs, we leave this in order to get these flags in --help
349356
var dummyString string

pkg/common/config_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,13 @@ var _ = Describe("Simulator configuration", func() {
103103
"{\"name\":\"lora3\",\"path\":\"/path/to/lora3\"}",
104104
"{\"name\":\"lora4\",\"path\":\"/path/to/lora4\"}",
105105
}
106+
c.EventBatchSize = 5
106107
test = testCase{
107108
name: "config file with command line args",
108109
args: []string{"cmd", "--model", model, "--config", "../../manifests/config.yaml", "--port", "8002",
109110
"--served-model-name", "alias1", "alias2", "--seed", "100",
110111
"--lora-modules", "{\"name\":\"lora3\",\"path\":\"/path/to/lora3\"}", "{\"name\":\"lora4\",\"path\":\"/path/to/lora4\"}",
112+
"--event-batch-size", "5",
111113
},
112114
expectedConfig: c,
113115
}
@@ -291,6 +293,11 @@ var _ = Describe("Simulator configuration", func() {
291293
args: []string{"cmd", "--block-size", "35",
292294
"--config", "../../manifests/config.yaml"},
293295
},
296+
{
297+
name: "invalid (negative) event-batch-size",
298+
args: []string{"cmd", "--event-batch-size", "-35",
299+
"--config", "../../manifests/config.yaml"},
300+
},
294301
}
295302

296303
for _, test := range invalidTests {

pkg/common/publisher.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,15 @@ limitations under the License.
1717
package common
1818

1919
import (
20+
"bytes"
2021
"context"
2122
"encoding/binary"
2223
"errors"
2324
"fmt"
2425
"sync/atomic"
2526

2627
zmq "github.com/pebbe/zmq4"
27-
"github.com/vmihailenco/msgpack"
28+
"github.com/vmihailenco/msgpack/v5"
2829
"k8s.io/klog/v2"
2930
)
3031

@@ -62,7 +63,11 @@ func NewPublisher(endpoint string) (*Publisher, error) {
6263
func (p *Publisher) PublishEvent(ctx context.Context, topic string, batch interface{}) error {
6364
logger := klog.FromContext(ctx).V(0)
6465

65-
payload, err := msgpack.Marshal(batch)
66+
// Use an encoder configured for struct as array
67+
var payload bytes.Buffer
68+
enc := msgpack.NewEncoder(&payload)
69+
enc.UseArrayEncodedStructs(true)
70+
err := enc.Encode(batch)
6671
if err != nil {
6772
return fmt.Errorf("failed to marshal event batch: %w", err)
6873
}
@@ -73,7 +78,7 @@ func (p *Publisher) PublishEvent(ctx context.Context, topic string, batch interf
7378
binary.BigEndian.PutUint64(seqBytes, seq)
7479

7580
// send topic, sequence, payload
76-
if _, err := p.socket.SendMessage(topic, seqBytes, payload); err != nil {
81+
if _, err := p.socket.SendMessage(topic, seqBytes, payload.Bytes()); err != nil {
7782
return fmt.Errorf("failed to send message to topic %s: %w", topic, err)
7883
}
7984

pkg/common/publisher_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
. "github.com/onsi/ginkgo/v2"
2626
. "github.com/onsi/gomega"
2727
zmq "github.com/pebbe/zmq4"
28-
"github.com/vmihailenco/msgpack"
28+
"github.com/vmihailenco/msgpack/v5"
2929
)
3030

3131
const (
@@ -44,6 +44,8 @@ var _ = Describe("Publisher", func() {
4444
Expect(err).NotTo(HaveOccurred())
4545
err = sub.SetSubscribe(topic)
4646
Expect(err).NotTo(HaveOccurred())
47+
//nolint
48+
defer sub.Close()
4749

4850
time.Sleep(100 * time.Millisecond)
4951

pkg/kv-cache/block_cache.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ import (
2323
"time"
2424

2525
"github.com/go-logr/logr"
26+
"github.com/llm-d/llm-d-inference-sim/pkg/common"
2627
)
2728

2829
const (
2930
capacityError = "the kv cache does not have sufficient capacity to store this request"
30-
batchSize = 3
3131
delay = time.Second
3232
)
3333

@@ -44,20 +44,24 @@ type blockCache struct {
4444
}
4545

4646
// newBlockCache creates a new blockCache with the specified maximum number of blocks
47-
func newBlockCache(maxBlocks int, logger logr.Logger) *blockCache {
47+
func newBlockCache(config *common.Configuration, logger logr.Logger) (*blockCache, error) {
4848
// TODO read size of channel from config
4949
eChan := make(chan EventData, 10000)
5050

51+
publisher, err := common.NewPublisher(config.ZMQEndpoint)
52+
if err != nil {
53+
return nil, err
54+
}
55+
5156
return &blockCache{
5257
requestToBlocks: make(map[string][]uint64),
5358
usedBlocks: make(map[uint64]int),
5459
unusedBlocks: make(map[uint64]time.Time),
55-
maxBlocks: maxBlocks,
60+
maxBlocks: config.KVCacheSize,
5661
eventChan: eChan,
57-
// TODO - create topic name from pod ip + model name
58-
eventSender: NewKVEventSender(&Publisher{}, "topic1", eChan, batchSize, delay, logger),
59-
logger: logger,
60-
}
62+
eventSender: NewKVEventSender(publisher, createTopic(config), eChan, config.EventBatchSize, delay, logger),
63+
logger: logger,
64+
}, nil
6165
}
6266

6367
func (b *blockCache) start(ctx context.Context) {
@@ -128,7 +132,7 @@ func (bc *blockCache) startRequest(requestID string, blocks []uint64) error {
128132
}
129133

130134
delete(bc.unusedBlocks, oldestUnusedHash)
131-
bc.eventChan <- EventData{action: eventActionRemove, hashValues: []uint64{block}}
135+
bc.eventChan <- EventData{action: eventActionRemove, hashValues: []uint64{oldestUnusedHash}}
132136
}
133137

134138
// Add the new block
@@ -214,3 +218,7 @@ func (bc *blockCache) getBlockInfo(blockHash uint64) (int, bool) {
214218

215219
return 0, false
216220
}
221+
222+
func createTopic(config *common.Configuration) string {
223+
return fmt.Sprintf("kv@$localhost:%d@%s", config.Port, config.Model)
224+
}

0 commit comments

Comments
 (0)