diff --git a/README.md b/README.md
index 0853bd60..f57f148b 100644
--- a/README.md
+++ b/README.md
@@ -65,32 +65,97 @@ API responses contains a subset of the fields provided by the OpenAI API.
     - messages
       - role
       - content
+      - tool_calls
+        - function
+          - name
+          - arguments
+        - id
+        - type
+        - index
+    - max_tokens
+    - max_completion_tokens
+    - tools
+      - type
+      - function
+        - name
+        - arguments
+    - tool_choice
+    - logprobs
+    - top_logprobs
+    - stream_options
+      - include_usage
+    - do_remote_decode
+    - do_remote_prefill
+    - remote_block_ids
+    - remote_engine_id
+    - remote_host
+    - remote_port
+    - ignore_eos
   - **response**
     - id
     - created
     - model
     - choices
-      - index
-      - finish_reason
-      - message
+      - index
+      - finish_reason
+      - message
+      - logprobs
+        - content
+          - token
+          - logprob
+          - bytes
+          - top_logprobs
+    - usage
+    - object
+    - do_remote_decode
+    - do_remote_prefill
+    - remote_block_ids
+    - remote_engine_id
+    - remote_host
+    - remote_port
 - `/v1/completions`
   - **request**
     - stream
     - model
     - prompt
-    - max_tokens (for future usage)
+    - max_tokens
+    - stream_options
+      - include_usage
+    - do_remote_decode
+    - do_remote_prefill
+    - remote_block_ids
+    - remote_engine_id
+    - remote_host
+    - remote_port
+    - ignore_eos
+    - logprobs
   - **response**
     - id
     - created
     - model
    - choices
-      - text
+      - index
+      - finish_reason
+      - text
+      - logprobs
+        - tokens
+        - token_logprobs
+        - top_logprobs
+        - text_offset
+    - usage
+    - object
+    - do_remote_decode
+    - do_remote_prefill
+    - remote_block_ids
+    - remote_engine_id
+    - remote_host
+    - remote_port
 - `/v1/models`
   - **response**
-    - object (list)
+    - object
     - data
       - id
-      - object (model)
+      - object
       - created
       - owned_by
       - root
@@ -158,8 +223,22 @@ For more details see the
 			if len(s.batch) > 0 {
-				s.logger.Info("Existing, discard remaining events", "num of events", len(s.batch))
+				s.logger.V(logging.INFO).Info("Exiting, discard remaining events", "num of events", len(s.batch))
 			}
 			return ctx.Err()
@@ -78,7 +79,7 @@ func (s *KVEventSender) Run(ctx context.Context) error {
 			if !ok {
 				// Channel closed, discard remaining events and exit
 				if len(s.batch) > 0 {
-					s.logger.Info("Channel closed, discard remaining events", "num of events", len(s.batch))
+					s.logger.V(logging.INFO).Info("Channel closed, discard remaining events", "num of events", len(s.batch))
 				}
 				return nil
 			}
diff --git a/pkg/llm-d-inference-sim/helpers.go b/pkg/llm-d-inference-sim/helpers.go
index 451767cb..809894b6 100644
--- a/pkg/llm-d-inference-sim/helpers.go
+++ b/pkg/llm-d-inference-sim/helpers.go
@@ -22,6 +22,7 @@ import (
 	"fmt"

 	"github.com/llm-d/llm-d-inference-sim/pkg/common"
+	"github.com/llm-d/llm-d-inference-sim/pkg/common/logging"
 	openaiserverapi "github.com/llm-d/llm-d-inference-sim/pkg/openai-server-api"
 )

@@ -92,7 +93,7 @@ func (s *VllmSimulator) showConfig(dp bool) error {
 	if err != nil {
 		return fmt.Errorf("failed to marshal configuration to JSON: %w", err)
 	}
-	s.logger.Info("Configuration:", "", string(cfgJSON))
+	s.logger.V(logging.INFO).Info("Configuration:", "", string(cfgJSON))
 	return nil
 }
diff --git a/pkg/llm-d-inference-sim/lora.go b/pkg/llm-d-inference-sim/lora.go
index 608bce93..f7531605 100644
--- a/pkg/llm-d-inference-sim/lora.go
+++ b/pkg/llm-d-inference-sim/lora.go
@@ -21,6 +21,7 @@ import (
 	"encoding/json"

 	"github.com/llm-d/llm-d-inference-sim/pkg/common"
+	"github.com/llm-d/llm-d-inference-sim/pkg/common/logging"
 	"github.com/valyala/fasthttp"
 )

@@ -40,7 +41,7 @@ func (s *VllmSimulator) getLoras() []string {
 		if lora, ok := key.(string); ok {
 			loras = append(loras, lora)
 		} else {
-			s.logger.Info("Stored LoRA is not a string", "value", key)
+			s.logger.V(logging.WARN).Info("Stored LoRA is not a string", "value", key)
 		}
 		return true
 	})
diff --git a/pkg/llm-d-inference-sim/metrics.go b/pkg/llm-d-inference-sim/metrics.go
index 787d452c..8607289d 100644
--- a/pkg/llm-d-inference-sim/metrics.go
+++ b/pkg/llm-d-inference-sim/metrics.go
@@ -66,7 +66,7 @@ func (s *VllmSimulator) createAndRegisterPrometheus() error {
 	)

 	if err := s.metrics.registry.Register(s.metrics.loraInfo); err != nil {
-		s.logger.Error(err, "Prometheus lora info gauge register failed")
+		s.logger.Error(err, "prometheus lora info gauge register failed")
 		return err
 	}

@@ -80,7 +80,7 @@ func (s *VllmSimulator) createAndRegisterPrometheus() error {
 	)

 	if err := s.metrics.registry.Register(s.metrics.runningRequests); err != nil {
-		s.logger.Error(err, "Prometheus number of running requests gauge register failed")
+		s.logger.Error(err, "prometheus number of running requests gauge register failed")
 		return err
 	}

@@ -95,7 +95,7 @@ func (s *VllmSimulator) createAndRegisterPrometheus() error {
 	)

 	if err := s.metrics.registry.Register(s.metrics.waitingRequests); err != nil {
-		s.logger.Error(err, "Prometheus number of requests in queue gauge register failed")
+		s.logger.Error(err, "prometheus number of requests in queue gauge register failed")
 		return err
 	}

@@ -110,7 +110,7 @@ func (s *VllmSimulator) createAndRegisterPrometheus() error {
 	)

 	if err := s.metrics.registry.Register(s.metrics.ttft); err != nil {
-		s.logger.Error(err, "Prometheus time to first token histogram register failed")
+		s.logger.Error(err, "prometheus time to first token histogram register failed")
 		return err
 	}

@@ -125,7 +125,7 @@ func (s *VllmSimulator) createAndRegisterPrometheus() error {
 	)

 	if err := s.metrics.registry.Register(s.metrics.tpot); err != nil {
-		s.logger.Error(err, "Prometheus time per output token histogram register failed")
+		s.logger.Error(err, "prometheus time per output token histogram register failed")
 		return err
 	}

@@ -214,7 +214,7 @@ func (s *VllmSimulator) createAndRegisterPrometheus() error {
 	)

 	if err := s.metrics.registry.Register(s.metrics.kvCacheUsagePercentage); err != nil {
-		s.logger.Error(err, "Prometheus kv cache usage percentage gauge register failed")
+		s.logger.Error(err, "prometheus kv cache usage percentage gauge register failed")
 		return err
 	}

@@ -228,7 +228,7 @@ func (s *VllmSimulator) createAndRegisterPrometheus() error {
 		[]string{vllmapi.PromLabelModelName},
 	)
 	if err := s.metrics.registry.Register(s.metrics.requestPromptTokens); err != nil {
-		s.logger.Error(err, "Prometheus request_prompt_tokens histogram register failed")
+		s.logger.Error(err, "prometheus request_prompt_tokens histogram register failed")
 		return err
 	}

@@ -242,7 +242,7 @@ func (s *VllmSimulator) createAndRegisterPrometheus() error {
 		[]string{vllmapi.PromLabelModelName},
 	)
 	if err := s.metrics.registry.Register(s.metrics.requestGenerationTokens); err != nil {
-		s.logger.Error(err, "Prometheus request_generation_tokens histogram register failed")
+		s.logger.Error(err, "prometheus request_generation_tokens histogram register failed")
 		return err
 	}

@@ -256,7 +256,7 @@ func (s *VllmSimulator) createAndRegisterPrometheus() error {
 		[]string{vllmapi.PromLabelModelName},
 	)
 	if err := s.metrics.registry.Register(s.metrics.requestParamsMaxTokens); err != nil {
-		s.logger.Error(err, "Prometheus request_params_max_tokens histogram register failed")
+		s.logger.Error(err, "prometheus request_params_max_tokens histogram register failed")
 		return err
 	}

@@ -269,7 +269,7 @@ func (s *VllmSimulator) createAndRegisterPrometheus() error {
 		[]string{vllmapi.PromLabelModelName, vllmapi.PromLabelFinishReason},
 	)
 	if err := s.metrics.registry.Register(s.metrics.requestSuccessTotal); err != nil {
-		s.logger.Error(err, "Prometheus request_success_total counter register failed")
+		s.logger.Error(err, "prometheus request_success_total counter register failed")
 		return err
 	}
diff --git a/pkg/llm-d-inference-sim/server.go b/pkg/llm-d-inference-sim/server.go
index e21717a9..44ce37bf 100644
--- a/pkg/llm-d-inference-sim/server.go
+++ b/pkg/llm-d-inference-sim/server.go
@@ -30,6 +30,7 @@ import (
 	"github.com/valyala/fasthttp/fasthttpadaptor"

 	"github.com/llm-d/llm-d-inference-sim/pkg/common"
+	"github.com/llm-d/llm-d-inference-sim/pkg/common/logging"
 	openaiserverapi "github.com/llm-d/llm-d-inference-sim/pkg/openai-server-api"
 	vllmapi "github.com/llm-d/llm-d-inference-sim/pkg/vllm-api"
 )
@@ -75,10 +76,10 @@ func (s *VllmSimulator) startServer(ctx context.Context, listener net.Listener)
 	serverErr := make(chan error, 1)
 	go func() {
 		if s.config.SSLEnabled() {
-			s.logger.Info("Server starting", "protocol", "HTTPS", "port", s.config.Port)
+			s.logger.V(logging.INFO).Info("Server starting", "protocol", "HTTPS", "port", s.config.Port)
 			serverErr <- server.ServeTLS(listener, "", "")
 		} else {
-			s.logger.Info("Server starting", "protocol", "HTTP", "port", s.config.Port)
+			s.logger.V(logging.INFO).Info("Server starting", "protocol", "HTTP", "port", s.config.Port)
 			serverErr <- server.Serve(listener)
 		}
 	}()
@@ -86,20 +87,20 @@ func (s *VllmSimulator) startServer(ctx context.Context, listener net.Listener)
 	// Wait for either context cancellation or server error
 	select {
 	case <-ctx.Done():
-		s.logger.Info("Shutdown signal received, shutting down server gracefully")
+		s.logger.V(logging.INFO).Info("Shutdown signal received, shutting down server gracefully")

 		// Gracefully shutdown the server
 		if err := server.Shutdown(); err != nil {
-			s.logger.Error(err, "Error during server shutdown")
+			s.logger.Error(err, "error during server shutdown")
 			return err
 		}

-		s.logger.Info("Server stopped")
+		s.logger.V(logging.INFO).Info("Server stopped")
 		return nil

 	case err := <-serverErr:
 		if err != nil {
-			s.logger.Error(err, "Server failed")
+			s.logger.Error(err, "server failed")
 		}
 		return err
 	}
@@ -165,7 +166,7 @@ func (s *VllmSimulator) readTokenizeRequest(ctx *fasthttp.RequestCtx) (*vllmapi.

 // HandleTokenize http handler for /tokenize
 func (s *VllmSimulator) HandleTokenize(ctx *fasthttp.RequestCtx) {
-	s.logger.Info("tokenize request received")
+	s.logger.V(logging.TRACE).Info("Tokenize request received")
 	req, err := s.readTokenizeRequest(ctx)
 	if err != nil {
 		s.logger.Error(err, "failed to read and parse tokenize request body")
@@ -207,12 +208,12 @@ func (s *VllmSimulator) HandleTokenize(ctx *fasthttp.RequestCtx) {
 }

 func (s *VllmSimulator) HandleLoadLora(ctx *fasthttp.RequestCtx) {
-	s.logger.Info("load lora request received")
+	s.logger.V(logging.DEBUG).Info("Load lora request received")
 	s.loadLoraAdaptor(ctx)
 }

 func (s *VllmSimulator) HandleUnloadLora(ctx *fasthttp.RequestCtx) {
-	s.logger.Info("unload lora request received")
+	s.logger.V(logging.DEBUG).Info("Unload lora request received")
 	s.unloadLoraAdaptor(ctx)
 }

@@ -270,7 +271,7 @@ func (s *VllmSimulator) sendCompletionResponse(ctx *fasthttp.RequestCtx, resp op
 func (s *VllmSimulator) sendCompletionError(ctx *fasthttp.RequestCtx,
 	compErr openaiserverapi.CompletionError, isInjected bool) {
 	if isInjected {
-		s.logger.Info("Injecting failure", "type", compErr.Type, "message", compErr.Message)
+		s.logger.V(logging.TRACE).Info("Injecting failure", "type", compErr.Type, "message", compErr.Message)
 	} else {
 		s.logger.Error(nil, compErr.Message)
 	}
@@ -291,11 +292,12 @@ func (s *VllmSimulator) sendCompletionError(ctx *fasthttp.RequestCtx,

 // HandleModels handles /v1/models request according the data stored in the simulator
 func (s *VllmSimulator) HandleModels(ctx *fasthttp.RequestCtx) {
+	s.logger.V(logging.TRACE).Info("/models request received")
 	modelsResp := s.createModelsResponse()

 	data, err := json.Marshal(modelsResp)
 	if err != nil {
-		s.logger.Error(err, "Failed to marshal models response")
+		s.logger.Error(err, "failed to marshal models response")
 		ctx.Error("Failed to marshal models response, "+err.Error(), fasthttp.StatusInternalServerError)
 		return
 	}
@@ -306,12 +308,12 @@ func (s *VllmSimulator) HandleModels(ctx *fasthttp.RequestCtx) {
 }

 func (s *VllmSimulator) HandleError(_ *fasthttp.RequestCtx, err error) {
-	s.logger.Error(err, "VLLM server error")
+	s.logger.Error(err, "vLLM server error")
 }

 // HandleHealth http handler for /health
 func (s *VllmSimulator) HandleHealth(ctx *fasthttp.RequestCtx) {
-	s.logger.V(4).Info("health request received")
+	s.logger.V(logging.TRACE).Info("Health request received")
 	ctx.Response.Header.SetContentType("application/json")
 	ctx.Response.Header.SetStatusCode(fasthttp.StatusOK)
 	ctx.Response.SetBody([]byte("{}"))
@@ -319,7 +321,7 @@ func (s *VllmSimulator) HandleHealth(ctx *fasthttp.RequestCtx) {

 // HandleReady http handler for /ready
 func (s *VllmSimulator) HandleReady(ctx *fasthttp.RequestCtx) {
-	s.logger.V(4).Info("readiness request received")
+	s.logger.V(logging.TRACE).Info("Readiness request received")
 	ctx.Response.Header.SetContentType("application/json")
 	ctx.Response.Header.SetStatusCode(fasthttp.StatusOK)
 	ctx.Response.SetBody([]byte("{}"))
diff --git a/pkg/llm-d-inference-sim/server_tls.go b/pkg/llm-d-inference-sim/server_tls.go
index 601418d7..0f23772c 100644
--- a/pkg/llm-d-inference-sim/server_tls.go
+++ b/pkg/llm-d-inference-sim/server_tls.go
@@ -27,6 +27,7 @@ import (
 	"math/big"
 	"time"

+	"github.com/llm-d/llm-d-inference-sim/pkg/common/logging"
 	"github.com/valyala/fasthttp"
 )

@@ -40,10 +41,10 @@ func (s *VllmSimulator) configureSSL(server *fasthttp.Server) error {
 	var err error

 	if s.config.SSLCertFile != "" && s.config.SSLKeyFile != "" {
-		s.logger.Info("HTTPS server starting with certificate files", "cert", s.config.SSLCertFile, "key", s.config.SSLKeyFile)
+		s.logger.V(logging.INFO).Info("HTTPS server starting with certificate files", "cert", s.config.SSLCertFile, "key", s.config.SSLKeyFile)
 		cert, err = tls.LoadX509KeyPair(s.config.SSLCertFile, s.config.SSLKeyFile)
 	} else if s.config.SelfSignedCerts {
-		s.logger.Info("HTTPS server starting with self-signed certificate")
+		s.logger.V(logging.INFO).Info("HTTPS server starting with self-signed certificate")
 		cert, err = CreateSelfSignedTLSCertificate()
 	}
diff --git a/pkg/llm-d-inference-sim/simulator.go b/pkg/llm-d-inference-sim/simulator.go
index 158ee3a1..0dbd8279 100644
--- a/pkg/llm-d-inference-sim/simulator.go
+++ b/pkg/llm-d-inference-sim/simulator.go
@@ -34,6 +34,7 @@ import (
 	"k8s.io/klog/v2"

 	"github.com/llm-d/llm-d-inference-sim/pkg/common"
+	"github.com/llm-d/llm-d-inference-sim/pkg/common/logging"
 	"github.com/llm-d/llm-d-inference-sim/pkg/dataset"
 	kvcache "github.com/llm-d/llm-d-inference-sim/pkg/kv-cache"
 	openaiserverapi "github.com/llm-d/llm-d-inference-sim/pkg/openai-server-api"
@@ -272,7 +273,7 @@ func (s *VllmSimulator) startSim(ctx context.Context) error {

 	listener, err := s.newListener()
 	if err != nil {
-		s.logger.Error(err, "Failed to create listener")
+		s.logger.Error(err, "failed to create listener")
 		return fmt.Errorf("listener creation error: %w", err)
 	}

@@ -366,7 +367,7 @@ func (s *VllmSimulator) initDataset(ctx context.Context) error {
 	}

 	if s.config.DatasetPath == "" && s.config.DatasetURL == "" {
-		s.logger.Info("No dataset path or URL provided, using random text for responses")
+		s.logger.V(logging.INFO).Info("No dataset path or URL provided, using random text for responses")
 		s.dataset = randDataset
 		return nil
 	}
@@ -380,7 +381,7 @@ func (s *VllmSimulator) initDataset(ctx context.Context) error {
 	}

 	if strings.HasPrefix(err.Error(), "database is locked") {
-		s.logger.Info("Database is locked by another process, will use preset text for responses instead")
+		s.logger.V(logging.WARN).Info("Database is locked by another process, will use preset text for responses instead")
 		s.dataset = randDataset
 		return nil
 	}
@@ -390,20 +391,20 @@ func (s *VllmSimulator) initDataset(ctx context.Context) error {

 // Print prints to a log, implementation of fasthttp.Logger
 func (s *VllmSimulator) Printf(format string, args ...interface{}) {
-	s.logger.Info("Server error", "msg", fmt.Sprintf(format, args...))
+	s.logger.V(logging.WARN).Info("Server error", "msg", fmt.Sprintf(format, args...))
 }

 func (s *VllmSimulator) processing(ctx context.Context) {
-	s.logger.Info("Start processing routine")
+	s.logger.V(logging.INFO).Info("Start processing routine")
 	for {
 		select {
 		case <-ctx.Done():
-			s.logger.Info("Request processing done")
+			s.logger.V(logging.INFO).Info("Request processing done")
 			return
 		case completedReq := <-s.workerFinished:
 			worker := completedReq.worker
-			s.logger.V(4).Info("Worker finished", "worker", worker.id)
+			s.logger.V(logging.TRACE).Info("Worker finished", "worker", worker.id)
 			s.decrementLora(completedReq.model)
 			// there is a free worker - find a request for it and send this request for
 			// processing with this worker
@@ -412,7 +413,7 @@ func (s *VllmSimulator) processing(ctx context.Context) {
 			// there is a LoRA that can be removed, go through availbale workers
 			// and queued requests and find requests that can run now,
 			// stop if there are no free workers, or no requests
-			s.logger.V(4).Info("LoRA can be removed")
+			s.logger.V(logging.TRACE).Info("LoRA can be removed")
 			for {
 				// check if there is a free worker
 				worker := s.getFreeWorker()
@@ -433,7 +434,7 @@ func (s *VllmSimulator) processing(ctx context.Context) {

 			worker := s.getFreeWorker()
 			if worker == nil {
-				s.logger.V(4).Info("No free worker - sending the request to the waiting queue",
+				s.logger.V(logging.TRACE).Info("No free worker - sending the request to the waiting queue",
 					"model", reqCtx.CompletionReq.GetModel(), "req id", reqCtx.CompletionReq.GetRequestID())
 				// no free worker, add this request to the waiting queue
 				s.addRequestToQueue(reqCtx)
@@ -444,14 +445,14 @@ func (s *VllmSimulator) processing(ctx context.Context) {
 			if s.isLora(model) && !s.loadLora(model) {
 				// free the worker
 				s.freeWorkers <- worker
-				s.logger.V(4).Info("LoRA cannot be loaded - sending the request to the waiting queue",
+				s.logger.V(logging.TRACE).Info("LoRA cannot be loaded - sending the request to the waiting queue",
 					"LoRA", model, "req id", reqCtx.CompletionReq.GetRequestID())
 				// LoRA max reached, try to enqueue
 				s.addRequestToQueue(reqCtx)
 				break
 			}

-			s.logger.V(4).Info("Sending the request to the processing channel", "model", model,
+			s.logger.V(logging.TRACE).Info("Sending the request to the processing channel", "model", model,
 				"req id", reqCtx.CompletionReq.GetRequestID(), "worker", worker.id)
 			common.WriteToChannel(worker.reqChan, reqCtx, s.logger, "worker's reqChan")
 		}
@@ -462,7 +463,7 @@ func (s *VllmSimulator) findRequestAndSendToProcess(worker *worker) bool {
 	nextReq := s.dequeue()
 	if nextReq != nil {
 		// send this request for processing in this worker
-		s.logger.V(4).Info("Sending request to processing", "model", nextReq.CompletionReq.GetModel(),
+		s.logger.V(logging.TRACE).Info("Sending request to processing", "model", nextReq.CompletionReq.GetModel(),
 			"req", nextReq.CompletionReq.GetRequestID(), "worker", worker.id)
 		common.WriteToChannel(worker.reqChan, nextReq, s.logger, "worker's reqChan")
 		// decrement waiting requests metric
@@ -517,7 +518,7 @@ func (s *VllmSimulator) handleCompletions(ctx *fasthttp.RequestCtx, isChatComple
 		return
 	}

-	s.logger.V(4).Info("Completion request received", "req id", vllmReq.GetRequestID(), "isChat", isChatCompletion)
+	s.logger.V(logging.DEBUG).Info("Completion request received", "req id", vllmReq.GetRequestID(), "isChat", isChatCompletion)

 	var wg sync.WaitGroup
 	wg.Add(1)
diff --git a/pkg/llm-d-inference-sim/streaming.go b/pkg/llm-d-inference-sim/streaming.go
index 2a136beb..6e708b6d 100644
--- a/pkg/llm-d-inference-sim/streaming.go
+++ b/pkg/llm-d-inference-sim/streaming.go
@@ -24,6 +24,7 @@ import (
 	"time"

 	"github.com/llm-d/llm-d-inference-sim/pkg/common"
+	"github.com/llm-d/llm-d-inference-sim/pkg/common/logging"
 	"github.com/llm-d/llm-d-inference-sim/pkg/dataset"
 	openaiserverapi "github.com/llm-d/llm-d-inference-sim/pkg/openai-server-api"
 	"github.com/valyala/fasthttp"
@@ -73,12 +74,12 @@ func (s *VllmSimulator) sendStreamingResponse(context *streamingContext, respons
 			}
 		}
 		if len(toolCalls) > 0 {
-			s.logger.Info("Going to send tools calls")
+			s.logger.V(logging.TRACE).Info("Going to send tools calls")
 			for _, tc := range toolCalls {
 				s.sendTokenChunks(context, w, tc.Function.TokenizedArguments, &tc, finishReason)
 			}
 		} else {
-			s.logger.Info("Going to send text", "number of tokens", len(responseTokens))
+			s.logger.V(logging.TRACE).Info("Going to send text", "number of tokens", len(responseTokens))
 			s.sendTokenChunks(context, w, responseTokens, nil, finishReason)
 		}
 	}
diff --git a/pkg/llm-d-inference-sim/worker.go b/pkg/llm-d-inference-sim/worker.go
index 1dc796a6..692a657f 100644
--- a/pkg/llm-d-inference-sim/worker.go
+++ b/pkg/llm-d-inference-sim/worker.go
@@ -23,6 +23,7 @@ import (
 	"github.com/go-logr/logr"

 	"github.com/llm-d/llm-d-inference-sim/pkg/common"
+	"github.com/llm-d/llm-d-inference-sim/pkg/common/logging"
 	"github.com/llm-d/llm-d-inference-sim/pkg/dataset"
 	openaiserverapi "github.com/llm-d/llm-d-inference-sim/pkg/openai-server-api"
 	"github.com/valyala/fasthttp"
@@ -46,7 +47,7 @@ func (w *worker) waitForRequests() {
 	for {
 		select {
 		case <-w.ctx.Done():
-			w.logger.V(4).Info("worker done", "id", w.id)
+			w.logger.V(logging.TRACE).Info("worker done", "id", w.id)
 			return
 		case req := <-w.reqChan:
 			w.processor.processRequest(req)
@@ -157,7 +158,7 @@ func (s *VllmSimulator) processRequest(reqCtx *openaiserverapi.CompletionReqCtx)
 			s.logger, "metrics.requestSuccessChan")
 	}

-	s.logger.V(4).Info("Finished processing request", "id", req.GetRequestID())
+	s.logger.V(logging.DEBUG).Info("Finished processing request", "id", req.GetRequestID())
 	reqCtx.Wg.Done()
 }
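The changes above assume a new `pkg/common/logging` package that defines named verbosity levels (`INFO`, `WARN`, `DEBUG`, `TRACE`) for use with logr's `V()` call; that package itself is not part of this excerpt. A minimal sketch of what such a package could look like — the constant names are taken from the calls in the diff, but the numeric values are assumptions and may differ from the real package:

```go
// Package logging defines named logr/klog verbosity levels for the simulator.
// Sketch only: the level names mirror the calls in the diff above; the
// numeric values are assumed, not taken from the actual pkg/common/logging.
package logging

const (
	// WARN: recoverable problems that are still reported via Info(),
	// e.g. "Database is locked by another process" or a stored LoRA that is not a string.
	WARN = 1
	// INFO: lifecycle messages such as server startup, shutdown, and configuration dumps.
	INFO = 2
	// DEBUG: per-request messages, e.g. "Completion request received" and LoRA load/unload.
	DEBUG = 4
	// TRACE: high-frequency internals, e.g. worker scheduling, health and readiness probes.
	TRACE = 5
)
```

With logr, `logger.V(n).Info(...)` is only emitted when the configured verbosity is at least `n`, so lower values correspond to more important messages; under the assumed values above, a call such as `s.logger.V(logging.TRACE).Info("Worker finished", "worker", worker.id)` would be suppressed unless the simulator runs with a high verbosity setting (e.g. `-v=5` with klog).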