7 changes: 6 additions & 1 deletion manifests/config_with_fake.yaml
@@ -7,10 +7,15 @@ time-to-first-token: 2000
inter-token-latency: 1000
kv-cache-transfer-latency: 100
seed: 100100100
fake-metrics:
running-requests: 16
waiting-requests: 3
kv-cache-usage: 0.3
request-success-total:
stop: 20
request-prompt-tokens: [ 10, 20, 30, 15 ]
Collaborator


As I understand it, this is an array of samples; if that's true, this solution is not scalable when we want to mimic a situation with thousands of requests.

Let's define the array in the configuration as the number of samples in each bucket. So the array [10, 20, 30, 15] will mean that we have 10 samples in the range (-Inf, 1], 20 samples in (1, 2], 30 samples in (2, 5], and 15 samples in (5, 10].

Note that histograms in Prometheus are cumulative, so for the example above /metrics will report:
(-Inf, 1] - 10
(1, 2] - 30
(2, 5] - 60
(5, 10] - 75
(10, 20] - 75
(20, 50] - 75
(50, 100] - 75
(100, 200] - 75
(200, 500] - 75
(500, 1000] - 75
(1000, +Inf) - 75
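
A minimal sketch of the accumulation described above (editorial, not part of the PR): it takes the per-bucket counts and boundaries from the example in this comment and prints the cumulative counts Prometheus would expose on /metrics.

package main

import "fmt"

func main() {
	// Upper boundaries and per-bucket sample counts from the example above.
	boundaries := []float64{1, 2, 5, 10}
	samples := []int{10, 20, 30, 15}

	// Prometheus histogram buckets are cumulative: each "le" bucket counts all
	// observations less than or equal to its boundary.
	total := 0
	for i, n := range samples {
		total += n
		fmt.Printf("le=%g: %d\n", boundaries[i], total)
	}
	// All higher buckets, including +Inf, report the grand total (75 here).
}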

Contributor Author


sgtm. 😄

request-generation-tokens: [ 50, 60, 40 ]
request-params-max-tokens: [ 128, 256, 512 ]
loras:
- '{"running":"lora1,lora2","waiting":"lora3","timestamp":1257894567}'
- '{"running":"lora1,lora3","waiting":"","timestamp":1257894569}'
60 changes: 60 additions & 0 deletions pkg/common/config.go
@@ -43,6 +43,27 @@ const (
FailureTypeServerError = "server_error"
FailureTypeInvalidRequest = "invalid_request"
FailureTypeModelNotFound = "model_not_found"

StopFinishReason = "stop"
LengthFinishReason = "length"
ToolsFinishReason = "tool_calls"
RemoteDecodeFinishReason = "remote_decode"
)

var (
requiredFinishReasons = []string{
StopFinishReason,
LengthFinishReason,
ToolsFinishReason,
RemoteDecodeFinishReason,
}

validFinishReasons = map[string]struct{}{
StopFinishReason: {},
LengthFinishReason: {},
ToolsFinishReason: {},
RemoteDecodeFinishReason: {},
}
)

type Configuration struct {
@@ -223,6 +244,13 @@ type Metrics struct {
// 0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75,
// 1.0, 2.5, 5.0, 7.5, 10.0, 20.0, 40.0, 80.0, +Inf
TPOTBucketValues []int `yaml:"tpot-buckets-values" json:"tpot-buckets-values"`
// RequestPromptTokens, RequestGenerationTokens, and RequestParamsMaxTokens are fake histogram data used at start-up.
// Each entry is the number of samples in the corresponding histogram bucket, following the same convention as TTFTBucketValues and TPOTBucketValues above.
RequestPromptTokens []int `yaml:"request-prompt-tokens" json:"request-prompt-tokens"` // prompt-length bucket counts
RequestGenerationTokens []int `yaml:"request-generation-tokens" json:"request-generation-tokens"` // generation-length bucket counts
RequestParamsMaxTokens []int `yaml:"request-params-max-tokens" json:"request-params-max-tokens"` // max_tokens parameter bucket counts
// RequestSuccessTotal maps finish reason (stop, length, etc.) to the number of successful requests.
RequestSuccessTotal map[string]int64 `yaml:"request-success-total" json:"request-success-total"`
}

type LorasMetrics struct {
@@ -521,6 +549,38 @@ func (c *Configuration) validate() error {
}
}
}
if c.FakeMetrics.RequestSuccessTotal != nil {
for reason, count := range c.FakeMetrics.RequestSuccessTotal {
if count < 0 {
return fmt.Errorf("fake metrics request-success-total.%s "+
"cannot be negative, got %d", reason, count)
}
if _, ok := validFinishReasons[reason]; !ok {
return fmt.Errorf("invalid finish reason in request-success-total: "+
"%s (valid reasons: %v)", reason, requiredFinishReasons)
}
}
for _, reason := range requiredFinishReasons {
if _, exists := c.FakeMetrics.RequestSuccessTotal[reason]; !exists {
c.FakeMetrics.RequestSuccessTotal[reason] = 0
}
}
}
for _, v := range c.FakeMetrics.RequestPromptTokens {
if v < 0 {
return errors.New("fake metrics request-prompt-tokens cannot contain negative values")
}
}
for _, v := range c.FakeMetrics.RequestGenerationTokens {
if v < 0 {
return errors.New("fake metrics request-generation-tokens cannot contain negative values")
}
}
for _, v := range c.FakeMetrics.RequestParamsMaxTokens {
if v < 0 {
return errors.New("fake metrics request-params-max-tokens cannot contain negative values")
}
}
}

if c.DPSize < 1 || c.DPSize > 8 {
13 changes: 11 additions & 2 deletions pkg/common/config_test.go
@@ -203,8 +203,17 @@ var _ = Describe("Simulator configuration", func() {
"{\"running\":\"lora1,lora2\",\"waiting\":\"lora3\",\"timestamp\":1257894567}",
"{\"running\":\"lora1,lora3\",\"waiting\":\"\",\"timestamp\":1257894569}",
},
TTFTBucketValues: []int{10, 20, 30, 10},
TPOTBucketValues: []int{0, 0, 10, 20, 30},
RequestPromptTokens: []int{10, 20, 30, 15},
RequestGenerationTokens: []int{50, 60, 40},
RequestParamsMaxTokens: []int{128, 256, 512},
RequestSuccessTotal: map[string]int64{
StopFinishReason: 20,
LengthFinishReason: 0,
ToolsFinishReason: 0,
RemoteDecodeFinishReason: 0,
},
}
test = testCase{
name: "config with fake metrics file",
147 changes: 145 additions & 2 deletions pkg/llm-d-inference-sim/metrics.go
@@ -20,6 +20,7 @@ package llmdinferencesim

import (
"context"
"math"
"strconv"
"strings"
"sync"
@@ -65,6 +66,7 @@ func (s *VllmSimulator) createAndRegisterPrometheus() error {
return err
}

// not supported for now, reports constant value
s.waitingRequests = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: "",
@@ -123,6 +125,61 @@ func (s *VllmSimulator) createAndRegisterPrometheus() error {
return err
}

s.requestPromptTokens = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: "",
Name: "vllm:request_prompt_tokens",
Help: "Number of prefill tokens processed.",
Buckets: build125Buckets(s.config.MaxModelLen),
},
[]string{vllmapi.PromLabelModelName},
)
if err := s.registry.Register(s.requestPromptTokens); err != nil {
s.logger.Error(err, "Prometheus request_prompt_tokens histogram register failed")
return err
}

s.requestGenerationTokens = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: "",
Name: "vllm:request_generation_tokens",
Help: "Number of generation tokens processed.",
Buckets: build125Buckets(s.config.MaxModelLen),
},
[]string{vllmapi.PromLabelModelName},
)
if err := s.registry.Register(s.requestGenerationTokens); err != nil {
s.logger.Error(err, "Prometheus request_generation_tokens histogram register failed")
return err
}

s.requestParamsMaxTokens = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: "",
Name: "vllm:request_params_max_tokens",
Help: "Histogram of the max_tokens request parameter.",
Buckets: build125Buckets(s.config.MaxModelLen),
},
[]string{vllmapi.PromLabelModelName},
)
if err := s.registry.Register(s.requestParamsMaxTokens); err != nil {
s.logger.Error(err, "Prometheus request_params_max_tokens histogram register failed")
return err
}

s.requestSuccessTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: "",
Name: "vllm:request_success_total",
Help: "Count of successfully processed requests.",
},
[]string{vllmapi.PromLabelModelName, vllmapi.PromLabelFinishReason},
)
if err := s.registry.Register(s.requestSuccessTotal); err != nil {
s.logger.Error(err, "Prometheus request_success_total counter register failed")
return err
}

s.setInitialPrometheusMetrics()

return nil
@@ -132,21 +189,34 @@ func (s *VllmSimulator) createAndRegisterPrometheus() error {
// the fake metrics if set
func (s *VllmSimulator) setInitialPrometheusMetrics() {
var nRunningReqs, nWaitingReqs, kvCacheUsage float64
modelName := s.getDisplayedModelName(s.config.Model)
if s.config.FakeMetrics != nil {
nRunningReqs = float64(s.config.FakeMetrics.RunningRequests)
nWaitingReqs = float64(s.config.FakeMetrics.WaitingRequests)
kvCacheUsage = float64(s.config.FakeMetrics.KVCacheUsagePercentage)

if s.config.FakeMetrics.TTFTBucketValues != nil {
s.initFakeHistogram(s.ttft, common.TTFTBucketsBoundaries, s.config.FakeMetrics.TTFTBucketValues)
}

if s.config.FakeMetrics.TPOTBucketValues != nil {
s.initFakeHistogram(s.tpot, common.TPOTBucketsBoundaries, s.config.FakeMetrics.TPOTBucketValues)
}
buckets := build125Buckets(s.config.MaxModelLen)
if s.config.FakeMetrics.RequestPromptTokens != nil {
s.initFakeHistogram(s.requestPromptTokens, buckets, s.config.FakeMetrics.RequestPromptTokens)
}
if s.config.FakeMetrics.RequestGenerationTokens != nil {
s.initFakeHistogram(s.requestGenerationTokens, buckets, s.config.FakeMetrics.RequestGenerationTokens)
}
if s.config.FakeMetrics.RequestParamsMaxTokens != nil {
s.initFakeHistogram(s.requestParamsMaxTokens, buckets, s.config.FakeMetrics.RequestParamsMaxTokens)
}

for reason, requestSuccessTotal := range s.config.FakeMetrics.RequestSuccessTotal {
s.requestSuccessTotal.WithLabelValues(modelName, reason).Add(float64(requestSuccessTotal))
}
}

s.runningRequests.WithLabelValues(modelName).Set(nRunningReqs)
s.waitingRequests.WithLabelValues(modelName).Set(nWaitingReqs)
s.kvCacheUsagePercentage.WithLabelValues(modelName).Set(kvCacheUsage)
@@ -288,6 +358,7 @@ func (s *VllmSimulator) startMetricsUpdaters(ctx context.Context) {
go s.kvCacheUsageUpdater(ctx)
go s.ttftUpdater(ctx)
go s.tpotUpdater(ctx)
go s.recordRequestUpdater(ctx)
}

// waitingRequestsUpdater updates the waiting requests metric by listening on the relevant channel
@@ -396,3 +467,75 @@ func (s *VllmSimulator) decrementLoraRefCount(lora string, theMap *sync.Map) {
s.logger.Error(nil, "Zero model reference", "model", lora)
}
}

// recordRequestUpdater listens on requestSuccessChan and drives the Prometheus metric
// for successfully completed requests.
func (s *VllmSimulator) recordRequestUpdater(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case event := <-s.requestSuccessChan:
s.recordRequestMetricsOnSuccess(
event.promptTokens,
event.generationTokens,
event.maxTokens,
event.finishReason,
)
}
}
}

// requestSuccessEvent represents the data associated with a successfully completed request,
// which is sent through the requestSuccessChan for asynchronous metrics recording.
type requestSuccessEvent struct {
// promptTokens is the number of input (prompt) tokens in the request
promptTokens int
// generationTokens is the number of generated (output) tokens in the response
generationTokens int
// maxTokens is the maximum number of tokens allowed for generation (if specified in the request)
maxTokens *int64
// finishReason indicates why the generation stopped (e.g., "stop", "length", "tool_calls")
finishReason string
}

// recordRequestMetricsOnSuccess records metrics for a successfully completed request
func (s *VllmSimulator) recordRequestMetricsOnSuccess(promptTokens,
generationTokens int, maxTokens *int64, finishReason string) {
modelName := s.getDisplayedModelName(s.config.Model)
s.requestPromptTokens.WithLabelValues(modelName).Observe(float64(promptTokens))
s.requestGenerationTokens.WithLabelValues(modelName).Observe(float64(generationTokens))
if maxTokens != nil {
s.requestParamsMaxTokens.WithLabelValues(modelName).Observe(float64(*maxTokens))
}
s.requestSuccessTotal.WithLabelValues(modelName, finishReason).Inc()
}
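
// Hypothetical producer-side helper (editorial sketch, not part of this PR).
// It shows how a request handler could hand a completed request to
// recordRequestUpdater above; it assumes requestSuccessChan carries
// requestSuccessEvent values, as the receive in recordRequestUpdater suggests.
// A caller might use it as, e.g.,
// s.reportSuccessExample(42, 17, nil, common.StopFinishReason).
func (s *VllmSimulator) reportSuccessExample(promptTokens, generationTokens int,
	maxTokens *int64, finishReason string) {
	s.requestSuccessChan <- requestSuccessEvent{
		promptTokens:     promptTokens,
		generationTokens: generationTokens,
		maxTokens:        maxTokens,
		finishReason:     finishReason,
	}
}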

// build125Buckets generates histogram buckets in powers of 10 scaled by [1,2,5].
// This matches vLLM's build_1_2_5_buckets() in metrics.py.
//
// Reference: https://github.com/vllm-project/vllm/blob/main/vllm/engine/metrics.py#L175
func build125Buckets(maxValue int) []float64 {
if maxValue <= 0 {
return []float64{}
}
var buckets []float64
exponent := 0
mantissa := []int{1, 2, 5}

for {
complete := true
for _, m := range mantissa {
value := m * int(math.Pow10(exponent))
if value <= maxValue {
buckets = append(buckets, float64(value))
complete = false
}
}
if complete {
break
}
exponent++
}
return buckets
}
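
For reference, a hypothetical in-package test (editorial sketch, not part of the diff) that pins down the boundaries build125Buckets generates for an assumed max model length of 1024:

package llmdinferencesim

import (
	"reflect"
	"testing"
)

// TestBuild125Buckets is an illustrative check of the 1-2-5 boundary sequence
// produced for maxValue = 1024; it is not part of this PR.
func TestBuild125Buckets(t *testing.T) {
	got := build125Buckets(1024)
	want := []float64{1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("build125Buckets(1024) = %v, want %v", got, want)
	}
}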