From 0b13a2eb9aced134ac5b0a274ed6cf5e6c963d5a Mon Sep 17 00:00:00 2001
From: irar2
Date: Thu, 23 Oct 2025 10:40:25 +0300
Subject: [PATCH 1/2] Made writing to channels non-blocking

Signed-off-by: irar2
---
 pkg/common/utils.go                  |  9 +++++++++
 pkg/kv-cache/block_cache.go          | 20 +++++++++++++-------
 pkg/llm-d-inference-sim/lora.go      |  3 ++-
 pkg/llm-d-inference-sim/simulator.go | 23 +++++++++++++----------
 pkg/llm-d-inference-sim/streaming.go |  4 ++--
 pkg/llm-d-inference-sim/worker.go    | 23 +++++++++++------------
 6 files changed, 50 insertions(+), 32 deletions(-)

diff --git a/pkg/common/utils.go b/pkg/common/utils.go
index d22fcd7c..422c5e53 100644
--- a/pkg/common/utils.go
+++ b/pkg/common/utils.go
@@ -21,6 +21,7 @@ import (
 	"regexp"
 	"sync"

+	"github.com/go-logr/logr"
 	"github.com/google/uuid"
 )

@@ -127,3 +128,11 @@ func init() {
 func Tokenize(text string) []string {
 	return re.FindAllString(text, -1)
 }
+
+func WriteToChannel[T any](channel chan T, object T, logger logr.Logger, channelName string) {
+	select {
+	case channel <- object:
+	default:
+		logger.V(1).Info("failed to write to", "channel", channelName, "dropping value")
+	}
+}
diff --git a/pkg/kv-cache/block_cache.go b/pkg/kv-cache/block_cache.go
index 79d21e50..aa01ac42 100644
--- a/pkg/kv-cache/block_cache.go
+++ b/pkg/kv-cache/block_cache.go
@@ -70,10 +70,10 @@ func newBlockCache(config *common.Configuration, logger logr.Logger, usageChan c
 	}, nil
 }

-func (b *blockCache) start(ctx context.Context) {
-	err := b.eventSender.Run(ctx)
+func (bc *blockCache) start(ctx context.Context) {
+	err := bc.eventSender.Run(ctx)
 	if err != nil {
-		b.logger.Info("sender stopped with error", "error", err)
+		bc.logger.Info("sender stopped with error", "error", err)
 	}
 }

@@ -139,12 +139,16 @@ func (bc *blockCache) startRequest(requestID string, blocks []uint64) (int, erro
 			}

 			delete(bc.unusedBlocks, oldestUnusedHash)
-			bc.eventChan <- EventData{action: eventActionRemove, hashValues: []uint64{oldestUnusedHash}}
+			common.WriteToChannel(bc.eventChan,
+				EventData{action: eventActionRemove, hashValues: []uint64{oldestUnusedHash}},
+				bc.logger, "block cache eventChan")
 		}

 		// Add the new block
 		bc.usedBlocks[block] = 1
-		bc.eventChan <- EventData{action: eventActionStore, hashValues: []uint64{block}}
+		common.WriteToChannel(bc.eventChan,
+			EventData{action: eventActionStore, hashValues: []uint64{block}},
+			bc.logger, "block cache eventChan")
 	}

 	// store the request mapping
@@ -152,7 +156,8 @@ func (bc *blockCache) startRequest(requestID string, blocks []uint64) (int, erro
 	copy(bc.requestToBlocks[requestID], blocks)

 	if bc.usageChan != nil {
-		bc.usageChan <- float64(len(bc.usedBlocks)) / float64(bc.maxBlocks)
+		common.WriteToChannel(bc.usageChan, float64(len(bc.usedBlocks))/float64(bc.maxBlocks),
+			bc.logger, "block cache usageChan")
 	}
 	return len(blockAreadyInUse) + len(blockToMoveToUsed), nil
 }
@@ -188,7 +193,8 @@ func (bc *blockCache) finishRequest(requestID string) error {
 	}

 	if bc.usageChan != nil {
-		bc.usageChan <- float64(len(bc.usedBlocks)) / float64(bc.maxBlocks)
+		common.WriteToChannel(bc.usageChan, float64(len(bc.usedBlocks))/float64(bc.maxBlocks),
+			bc.logger, "block cache usageChan")
 	}

 	// Remove the request mapping
diff --git a/pkg/llm-d-inference-sim/lora.go b/pkg/llm-d-inference-sim/lora.go
index e5df5dc4..608bce93 100644
--- a/pkg/llm-d-inference-sim/lora.go
+++ b/pkg/llm-d-inference-sim/lora.go
@@ -20,6 +20,7 @@ package llmdinferencesim
 import (
 	"encoding/json"

+	"github.com/llm-d/llm-d-inference-sim/pkg/common"
 	"github.com/valyala/fasthttp"
 )

@@ -139,6 +140,6 @@ func (s *VllmSimulator) decrementLora(model string) {
 	s.loras.loadedLoras[model]--
 	if s.loras.loadedLoras[model] <= 0 {
 		// last usage of this LoRA
-		s.loras.loraRemovable <- 1
+		common.WriteToChannel(s.loras.loraRemovable, 1, s.logger, "loraRemovable")
 	}
 }
diff --git a/pkg/llm-d-inference-sim/simulator.go b/pkg/llm-d-inference-sim/simulator.go
index 0ed2b987..76405bb8 100644
--- a/pkg/llm-d-inference-sim/simulator.go
+++ b/pkg/llm-d-inference-sim/simulator.go
@@ -420,7 +420,7 @@ func (s *VllmSimulator) processing(ctx context.Context) {
 				s.logger.V(4).Info("Sending the request to the processing channel", "model", model,
 					"req id", reqCtx.CompletionReq.GetRequestID(), "worker", worker.id)
-				worker.reqChan <- reqCtx
+				common.WriteToChannel(worker.reqChan, reqCtx, s.logger, "worker's reqChan")
 			}
 		}
 	}

@@ -431,9 +431,9 @@ func (s *VllmSimulator) findRequestAndSendToProcess(worker *worker) bool {
 		// send this request for processing in this worker
 		s.logger.V(4).Info("Sending request to processing", "model", nextReq.CompletionReq.GetModel(),
 			"req", nextReq.CompletionReq.GetRequestID(), "worker", worker.id)
-		worker.reqChan <- nextReq
+		common.WriteToChannel(worker.reqChan, nextReq, s.logger, "worker's reqChan")
 		// decrement waiting requests metric
-		s.metrics.waitingReqChan <- -1
+		common.WriteToChannel(s.metrics.waitingReqChan, -1, s.logger, "metrics.waitingReqChan")

 		return true
 	}
@@ -450,9 +450,11 @@ func (s *VllmSimulator) addRequestToQueue(reqCtx *openaiserverapi.CompletionReqC
 		return
 	}
 	// increment the waiting requests metric
-	s.metrics.waitingReqChan <- 1
+	common.WriteToChannel(s.metrics.waitingReqChan, 1, s.logger, "metrics.waitingReqChan")
 	// update loraInfo metrics with the new waiting request
-	s.metrics.lorasChan <- loraUsage{reqCtx.CompletionReq.GetModel(), waitingUsageState}
+	common.WriteToChannel(s.metrics.lorasChan, loraUsage{reqCtx.CompletionReq.GetModel(), waitingUsageState},
+		s.logger, "metrics.lorasChan")
+
 }

 // handleCompletions general completion requests handler, support both text and chat completion APIs
@@ -487,18 +489,19 @@ func (s *VllmSimulator) handleCompletions(ctx *fasthttp.RequestCtx, isChatComple
 		IsChatCompletion: isChatCompletion,
 		Wg:               &wg,
 	}
-	s.newRequests <- reqCtx
+	common.WriteToChannel(s.newRequests, reqCtx, s.logger, "newRequests")
 	wg.Wait()
 }

 // request processing finished
 func (s *VllmSimulator) responseSentCallback(model string, isChatCompletion bool, requestID string) {
 	// decrement running requests count
-	s.metrics.runReqChan <- -1
+	common.WriteToChannel(s.metrics.runReqChan, -1, s.logger, "metrics.runReqChan")

 	if s.isLora(model) {
 		// update loraInfo metrics to reflect that the request processing has been finished
-		s.metrics.lorasChan <- loraUsage{model, doneUsageState}
+		common.WriteToChannel(s.metrics.lorasChan, loraUsage{model, doneUsageState},
+			s.logger, "metrics.lorasChan")
 	}

 	if s.config.EnableKVCache && !isChatCompletion {
@@ -580,14 +583,14 @@ func (s *VllmSimulator) sendResponse(reqCtx *openaiserverapi.CompletionReqCtx, r

 	time.Sleep(time.Duration(ttft) * time.Millisecond)
 	// report ttft in seconds
-	s.metrics.ttftChan <- (float64(ttft) / 1000)
+	common.WriteToChannel(s.metrics.ttftChan, (float64(ttft) / 1000), s.logger, "metrics.ttftChan")

 	for range usageData.CompletionTokens - 1 {
 		perTokenLatency := s.getInterTokenLatency()
 		time.Sleep(time.Duration(perTokenLatency) * time.Millisecond)

 		// report tpot in seconds
-		s.metrics.tpotChan <- float64(perTokenLatency) / 1000
+		common.WriteToChannel(s.metrics.tpotChan, (float64(perTokenLatency) / 1000), s.logger, "metrics.tpotChan")
 	}

 	s.sendCompletionResponse(reqCtx.HTTPReqCtx, resp)
diff --git a/pkg/llm-d-inference-sim/streaming.go b/pkg/llm-d-inference-sim/streaming.go
index ee02b4b1..fd2b6720 100644
--- a/pkg/llm-d-inference-sim/streaming.go
+++ b/pkg/llm-d-inference-sim/streaming.go
@@ -104,14 +104,14 @@ func (s *VllmSimulator) sendTokenChunks(context *streamingContext, w *bufio.Writ
 	ttft := s.getWaitTimeToFirstToken(context.nPromptTokens, context.nCachedPromptTokens, context.doRemotePrefill)
 	time.Sleep(time.Duration(ttft) * time.Millisecond)
 	// report ttft in seconds
-	s.metrics.ttftChan <- (float64(ttft) / 1000)
+	common.WriteToChannel(s.metrics.ttftChan, (float64(ttft) / 1000), s.logger, "metrics.ttftChan")

 	for i, token := range genTokens {
 		if i != 0 {
 			interTokenLat := s.getInterTokenLatency()
 			time.Sleep(time.Duration(interTokenLat) * time.Millisecond)
 			// report tpot in seconds
-			s.metrics.tpotChan <- float64(interTokenLat) / 1000
+			common.WriteToChannel(s.metrics.tpotChan, (float64(interTokenLat) / 1000), s.logger, "metrics.tpotChan")
 		}

 		var toolChunkInsert *openaiserverapi.ToolCall
diff --git a/pkg/llm-d-inference-sim/worker.go b/pkg/llm-d-inference-sim/worker.go
index 5171580a..c79bcd02 100644
--- a/pkg/llm-d-inference-sim/worker.go
+++ b/pkg/llm-d-inference-sim/worker.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"github.com/go-logr/logr"
+	"github.com/llm-d/llm-d-inference-sim/pkg/common"
 	"github.com/llm-d/llm-d-inference-sim/pkg/dataset"
 	openaiserverapi "github.com/llm-d/llm-d-inference-sim/pkg/openai-server-api"
 	"github.com/valyala/fasthttp"
 )
@@ -63,12 +64,13 @@ func (s *VllmSimulator) processRequest(reqCtx *openaiserverapi.CompletionReqCtx)
 	displayModel := s.getDisplayedModelName(model)

 	// increment running requests count
-	s.metrics.runReqChan <- 1
+	common.WriteToChannel(s.metrics.runReqChan, 1, s.logger, "metrics.runReqChan")

 	if s.isLora(model) {
 		// update loraInfo metric to reflect that
 		// the request has changed its status from waiting to running
-		s.metrics.lorasChan <- loraUsage{model, runningUsageState}
+		common.WriteToChannel(s.metrics.lorasChan, loraUsage{model, runningUsageState}, s.logger,
+			"metrics.lorasChan")
 	}

 	if s.config.EnableKVCache && !reqCtx.IsChatCompletion {
@@ -137,16 +139,13 @@ func (s *VllmSimulator) processRequest(reqCtx *openaiserverapi.CompletionReqCtx)
 			s.sendResponse(reqCtx, responseTokens, toolCalls, displayModel, finishReason, &usageData)
 		}

-		select {
-		case s.metrics.requestSuccessChan <- requestSuccessEvent{
-			promptTokens:     usageData.PromptTokens,
-			generationTokens: usageData.CompletionTokens,
-			maxTokens:        reqCtx.CompletionReq.GetMaxCompletionTokens(),
-			finishReason:     finishReason,
-		}:
-		default:
-			s.logger.V(1).Info("requestSuccessChan full, dropping success event")
-		}
+		common.WriteToChannel(s.metrics.requestSuccessChan,
+			requestSuccessEvent{
+				promptTokens:     usageData.PromptTokens,
+				generationTokens: usageData.CompletionTokens,
+				maxTokens:        reqCtx.CompletionReq.GetMaxCompletionTokens(),
+				finishReason:     finishReason},
+			s.logger, "metrics.requestSuccessChan")
 	}

 	s.logger.V(4).Info("Finished processing request", "id", req.GetRequestID())

From 001afaf4ea48ce67b84c23574748bf28d5266fc6 Mon Sep 17 00:00:00 2001
From: irar2
Date: Thu, 23 Oct 2025 10:57:26 +0300
Subject: [PATCH 2/2] Lint

Signed-off-by: irar2
---
 pkg/common/utils.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/common/utils.go b/pkg/common/utils.go
index 422c5e53..d1f3cfe1 100644
--- a/pkg/common/utils.go
+++ b/pkg/common/utils.go
@@ -133,6 +133,6 @@ func WriteToChannel[T any](channel chan T, object T, logger logr.Logger, channel
 	select {
 	case channel <- object:
 	default:
-		logger.V(1).Info("failed to write to", "channel", channelName, "dropping value")
+		logger.V(1).Info("failed to write to channel, dropping value", "channel", channelName)
 	}
 }
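
The pattern both commits converge on is the generic non-blocking send added in pkg/common: a select with a default branch that drops the value when the channel has no free capacity instead of blocking the caller. The trade-off is that metric updates and KV-cache events can be silently dropped (logged at V(1)) when a consumer falls behind, in exchange for never stalling the request path. Below is a minimal, standalone sketch of that behavior for reference; it is not part of the patches, and the channel name, buffer size, and main wrapper are made up for illustration.

package main

import (
	"fmt"

	"github.com/go-logr/logr"
)

// writeToChannel mirrors the helper added in pkg/common/utils.go: attempt a
// non-blocking send and drop the value if the channel is full.
func writeToChannel[T any](channel chan T, object T, logger logr.Logger, channelName string) {
	select {
	case channel <- object:
	default:
		logger.V(1).Info("failed to write to channel, dropping value", "channel", channelName)
	}
}

func main() {
	ch := make(chan int, 1)  // hypothetical buffered channel with room for one value
	logger := logr.Discard() // any logr.Logger works; Discard keeps the sketch dependency-free

	writeToChannel(ch, 1, logger, "exampleChan") // fits into the buffer
	writeToChannel(ch, 2, logger, "exampleChan") // buffer full: dropped instead of blocking

	fmt.Println("buffered values:", len(ch)) // prints "buffered values: 1"
}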