diff --git a/pkg/kv-cache/block_cache.go b/pkg/kv-cache/block_cache.go
index 9045dd1b..79d21e50 100644
--- a/pkg/kv-cache/block_cache.go
+++ b/pkg/kv-cache/block_cache.go
@@ -40,11 +40,12 @@ type blockCache struct {
 	maxBlocks   int            // maximum number of blocks in the cache
 	eventSender *KVEventSender // emmits kv events
 	eventChan   chan EventData // channel for asynchronous event processing
+	usageChan   chan float64   // channel for usage reporting
 	logger      logr.Logger
 }
 
 // newBlockCache creates a new blockCache with the specified maximum number of blocks
-func newBlockCache(config *common.Configuration, logger logr.Logger) (*blockCache, error) {
+func newBlockCache(config *common.Configuration, logger logr.Logger, usageChan chan float64) (*blockCache, error) {
 	// TODO read size of channel from config
 	eChan := make(chan EventData, 10000)
 
@@ -63,6 +64,7 @@ func newBlockCache(config *common.Configuration, logger logr.Logger) (*blockCach
 		unusedBlocks: make(map[uint64]time.Time),
 		maxBlocks:    config.KVCacheSize,
 		eventChan:    eChan,
+		usageChan:    usageChan,
 		eventSender:  NewKVEventSender(publisher, createTopic(config), eChan, config.EventBatchSize, delay, logger),
 		logger:       logger,
 	}, nil
@@ -149,6 +151,9 @@ func (bc *blockCache) startRequest(requestID string, blocks []uint64) (int, erro
 	bc.requestToBlocks[requestID] = make([]uint64, len(blocks))
 	copy(bc.requestToBlocks[requestID], blocks)
 
+	if bc.usageChan != nil {
+		bc.usageChan <- float64(len(bc.usedBlocks)) / float64(bc.maxBlocks)
+	}
 	return len(blockAreadyInUse) + len(blockToMoveToUsed), nil
 }
 
@@ -182,6 +187,10 @@ func (bc *blockCache) finishRequest(requestID string) error {
 		}
 	}
 
+	if bc.usageChan != nil {
+		bc.usageChan <- float64(len(bc.usedBlocks)) / float64(bc.maxBlocks)
+	}
+
 	// Remove the request mapping
 	delete(bc.requestToBlocks, requestID)
diff --git a/pkg/kv-cache/kv_cache.go b/pkg/kv-cache/kv_cache.go
index 5c5819db..ebb32a83 100644
--- a/pkg/kv-cache/kv_cache.go
+++ b/pkg/kv-cache/kv_cache.go
@@ -35,7 +35,7 @@ type KVCacheHelper struct {
 	blockSize int
 }
 
-func NewKVCacheHelper(config *common.Configuration, logger logr.Logger) (*KVCacheHelper, error) {
+func NewKVCacheHelper(config *common.Configuration, logger logr.Logger, usageChan chan float64) (*KVCacheHelper, error) {
 	tokenProcConfig := kvblock.DefaultTokenProcessorConfig()
 	tokenProcConfig.BlockSize = config.TokenBlockSize
 	if config.HashSeed != "" {
@@ -51,7 +51,7 @@ func NewKVCacheHelper(config *common.Configuration, logger logr.Logger) (*KVCach
 	if err != nil {
 		return nil, fmt.Errorf("failed to create tokenizer: %w", err)
 	}
-	blockCache, err := newBlockCache(config, logger)
+	blockCache, err := newBlockCache(config, logger, usageChan)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create block cache: %w", err)
 	}
@@ -92,11 +92,14 @@ func (h *KVCacheHelper) OnRequestStart(vllmReq openaiserverapi.CompletionRequest
 		blockHashes[i] = key.ChunkHash
 	}
 
-	nExistingBlocks, err := h.blockCache.startRequest(requestID, blockHashes)
-	vllmReq.SetNumberOfCachedPromptTokens(nExistingBlocks * h.blockSize)
+	nBlocksAlreadyInCache, err := h.blockCache.startRequest(requestID, blockHashes)
+	if err == nil {
+		vllmReq.SetNumberOfCachedPromptTokens(nBlocksAlreadyInCache * h.blockSize)
+	}
+
 	return err
 }
 
-func (h *KVCacheHelper) OnRequestEnd(vllmReq openaiserverapi.CompletionRequest) error {
-	return h.blockCache.finishRequest(vllmReq.GetRequestID())
+func (h *KVCacheHelper) OnRequestEnd(requestID string) error {
+	return h.blockCache.finishRequest(requestID)
 }
diff --git a/pkg/kv-cache/kv_cache_test.go b/pkg/kv-cache/kv_cache_test.go
index 172c9ced..1d1d1c82 100644
--- a/pkg/kv-cache/kv_cache_test.go
+++ b/pkg/kv-cache/kv_cache_test.go
@@ -216,7 +216,7 @@ var _ = Describe("KV cache", Ordered, func() {
 			wg := sync.WaitGroup{}
 			wg.Add(1)
 
-			blockCache, err := newBlockCache(config, GinkgoLogr)
+			blockCache, err := newBlockCache(config, GinkgoLogr, nil)
 			Expect(err).NotTo(HaveOccurred())
 
 			go func() {
@@ -318,7 +318,7 @@ var _ = Describe("KV cache", Ordered, func() {
 			wg := sync.WaitGroup{}
 			wg.Add(1)
 
-			blockCache, err := newBlockCache(config, GinkgoLogr)
+			blockCache, err := newBlockCache(config, GinkgoLogr, nil)
 			Expect(err).NotTo(HaveOccurred())
 
 			go func() {
@@ -422,7 +422,7 @@ var _ = Describe("KV cache", Ordered, func() {
 				KVCacheSize:           testCase.cacheSize,
 				ZMQMaxConnectAttempts: 3,
 			}
-			blockCache, err := newBlockCache(&config, GinkgoLogr)
+			blockCache, err := newBlockCache(&config, GinkgoLogr, nil)
 			Expect(err).NotTo(HaveOccurred())
 
 			var wg sync.WaitGroup
diff --git a/pkg/llm-d-inference-sim/failures_test.go b/pkg/llm-d-inference-sim/failures_test.go
index 5ff48034..24ebf0a8 100644
--- a/pkg/llm-d-inference-sim/failures_test.go
+++ b/pkg/llm-d-inference-sim/failures_test.go
@@ -127,7 +127,7 @@ var _ = Describe("Failures", func() {
 		BeforeEach(func() {
 			ctx = context.Background()
 			var err error
-			client, err = startServerWithArgs(ctx, "failure", []string{
+			client, err = startServerWithArgs(ctx, "", []string{
 				"cmd", "--model", model, "--failure-injection-rate", "100",
 			}, nil)
@@ -185,7 +185,7 @@ var _ = Describe("Failures", func() {
 		BeforeEach(func() {
 			ctx = context.Background()
 			var err error
-			client, err = startServerWithArgs(ctx, "failure", []string{
+			client, err = startServerWithArgs(ctx, "", []string{
 				"cmd", "--model", model, "--failure-injection-rate", "100",
 				"--failure-types", common.FailureTypeRateLimit,
@@ -221,7 +221,7 @@ var _ = Describe("Failures", func() {
 		BeforeEach(func() {
 			ctx = context.Background()
 			var err error
-			client, err = startServerWithArgs(ctx, "failure", []string{
+			client, err = startServerWithArgs(ctx, "", []string{
 				"cmd", "--model", model, "--failure-injection-rate", "100",
 				"--failure-types", common.FailureTypeInvalidAPIKey, common.FailureTypeServerError,
@@ -262,7 +262,7 @@ var _ = Describe("Failures", func() {
 		BeforeEach(func() {
 			ctx = context.Background()
 			var err error
-			client, err = startServerWithArgs(ctx, "failure", []string{
+			client, err = startServerWithArgs(ctx, "", []string{
 				"cmd", "--model", model, "--failure-injection-rate", "0",
 			}, nil)
@@ -293,7 +293,7 @@ var _ = Describe("Failures", func() {
 	DescribeTable("should return correct error for each failure type",
 		func(failureType string, expectedStatusCode int, expectedErrorType string) {
 			ctx := context.Background()
-			client, err := startServerWithArgs(ctx, "failure", []string{
+			client, err := startServerWithArgs(ctx, "", []string{
 				"cmd", "--model", model, "--failure-injection-rate", "100",
 				"--failure-types", failureType,
diff --git a/pkg/llm-d-inference-sim/metrics.go b/pkg/llm-d-inference-sim/metrics.go
index 850db935..e86e900f 100644
--- a/pkg/llm-d-inference-sim/metrics.go
+++ b/pkg/llm-d-inference-sim/metrics.go
@@ -181,11 +181,23 @@ func (s *VllmSimulator) reportWaitingRequests() {
 	}
 }
 
+// reportKVCacheUsage sets information about kv cache usage
+func (s *VllmSimulator) reportKVCacheUsage(value float64) {
+	if s.config.FakeMetrics != nil {
+		return
+	}
+	if s.kvCacheUsagePercentage != nil {
+		s.kvCacheUsagePercentage.WithLabelValues(
+			s.getDisplayedModelName(s.config.Model)).Set(value)
+	}
+}
+
 // startMetricsUpdaters starts the various metrics updaters
 func (s *VllmSimulator) startMetricsUpdaters(ctx context.Context) {
 	go s.waitingRequestsUpdater(ctx)
 	go s.runningRequestsUpdater(ctx)
 	go s.lorasUpdater(ctx)
+	go s.kvCacheUsageUpdater(ctx)
 }
 
 // waitingRequestsUpdater updates the waiting requests metric by listening on the relevant channel
@@ -214,6 +226,18 @@ func (s *VllmSimulator) runningRequestsUpdater(ctx context.Context) {
 	}
 }
 
+// kvCacheUsageUpdater updates the kv cache usage metric by listening on the relevant channel
+func (s *VllmSimulator) kvCacheUsageUpdater(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case value := <-s.kvCacheUsageChan:
+			s.reportKVCacheUsage(value)
+		}
+	}
+}
+
 // lorasUpdater updates the running loras metric by listening on the relevant channel
 // one function updates both waiting and running loras since they a part of the same prometheus gauge
 func (s *VllmSimulator) lorasUpdater(ctx context.Context) {
diff --git a/pkg/llm-d-inference-sim/metrics_test.go b/pkg/llm-d-inference-sim/metrics_test.go
index f721093e..bc94c460 100644
--- a/pkg/llm-d-inference-sim/metrics_test.go
+++ b/pkg/llm-d-inference-sim/metrics_test.go
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"io"
 	"net/http"
+	"os"
 	"regexp"
 	"sort"
 	"strconv"
@@ -314,6 +315,161 @@ var _ = Describe("Simulator metrics", Ordered, func() {
 		Expect(bothRunningTimestamp <= emptyTimestamp).To(BeTrue())
 	})
 
+	Context("kv cache metrics", func() {
+		tmpDir := "./tests-tmp/"
+		AfterAll(func() {
+			err := os.RemoveAll(tmpDir)
+			Expect(err).NotTo(HaveOccurred())
+		})
+
+		It("Should send correct kv cache usage metrics", func() {
+			modelName := "Qwen/Qwen2-0.5B"
+			// Three requests, there should be two blocks in the kv cache, because
+			// the first and the second prompts share a block.
+			ctx := context.TODO()
+			args := []string{"cmd", "--model", modelName, "--mode", common.ModeRandom,
+				"--enable-kvcache", "true", "--kv-cache-size", "16", "--block-size", "8",
+				"--time-to-first-token", "5000", "--tokenizers-cache-dir", tmpDir}
+
+			client, err := startServerWithArgs(ctx, common.ModeRandom, args, nil)
+			Expect(err).NotTo(HaveOccurred())
+
+			openaiclient := openai.NewClient(
+				option.WithBaseURL(baseURL),
+				option.WithHTTPClient(client))
+
+			paramsArray := []openai.CompletionNewParams{
+				{
+					Prompt: openai.CompletionNewParamsPromptUnion{
+						OfString: openai.String("What is the weather like in Haifa today? Is it cold?"),
+					},
+					Model: openai.CompletionNewParamsModel(modelName),
+				},
+				{
+					Prompt: openai.CompletionNewParamsPromptUnion{
+						OfString: openai.String("What is the weather like in Haifa today?"),
+					},
+					Model: openai.CompletionNewParamsModel(modelName),
+				},
+				{
+					Prompt: openai.CompletionNewParamsPromptUnion{
+						OfString: openai.String("What is the weather like in New York today?"),
+					},
+					Model: openai.CompletionNewParamsModel(modelName),
+				},
+			}
+
+			for _, params := range paramsArray {
+				go func() {
+					defer GinkgoRecover()
+					_, err := openaiclient.Completions.New(ctx, params)
+					Expect(err).NotTo(HaveOccurred())
+				}()
+			}
+
+			var wg sync.WaitGroup
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				defer GinkgoRecover()
+
+				time.Sleep(4 * time.Second)
+				metricsResp, err := client.Get(metricsUrl)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(metricsResp.StatusCode).To(Equal(http.StatusOK))
+
+				data, err := io.ReadAll(metricsResp.Body)
+				Expect(err).NotTo(HaveOccurred())
+				metrics := string(data)
+				// Expect three running requests and two blocks in the kv cache - usage 2/16=0.125
+				Expect(metrics).To(ContainSubstring("vllm:num_requests_running{model_name=\"Qwen/Qwen2-0.5B\"} 3"))
+				Expect(metrics).To(ContainSubstring("vllm:num_requests_waiting{model_name=\"Qwen/Qwen2-0.5B\"} 0"))
+				Expect(metrics).To(ContainSubstring("vllm:gpu_cache_usage_perc{model_name=\"Qwen/Qwen2-0.5B\"} 0.125"))
+
+				time.Sleep(3 * time.Second)
+				metricsResp, err = client.Get(metricsUrl)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(metricsResp.StatusCode).To(Equal(http.StatusOK))
+
+				data, err = io.ReadAll(metricsResp.Body)
+				Expect(err).NotTo(HaveOccurred())
+				metrics = string(data)
+				// The requests finished running, expect 0 usage
+				Expect(metrics).To(ContainSubstring("vllm:num_requests_running{model_name=\"Qwen/Qwen2-0.5B\"} 0"))
+				Expect(metrics).To(ContainSubstring("vllm:num_requests_waiting{model_name=\"Qwen/Qwen2-0.5B\"} 0"))
+				Expect(metrics).To(ContainSubstring("vllm:gpu_cache_usage_perc{model_name=\"Qwen/Qwen2-0.5B\"} 0"))
+			}()
+			wg.Wait()
+		})
+
+		It("Should send correct kv cache usage metrics for sequential requests", func() {
+			modelName := "Qwen/Qwen2-0.5B"
+			ctx := context.TODO()
+			args := []string{"cmd", "--model", modelName, "--mode", common.ModeRandom,
+				"--enable-kvcache", "true", "--kv-cache-size", "16", "--block-size", "8",
+				"--time-to-first-token", "5000", "--tokenizers-cache-dir", tmpDir, "--max-num-seqs", "2"}
+
+			client, err := startServerWithArgs(ctx, common.ModeRandom, args, nil)
+			Expect(err).NotTo(HaveOccurred())
+
+			openaiclient := openai.NewClient(
+				option.WithBaseURL(baseURL),
+				option.WithHTTPClient(client))
+
+			paramsArray := []openai.CompletionNewParams{
+				{
+					Prompt: openai.CompletionNewParamsPromptUnion{
+						OfString: openai.String("What is the weather like in Haifa today? Is it cold?"),
+					},
+					Model: openai.CompletionNewParamsModel(modelName),
+				},
+				{
+					Prompt: openai.CompletionNewParamsPromptUnion{
+						OfString: openai.String("What is the weather like in Haifa today?"),
+					},
+					Model: openai.CompletionNewParamsModel(modelName),
+				},
+				{
+					Prompt: openai.CompletionNewParamsPromptUnion{
+						OfString: openai.String("What is the weather like in New York today?"),
+					},
+					Model: openai.CompletionNewParamsModel(modelName),
+				},
+			}
+
+			for i, params := range paramsArray {
+				go func() {
+					defer GinkgoRecover()
+					time.Sleep(time.Duration(i*500) * time.Millisecond)
+					_, err := openaiclient.Completions.New(ctx, params)
+					Expect(err).NotTo(HaveOccurred())
+				}()
+			}
+
+			var wg sync.WaitGroup
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				defer GinkgoRecover()
+
+				time.Sleep(3 * time.Second)
+				metricsResp, err := client.Get(metricsUrl)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(metricsResp.StatusCode).To(Equal(http.StatusOK))
+
+				data, err := io.ReadAll(metricsResp.Body)
+				Expect(err).NotTo(HaveOccurred())
+				metrics := string(data)
+				// The requests were sent at 500 millisecond intervals, and the first two should still be running.
+				// The third is waiting and is not yet in the kv-cache.
+				// We expect one block in the kv-cache, usage 1/16=0.0625.
+				Expect(metrics).To(ContainSubstring("vllm:num_requests_running{model_name=\"Qwen/Qwen2-0.5B\"} 2"))
+				Expect(metrics).To(ContainSubstring("vllm:num_requests_waiting{model_name=\"Qwen/Qwen2-0.5B\"} 1"))
+				Expect(metrics).To(ContainSubstring("vllm:gpu_cache_usage_perc{model_name=\"Qwen/Qwen2-0.5B\"} 0.0625"))
+			}()
+			wg.Wait()
+		})
+	})
+
 	Context("fake metrics", func() {
 		It("Should respond with fake metrics to /metrics", func() {
 			ctx := context.TODO()
diff --git a/pkg/llm-d-inference-sim/simulator.go b/pkg/llm-d-inference-sim/simulator.go
index 24446685..e080dac7 100644
--- a/pkg/llm-d-inference-sim/simulator.go
+++ b/pkg/llm-d-inference-sim/simulator.go
@@ -95,6 +95,8 @@ type VllmSimulator struct {
 	nWaitingReqs int64
 	// waitingReqChan is a channel to update nWaitingReqs
 	waitingReqChan chan int64
+	// kvCacheUsageChan is a channel to update kvCacheUsagePercentage
+	kvCacheUsageChan chan float64
 	// registry is a Prometheus registry
 	registry *prometheus.Registry
 	// loraInfo is prometheus gauge
@@ -125,15 +127,16 @@ func New(logger logr.Logger) (*VllmSimulator, error) {
 	}
 
 	return &VllmSimulator{
-		logger:         logger,
-		reqChan:        make(chan *openaiserverapi.CompletionReqCtx, maxNumberOfRequests),
-		toolsValidator: toolsValidator,
-		kvcacheHelper:  nil, // kvcache helper will be created only if required after reading configuration
-		namespace:      os.Getenv(podNsEnv),
-		pod:            os.Getenv(podNameEnv),
-		runReqChan:     make(chan int64, maxNumberOfRequests),
-		waitingReqChan: make(chan int64, maxNumberOfRequests),
-		lorasChan:      make(chan loraUsage, maxNumberOfRequests),
+		logger:           logger,
+		reqChan:          make(chan *openaiserverapi.CompletionReqCtx, maxNumberOfRequests),
+		toolsValidator:   toolsValidator,
+		kvcacheHelper:    nil, // kvcache helper will be created only if required after reading configuration
+		namespace:        os.Getenv(podNsEnv),
+		pod:              os.Getenv(podNameEnv),
+		runReqChan:       make(chan int64, maxNumberOfRequests),
+		waitingReqChan:   make(chan int64, maxNumberOfRequests),
+		lorasChan:        make(chan loraUsage, maxNumberOfRequests),
+		kvCacheUsageChan: make(chan float64, maxNumberOfRequests),
 	}, nil
 }
@@ -192,7 +195,7 @@ func (s *VllmSimulator) startSim(ctx context.Context) error {
 	}
 
 	if s.config.EnableKVCache {
-		s.kvcacheHelper, err = kvcache.NewKVCacheHelper(s.config, s.logger)
+		s.kvcacheHelper, err = kvcache.NewKVCacheHelper(s.config, s.logger, s.kvCacheUsageChan)
 		if err != nil {
 			return err
 		}
@@ -360,6 +363,15 @@ func (s *VllmSimulator) validateRequest(req openaiserverapi.CompletionRequest) (
 		return "Ignore_eos is true but max_completion_tokens (or max_tokens) is not set", fasthttp.StatusBadRequest
 	}
 
+	// Validate context window constraints
+	promptTokens := req.GetNumberOfPromptTokens()
+	completionTokens := req.GetMaxCompletionTokens()
+	isValid, actualCompletionTokens, totalTokens := common.ValidateContextWindow(promptTokens, completionTokens, s.config.MaxModelLen)
+	if !isValid {
+		message := fmt.Sprintf("This model's maximum context length is %d tokens. However, you requested %d tokens (%d in the messages, %d in the completion). Please reduce the length of the messages or completion",
+			s.config.MaxModelLen, totalTokens, promptTokens, actualCompletionTokens)
+		return message, fasthttp.StatusBadRequest
+	}
 	return "", fasthttp.StatusOK
 }
@@ -412,33 +424,6 @@ func (s *VllmSimulator) handleCompletions(ctx *fasthttp.RequestCtx, isChatComple
 		return
 	}
 
-	defer func() {
-		if s.config.EnableKVCache && !isChatCompletion {
-			err := s.kvcacheHelper.OnRequestEnd(vllmReq)
-			if err != nil {
-				s.logger.Error(err, "kv cache failed to process request end")
-			}
-		}
-	}()
-	if s.config.EnableKVCache && !isChatCompletion {
-		// kv cache is currently supported for /completion API only
-		err = s.kvcacheHelper.OnRequestStart(vllmReq)
-		if err != nil {
-			s.sendCompletionError(ctx, openaiserverapi.NewCompletionError(err.Error(), fasthttp.StatusInternalServerError, nil), false)
-		}
-	}
-
-	// Validate context window constraints
-	promptTokens := vllmReq.GetNumberOfPromptTokens()
-	completionTokens := vllmReq.GetMaxCompletionTokens()
-	isValid, actualCompletionTokens, totalTokens := common.ValidateContextWindow(promptTokens, completionTokens, s.config.MaxModelLen)
-	if !isValid {
-		message := fmt.Sprintf("This model's maximum context length is %d tokens. However, you requested %d tokens (%d in the messages, %d in the completion). Please reduce the length of the messages or completion",
-			s.config.MaxModelLen, totalTokens, promptTokens, actualCompletionTokens)
-		s.sendCompletionError(ctx, openaiserverapi.NewCompletionError(message, fasthttp.StatusBadRequest, nil), false)
-		return
-	}
-
 	var wg sync.WaitGroup
 	wg.Add(1)
 	reqCtx := &openaiserverapi.CompletionReqCtx{
@@ -474,7 +459,7 @@ func (s *VllmSimulator) reqProcessingWorker(ctx context.Context, id int) {
 			model := req.GetModel()
 			displayModel := s.getDisplayedModelName(model)
 
-			// decriment waiting and increment running requests count
+			// decrement waiting and increment running requests count
 			s.waitingReqChan <- -1
 			s.runReqChan <- 1
 
@@ -484,6 +469,13 @@ func (s *VllmSimulator) reqProcessingWorker(ctx context.Context, id int) {
 				s.lorasChan <- loraUsage{model, runningUsageState}
 			}
 
+			if s.config.EnableKVCache && !reqCtx.IsChatCompletion {
+				// kv cache is currently supported for /completion API only
+				if err := s.kvcacheHelper.OnRequestStart(req); err != nil {
+					s.sendCompletionError(reqCtx.HTTPReqCtx, openaiserverapi.NewCompletionError(err.Error(), fasthttp.StatusInternalServerError, nil), false)
+				}
+			}
+
 			var responseTokens []string
 			var finishReason string
 			var err error
@@ -545,8 +537,8 @@ func (s *VllmSimulator) reqProcessingWorker(ctx context.Context, id int) {
 	}
 }
 
-// decrease model usage reference number
-func (s *VllmSimulator) responseSentCallback(model string) {
+// request processing finished
+func (s *VllmSimulator) responseSentCallback(model string, isChatCompletion bool, requestID string) {
 	// decriment running requests count
 	s.runReqChan <- -1
 
@@ -554,6 +546,12 @@ func (s *VllmSimulator) responseSentCallback(model string) {
 		// update loraInfo metrics to reflect that the request processing has been finished
 		s.lorasChan <- loraUsage{model, doneUsageState}
 	}
+
+	if s.config.EnableKVCache && !isChatCompletion {
+		if err := s.kvcacheHelper.OnRequestEnd(requestID); err != nil {
+			s.logger.Error(err, "kv cache failed to process request end")
+		}
+	}
 }
 
 // sendCompletionError sends an error response for the current completion request
@@ -692,7 +690,7 @@ func (s *VllmSimulator) sendResponse(reqCtx *openaiserverapi.CompletionReqCtx, r
 	}
 
 	ctx.Response.SetBody(data)
-	s.responseSentCallback(modelName)
+	s.responseSentCallback(modelName, reqCtx.IsChatCompletion, reqCtx.CompletionReq.GetRequestID())
 }
 
 // returns time to first token based on the current request's doRemotePrefill
diff --git a/pkg/llm-d-inference-sim/simulator_test.go b/pkg/llm-d-inference-sim/simulator_test.go
index df43ff57..c321e3f1 100644
--- a/pkg/llm-d-inference-sim/simulator_test.go
+++ b/pkg/llm-d-inference-sim/simulator_test.go
@@ -28,6 +28,7 @@ import (
 	"time"
 
 	"github.com/llm-d/llm-d-inference-sim/pkg/common"
+	kvcache "github.com/llm-d/llm-d-inference-sim/pkg/kv-cache"
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	"github.com/openai/openai-go"
@@ -96,6 +97,15 @@ func startServerWithArgs(ctx context.Context, mode string, args []string, envs m
 		return nil, err
 	}
 
+	if s.config.EnableKVCache {
+		s.kvcacheHelper, err = kvcache.NewKVCacheHelper(s.config, s.logger, s.kvCacheUsageChan)
+		if err != nil {
+			return nil, err
+		}
+
+		go s.kvcacheHelper.Run(ctx)
+	}
+
 	// calculate number of tokens for user message,
 	// must be activated after parseCommandParamsAndLoadConfig since it initializes the random engine
 	userMsgTokens = int64(len(common.Tokenize(userMessage)))
diff --git a/pkg/llm-d-inference-sim/streaming.go b/pkg/llm-d-inference-sim/streaming.go
index 5ff1e240..ea9b6676 100644
--- a/pkg/llm-d-inference-sim/streaming.go
+++ b/pkg/llm-d-inference-sim/streaming.go
@@ -35,6 +35,7 @@ type streamingContext struct {
 	doRemotePrefill     bool
 	nPromptTokens       int
 	nCachedPromptTokens int
+	requestID           string
 }
 
 // sendStreamingResponse creates and sends a streaming response for completion requests of both types (text and chat)
@@ -91,7 +92,7 @@ func (s *VllmSimulator) sendStreamingResponse(context *streamingContext, respons
 			context.ctx.Error("Sending last stream chunk failed, "+err.Error(), fasthttp.StatusInternalServerError)
 			return
 		}
-		s.responseSentCallback(context.model)
+		s.responseSentCallback(context.model, context.isChatCompletion, context.requestID)
 	})
 }