
Commit 61c1c29

irar2 and mayabar authored
New requests queue (#214)
* Initial implementation of queues
  Signed-off-by: Ira <[email protected]>
* More tests and cleanup
  Signed-off-by: irar2 <[email protected]>
* Added interface for requests, changed logs
  Signed-off-by: irar2 <[email protected]>
* Channel for new requests, decrement lora inside select, lint
  Signed-off-by: irar2 <[email protected]>
* Added new config value for waiting queue length
  Signed-off-by: irar2 <[email protected]>
* Make allocations after reading configuration. Code refactoring
  Signed-off-by: irar2 <[email protected]>
* Tests
  Signed-off-by: irar2 <[email protected]>
* Lint fix
  Signed-off-by: irar2 <[email protected]>
* Fixed merge typo
  Signed-off-by: irar2 <[email protected]>
* Review comments
  Signed-off-by: irar2 <[email protected]>
* Fixed automerge errors
  Signed-off-by: irar2 <[email protected]>
* Review comments
  Signed-off-by: irar2 <[email protected]>
* Update worker.go to solve conflicts
  Signed-off-by: Maya Barnea <[email protected]>
* Removed temporary directory
  Signed-off-by: irar2 <[email protected]>
* Fixed merge error
  Signed-off-by: irar2 <[email protected]>

---------

Signed-off-by: Ira <[email protected]>
Signed-off-by: irar2 <[email protected]>
Signed-off-by: Ira Rosen <[email protected]>
Signed-off-by: Maya Barnea <[email protected]>
Co-authored-by: Maya Barnea <[email protected]>
1 parent 1c3d559 commit 61c1c29

File tree

14 files changed (+1282 −356 lines)


README.md

Lines changed: 1 addition & 0 deletions
@@ -98,6 +98,7 @@ For more details see the <a href="https://docs.vllm.ai/en/stable/getting_started
 - `max-cpu-loras`: maximum number of LoRAs to store in CPU memory, optional, must be >= than max-loras, default is max-loras
 - `max-model-len`: model's context window, maximum number of tokens in a single request including input and output, optional, default is 1024
 - `max-num-seqs`: maximum number of sequences per iteration (maximum number of inference requests that could be processed at the same time), default is 5
+- `max-waiting-queue-length`: maximum length of inference requests waiting queue, default is 1000
 - `mode`: the simulator mode, optional, by default `random`
   - `echo`: returns the same text that was sent in the request
   - `random`: returns a sentence chosen at random from a set of pre-defined sentences
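
The new option caps how many requests may wait for a free sequence slot before new arrivals are rejected. Going by the commit notes ("Channel for new requests"), a buffered channel is a natural way to model such a cap; below is a minimal standalone sketch of that idea (`boundedQueue`, `request`, and `enqueue` are illustrative names, not the simulator's actual API):

```go
package main

import (
	"errors"
	"fmt"
)

// request stands in for an inference request (hypothetical type).
type request struct{ id int }

// boundedQueue models a waiting queue with a hard capacity as a
// buffered channel, following the commit note "Channel for new requests".
type boundedQueue struct {
	ch chan *request
}

func newBoundedQueue(maxLen int) *boundedQueue {
	return &boundedQueue{ch: make(chan *request, maxLen)}
}

// enqueue adds a request if there is room; the non-blocking select
// turns the channel capacity into a hard queue-length limit.
func (q *boundedQueue) enqueue(r *request) error {
	select {
	case q.ch <- r:
		return nil
	default:
		return errors.New("waiting queue is full")
	}
}

func main() {
	q := newBoundedQueue(2) // as if --max-waiting-queue-length 2
	for i := 1; i <= 3; i++ {
		if err := q.enqueue(&request{id: i}); err != nil {
			fmt.Printf("request %d rejected: %v\n", i, err)
		} else {
			fmt.Printf("request %d queued\n", i)
		}
	}
}
```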

pkg/common/config.go

Lines changed: 9 additions & 1 deletion
@@ -80,6 +80,8 @@ type Configuration struct {
 	// MaxNumSeqs is maximum number of sequences per iteration (the maximum
 	// number of inference requests that could be processed at the same time)
 	MaxNumSeqs int `yaml:"max-num-seqs" json:"max-num-seqs"`
+	// MaxWaitingQueueLength defines maximum size of waiting requests queue
+	MaxWaitingQueueLength int `yaml:"max-waiting-queue-length" json:"max-waiting-queue-length"`
 	// MaxModelLen is the model's context window, the maximum number of tokens
 	// in a single request including input and output. Default value is 1024.
 	MaxModelLen int `yaml:"max-model-len" json:"max-model-len"`
@@ -329,6 +331,7 @@ func newConfig() *Configuration {
 		Port: vLLMDefaultPort,
 		MaxLoras: 1,
 		MaxNumSeqs: 5,
+		MaxWaitingQueueLength: 1000,
 		MaxModelLen: 1024,
 		Mode: ModeRandom,
 		Seed: time.Now().UnixNano(),
@@ -458,6 +461,10 @@ func (c *Configuration) validate() error {
 		return errors.New("max num seqs cannot be less than 1")
 	}

+	if c.MaxWaitingQueueLength < 1 {
+		return errors.New("max waiting queue size cannot be less than 1")
+	}
+
 	for _, lora := range c.LoraModules {
 		if lora.Name == "" {
 			return errors.New("empty LoRA name")
@@ -637,7 +644,8 @@ func ParseCommandParamsAndLoadConfig() (*Configuration, error) {

 	f.IntVar(&config.Port, "port", config.Port, "Port")
 	f.StringVar(&config.Model, "model", config.Model, "Currently 'loaded' model")
-	f.IntVar(&config.MaxNumSeqs, "max-num-seqs", config.MaxNumSeqs, "Maximum number of inference requests that could be processed at the same time (parameter to simulate requests waiting queue)")
+	f.IntVar(&config.MaxNumSeqs, "max-num-seqs", config.MaxNumSeqs, "Maximum number of inference requests that could be processed at the same time")
+	f.IntVar(&config.MaxWaitingQueueLength, "max-waiting-queue-length", config.MaxWaitingQueueLength, "Maximum length of inference requests waiting queue")
 	f.IntVar(&config.MaxLoras, "max-loras", config.MaxLoras, "Maximum number of LoRAs in a single batch")
 	f.IntVar(&config.MaxCPULoras, "max-cpu-loras", config.MaxCPULoras, "Maximum number of LoRAs to store in CPU memory")
 	f.IntVar(&config.MaxModelLen, "max-model-len", config.MaxModelLen, "Model's context window, maximum number of tokens in a single request including input and output")
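
Both the default (1000) and the validation rule are visible above: the queue must be able to hold at least one request. A standalone sketch of the same guard, mirroring the names in the diff (a toy program for illustration, not the `pkg/common` package itself):

```go
package main

import (
	"errors"
	"fmt"
)

// Configuration mirrors only the field this commit adds.
type Configuration struct {
	MaxWaitingQueueLength int `yaml:"max-waiting-queue-length" json:"max-waiting-queue-length"`
}

// validate applies the same rule the diff adds: the waiting queue
// must be able to hold at least one request.
func (c *Configuration) validate() error {
	if c.MaxWaitingQueueLength < 1 {
		return errors.New("max waiting queue size cannot be less than 1")
	}
	return nil
}

func main() {
	for _, v := range []int{0, -1, 1000} {
		c := Configuration{MaxWaitingQueueLength: v}
		fmt.Printf("max-waiting-queue-length=%d -> %v\n", v, c.validate())
	}
}
```

The values 0 and -1 fail validation at startup, which is exactly what the two new test cases in config_test.go below assert.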

pkg/common/config_test.go

Lines changed: 10 additions & 0 deletions
@@ -452,6 +452,16 @@ var _ = Describe("Simulator configuration", func() {
 			args: []string{"cmd", "--max-num-seqs", "-1",
 				"--config", "../../manifests/config.yaml"},
 		},
+		{
+			name: "invalid max-waiting-queue-length",
+			args: []string{"cmd", "--max-waiting-queue-length", "0",
+				"--config", "../../manifests/config.yaml"},
+		},
+		{
+			name: "invalid max-waiting-queue-length",
+			args: []string{"cmd", "--max-waiting-queue-length", "-1",
+				"--config", "../../manifests/config.yaml"},
+		},
 		{
 			name: "invalid time-factor-under-load",
 			args: []string{"cmd", "--time-factor-under-load", "0",

pkg/llm-d-inference-sim/latencies.go

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ func (s *VllmSimulator) getCurrLoadFactor() float64 {
 	if s.config.MaxNumSeqs <= 1 {
 		return 1.0
 	}
-	return 1 + (s.config.TimeFactorUnderLoad-1)*float64(s.nRunningReqs-1)/float64(s.config.MaxNumSeqs-1)
+	return 1 + (s.config.TimeFactorUnderLoad-1)*float64(s.metrics.nRunningReqs-1)/float64(s.config.MaxNumSeqs-1)
 }

 func (s *VllmSimulator) getTimeToFirstToken() int {
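
`getCurrLoadFactor` interpolates linearly between 1 and `TimeFactorUnderLoad` as the number of running requests approaches `MaxNumSeqs`; the only change here is that the counter now lives on the new metrics struct. A quick standalone check of the formula, using the same numbers as the tests below (the `loadFactor` wrapper is illustrative, not the simulator's method):

```go
package main

import "fmt"

// loadFactor reproduces the formula from getCurrLoadFactor for a
// quick sanity check outside the simulator.
func loadFactor(timeFactorUnderLoad float64, nRunningReqs, maxNumSeqs int64) float64 {
	if maxNumSeqs <= 1 {
		return 1.0
	}
	return 1 + (timeFactorUnderLoad-1)*float64(nRunningReqs-1)/float64(maxNumSeqs-1)
}

func main() {
	// Matches the test expectations: 1.0 when the factor is 1, the
	// full factor when nRunningReqs == MaxNumSeqs, and an
	// intermediate value when partially loaded.
	fmt.Println(loadFactor(1.0, 3, 11))  // 1.0
	fmt.Println(loadFactor(2.0, 11, 11)) // 2.0
	fmt.Println(loadFactor(2.0, 6, 11))  // 1.5
}
```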

pkg/llm-d-inference-sim/latencies_test.go

Lines changed: 11 additions & 9 deletions
@@ -41,6 +41,8 @@ var _ = Describe("Check random latencies", Ordered, func() {
 			KVCacheTransferLatencyStdDev: 2048,
 		}

+		simulator.metrics.runReqChan = make(chan int64, 100)
+
 		common.InitRandom(time.Now().UnixNano())
 	})
@@ -245,7 +247,7 @@ var _ = Describe("Check random latencies", Ordered, func() {
 		simulator.config.TimeToFirstTokenStdDev = 0
 		simulator.config.TimeFactorUnderLoad = 1.0

-		simulator.runReqChan <- 100
+		simulator.metrics.runReqChan <- 100

 		ttft := simulator.getWaitTimeToFirstToken(128, 0, false)
 		Expect(ttft).To(Equal(42))
@@ -257,11 +259,11 @@ var _ = Describe("Check random latencies", Ordered, func() {
 		simulator.config.TimeFactorUnderLoad = 100.0
 		simulator.config.MaxNumSeqs = 1

-		for len(simulator.runReqChan) > 0 {
-			<-simulator.runReqChan
+		for len(simulator.metrics.runReqChan) > 0 {
+			<-simulator.metrics.runReqChan
 		}

-		simulator.runReqChan <- 1
+		simulator.metrics.runReqChan <- 1

 		ttft := simulator.getWaitTimeToFirstToken(128, 0, false)
 		Expect(ttft).To(Equal(42))
@@ -273,7 +275,7 @@ var _ = Describe("Check random latencies", Ordered, func() {
 		simulator.config.TimeToFirstTokenStdDev = 0
 		simulator.config.TimeFactorUnderLoad = timeFactorUnderLoad
 		simulator.config.MaxNumSeqs = maxNumOfReq
-		simulator.nRunningReqs = int64(maxNumOfReq)
+		simulator.metrics.nRunningReqs = int64(maxNumOfReq)

 		ttft := simulator.getWaitTimeToFirstToken(128, 0, false)
 		Expect(ttft).To(Equal(int(float64(42) * timeFactorUnderLoad)))
@@ -296,7 +298,7 @@ var _ = Describe("Check random latencies", Ordered, func() {
 		simulator.config.TimeToFirstTokenStdDev = 0
 		simulator.config.TimeFactorUnderLoad = timeFactorUnderLoad
 		simulator.config.MaxNumSeqs = maxNumOfReq
-		simulator.nRunningReqs = int64(nCurrNumOfReq)
+		simulator.metrics.nRunningReqs = int64(nCurrNumOfReq)

 		ttft := simulator.getWaitTimeToFirstToken(128, 0, false)
 		max := timeFactorUnderLoad * float64(42)
@@ -318,7 +320,7 @@ var _ = Describe("Check random latencies", Ordered, func() {
 	It("when TimeFactorUnderLoad is 1.0, calcLoadFactor should give 1", func() {
 		simulator.config.TimeFactorUnderLoad = 1.0
 		simulator.config.MaxNumSeqs = 11
-		simulator.nRunningReqs = 3
+		simulator.metrics.nRunningReqs = 3

 		factor := simulator.getCurrLoadFactor()
 		Expect(factor).To(BeNumerically("==", 1.0))
@@ -327,7 +329,7 @@ var _ = Describe("Check random latencies", Ordered, func() {
 	It("when TimeFactorUnderLoad is > 1.0, and sim is fully loaded, calcLoadFactor should give TimeFactorUnderLoad", func() {
 		simulator.config.TimeFactorUnderLoad = 2.0
 		simulator.config.MaxNumSeqs = 11
-		simulator.nRunningReqs = 11
+		simulator.metrics.nRunningReqs = 11

 		factor := simulator.getCurrLoadFactor()
 		Expect(factor).To(BeNumerically("==", simulator.config.TimeFactorUnderLoad))
@@ -337,7 +339,7 @@ var _ = Describe("Check random latencies", Ordered, func() {
 	It("when TimeFactorUnderLoad is > 1.0, and sim is partially loaded, calcLoadFactor should give a value between 1 and TimeFactorUnderLoad", func() {
 		simulator.config.TimeFactorUnderLoad = 2.0
 		simulator.config.MaxNumSeqs = 11
-		simulator.nRunningReqs = 6
+		simulator.metrics.nRunningReqs = 6

 		factor := simulator.getCurrLoadFactor()
 		Expect(factor).To(BeNumerically(">", 1.0))

pkg/llm-d-inference-sim/lora.go

Lines changed: 74 additions & 2 deletions
@@ -47,7 +47,7 @@ func (s *VllmSimulator) getLoras() []string {
 	return loras
 }

-func (s *VllmSimulator) loadLora(ctx *fasthttp.RequestCtx) {
+func (s *VllmSimulator) loadLoraAdaptor(ctx *fasthttp.RequestCtx) {
 	var req loadLoraRequest
 	err := json.Unmarshal(ctx.Request.Body(), &req)
 	if err != nil {
@@ -59,7 +59,7 @@ func (s *VllmSimulator) loadLora(ctx *fasthttp.RequestCtx) {
 	s.loraAdaptors.Store(req.LoraName, "")
 }

-func (s *VllmSimulator) unloadLora(ctx *fasthttp.RequestCtx) {
+func (s *VllmSimulator) unloadLoraAdaptor(ctx *fasthttp.RequestCtx) {
 	var req unloadLoraRequest
 	err := json.Unmarshal(ctx.Request.Body(), &req)
 	if err != nil {
@@ -70,3 +70,75 @@ func (s *VllmSimulator) unloadLora(ctx *fasthttp.RequestCtx) {

 	s.loraAdaptors.Delete(req.LoraName)
 }
+
+// Checks if the LoRA adaptor is loaded
+func (s *VllmSimulator) loraIsLoaded(model string) bool {
+	if !s.isLora(model) {
+		return true
+	}
+
+	s.loras.mux.RLock()
+	defer s.loras.mux.RUnlock()
+
+	_, ok := s.loras.loadedLoras[model]
+	return ok
+}
+
+// Load the LoRA adaptor if possible. Return false if not.
+func (s *VllmSimulator) loadLora(model string) bool {
+	if !s.isLora(model) {
+		return true
+	}
+
+	s.loras.mux.Lock()
+	defer s.loras.mux.Unlock()
+
+	// check if this LoRA is already loaded or within maxLoras slots
+	_, ok := s.loras.loadedLoras[model]
+	ok = ok || len(s.loras.loadedLoras) < s.loras.maxLoras
+	if !ok {
+		// if this LoRA is not loaded, and the number of loaded LoRAs reached
+		// maxLoras, try to find a LoRA that is not in use, and unload it
+		for lora, count := range s.loras.loadedLoras {
+			if count == 0 {
+				delete(s.loras.loadedLoras, lora)
+				ok = true
+				break
+			}
+		}
+	}
+	if ok {
+		s.loras.loadedLoras[model]++
+	}
+	return ok
+}
+
+// incrementLora increments the count of running requests using the model
+// (if the model is a LoRA). Can be called only for loaded LoRAs (that are
+// already in loras.loadedLoras)
+func (s *VllmSimulator) incrementLora(model string) {
+	if !s.isLora(model) {
+		return
+	}
+
+	s.loras.mux.Lock()
+	defer s.loras.mux.Unlock()
+	s.loras.loadedLoras[model]++
+}
+
+// decrementLora decrements the count of running requests using the model
+// (if the model is a LoRA)
+func (s *VllmSimulator) decrementLora(model string) {
+	if model == "" || !s.isLora(model) {
+		return
+	}
+
+	s.loras.mux.Lock()
+	defer s.loras.mux.Unlock()

+	s.loras.loadedLoras[model]--
+	if s.loras.loadedLoras[model] <= 0 {
+		// last usage of this LoRA
+		s.loras.loraRemovable <- 1
+	}
+}
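
Taken together, the new helpers turn `loadedLoras` into a reference-counted slot table: a request may proceed only if its LoRA is already resident, a free slot exists, or an idle adaptor (count 0) can be evicted, and `decrementLora` signals `loraRemovable` when a LoRA's last user finishes so queued requests can be retried. A condensed standalone sketch of that bookkeeping (`loraSlots`, `acquire`, and `release` are illustrative names; the simulator's real structs differ):

```go
package main

import (
	"fmt"
	"sync"
)

// loraSlots condenses the bookkeeping from lora.go: a map from LoRA
// name to the number of running requests using it, with at most
// maxLoras adaptors resident at once.
type loraSlots struct {
	mux       sync.Mutex
	loaded    map[string]int
	maxLoras  int
	removable chan int
}

// acquire mirrors loadLora: succeed if the LoRA is resident, a slot
// is free, or an idle LoRA can be evicted; otherwise the caller waits.
func (l *loraSlots) acquire(model string) bool {
	l.mux.Lock()
	defer l.mux.Unlock()

	_, ok := l.loaded[model]
	ok = ok || len(l.loaded) < l.maxLoras
	if !ok {
		for lora, count := range l.loaded {
			if count == 0 { // idle adaptor: evict it to make room
				delete(l.loaded, lora)
				ok = true
				break
			}
		}
	}
	if ok {
		l.loaded[model]++
	}
	return ok
}

// release mirrors decrementLora: when the last user finishes, signal
// that an adaptor became removable so waiting requests can be retried.
func (l *loraSlots) release(model string) {
	l.mux.Lock()
	defer l.mux.Unlock()
	l.loaded[model]--
	if l.loaded[model] <= 0 {
		l.removable <- 1
	}
}

func main() {
	l := &loraSlots{loaded: map[string]int{}, maxLoras: 1, removable: make(chan int, 1)}
	fmt.Println(l.acquire("lora-a")) // true: a slot is free
	fmt.Println(l.acquire("lora-b")) // false: lora-a is busy, no slot
	l.release("lora-a")              // lora-a idle, removable signaled
	<-l.removable
	fmt.Println(l.acquire("lora-b")) // true: idle lora-a was evicted
}
```

Note that `release` leaves the entry in the map with a zero count rather than deleting it, matching the diff: eviction happens lazily in `loadLora` only when a slot is actually needed.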
