From 5992cbe00e13450c658a71ae238032ea8237023f Mon Sep 17 00:00:00 2001 From: irar2 Date: Wed, 29 Oct 2025 12:22:30 +0200 Subject: [PATCH] Made workers' requests channel non-blocking, changed stress test Signed-off-by: irar2 --- pkg/llm-d-inference-sim/simulator.go | 4 +-- pkg/llm-d-inference-sim/worker_test.go | 35 ++++++++++++-------------- 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/pkg/llm-d-inference-sim/simulator.go b/pkg/llm-d-inference-sim/simulator.go index 89bec6c7..22e6d1dc 100644 --- a/pkg/llm-d-inference-sim/simulator.go +++ b/pkg/llm-d-inference-sim/simulator.go @@ -345,7 +345,7 @@ func (s *VllmSimulator) initializeSim(ctx context.Context) error { ctx: ctx, logger: s.logger, finishedChan: s.workerFinished, - reqChan: make(chan *openaiserverapi.CompletionReqCtx), + reqChan: make(chan *openaiserverapi.CompletionReqCtx, 1), processor: s, } go worker.waitForRequests() @@ -402,8 +402,8 @@ func (s *VllmSimulator) processing(ctx context.Context) { s.logger.Info("Request processing done") return case completedReq := <-s.workerFinished: - s.logger.V(4).Info("Worker finished") worker := completedReq.worker + s.logger.V(4).Info("Worker finished", "worker", worker.id) s.decrementLora(completedReq.model) // there is a free worker - find a request for it and send this request for // processing with this worker diff --git a/pkg/llm-d-inference-sim/worker_test.go b/pkg/llm-d-inference-sim/worker_test.go index ce5ee076..c0de63e3 100644 --- a/pkg/llm-d-inference-sim/worker_test.go +++ b/pkg/llm-d-inference-sim/worker_test.go @@ -344,10 +344,15 @@ var _ = Describe("Simulator requests scheduling", Ordered, func() { option.WithBaseURL(baseURL), option.WithHTTPClient(client)) + var wg sync.WaitGroup + // Run 2000 requests for 2 loras simultaneously numberOfRequests := 2000 + wg.Add(numberOfRequests) + for i := range numberOfRequests { go func() { + defer wg.Done() defer GinkgoRecover() params := openai.ChatCompletionNewParams{ Messages: 
[]openai.ChatCompletionMessageParamUnion{ @@ -385,8 +390,11 @@ var _ = Describe("Simulator requests scheduling", Ordered, func() { // After about 2 secs (the mean ttft), send 500 more requests numberOfRequests = 500 + wg.Add(numberOfRequests) + for i := range numberOfRequests { go func() { + defer wg.Done() defer GinkgoRecover() params := openai.ChatCompletionNewParams{ Messages: []openai.ChatCompletionMessageParamUnion{ @@ -408,15 +416,8 @@ var _ = Describe("Simulator requests scheduling", Ordered, func() { metrics = strings.Split(string(data), "\n") // We sent 2500 requests, after about 2.5 seconds - // number of running requests should be 1000 + // number of running requests should be about 1000 // and the number of waiting requests should be less than 1000. - // Since we are in the middle of requests scheduling, - // the number of running requests can be 999. - runningStr = findMetric(metrics, runningMetric) - Expect(runningStr).NotTo(Equal("")) - running, err = strconv.Atoi(runningStr) - Expect(err).NotTo(HaveOccurred()) - Expect(running).To(Or(Equal(1000), Equal(999))) waitingStr = findMetric(metrics, waitingMetric) waiting, err = strconv.Atoi(waitingStr) Expect(err).NotTo(HaveOccurred()) @@ -431,19 +432,15 @@ var _ = Describe("Simulator requests scheduling", Ordered, func() { Expect(err).NotTo(HaveOccurred()) metrics = strings.Split(string(data), "\n") - // The number of running requests should be 1000 - // and the number of waiting requests should be less than 1000. - // Since we are in the middle of requests scheduling, - // the number of running requests can be 999. - runningStr = findMetric(metrics, runningMetric) - Expect(runningStr).NotTo(Equal("")) - running, err = strconv.Atoi(runningStr) - Expect(err).NotTo(HaveOccurred()) - Expect(running).To(Or(Equal(1000), Equal(999))) + // The number of running requests should be about 1000 + // and the number of waiting requests should be no greater than the + // previous number of waiting requests. 
waitingStr = findMetric(metrics, waitingMetric) - waiting, err = strconv.Atoi(waitingStr) + newWaiting, err := strconv.Atoi(waitingStr) Expect(err).NotTo(HaveOccurred()) - Expect(waiting).To(BeNumerically("<", 1000)) + Expect(newWaiting).To(BeNumerically("<=", waiting)) + + wg.Wait() }) }) })