diff --git a/config/config.yaml b/config/config.yaml
index 95df80fa..7db18883 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -244,4 +244,15 @@ api:
   batch_classification:
     max_batch_size: 100 # Maximum number of texts in a single batch
     concurrency_threshold: 5 # Switch to concurrent processing when batch size > this value
-    max_concurrency: 8 # Maximum number of concurrent goroutines
\ No newline at end of file
+    max_concurrency: 8 # Maximum number of concurrent goroutines
+
+    # Metrics configuration for monitoring batch classification performance
+    metrics:
+      enabled: true # Enable comprehensive metrics collection
+      detailed_goroutine_tracking: true # Track individual goroutine lifecycle
+      high_resolution_timing: false # Use nanosecond precision timing
+      sample_rate: 1.0 # Collect metrics for all requests (1.0 = 100%, 0.5 = 50%)
+
+      # Histogram buckets for metrics (directly configure what you need)
+      duration_buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30]
+      size_buckets: [1, 2, 5, 10, 20, 50, 100, 200]
diff --git a/src/semantic-router/pkg/api/server.go b/src/semantic-router/pkg/api/server.go
index 5520b2ba..47c64464 100644
--- a/src/semantic-router/pkg/api/server.go
+++ b/src/semantic-router/pkg/api/server.go
@@ -11,6 +11,7 @@ import (
 	"time"
 
 	"github.com/vllm-project/semantic-router/semantic-router/pkg/config"
+	"github.com/vllm-project/semantic-router/semantic-router/pkg/metrics"
 	"github.com/vllm-project/semantic-router/semantic-router/pkg/services"
 )
 
@@ -91,6 +92,20 @@ func StartClassificationAPI(configPath string, port int) error {
 		classificationSvc = services.NewPlaceholderClassificationService()
 	}
 
+	// Initialize batch metrics configuration
+	if cfg != nil && cfg.API.BatchClassification.Metrics.Enabled {
+		metricsConfig := metrics.BatchMetricsConfig{
+			Enabled:                   cfg.API.BatchClassification.Metrics.Enabled,
+			DetailedGoroutineTracking: cfg.API.BatchClassification.Metrics.DetailedGoroutineTracking,
+			DurationBuckets:           cfg.API.BatchClassification.Metrics.DurationBuckets,
+			SizeBuckets:               cfg.API.BatchClassification.Metrics.SizeBuckets,
+			BatchSizeRanges:           cfg.API.BatchClassification.Metrics.BatchSizeRanges,
+			HighResolutionTiming:      cfg.API.BatchClassification.Metrics.HighResolutionTiming,
+			SampleRate:                cfg.API.BatchClassification.Metrics.SampleRate,
+		}
+		metrics.SetBatchMetricsConfig(metricsConfig)
+	}
+
 	// Create server instance
 	apiServer := &ClassificationAPIServer{
 		classificationSvc: classificationSvc,
@@ -231,6 +246,8 @@ func (s *ClassificationAPIServer) handleBatchClassification(w http.ResponseWrite
 	// Input validation
 	if len(req.Texts) == 0 {
+		// Record validation error in metrics
+		metrics.RecordBatchClassificationError("validation", "empty_texts")
 		s.writeErrorResponse(w, http.StatusBadRequest, "INVALID_INPUT", "texts array cannot be empty")
 		return
 	}
@@ -242,6 +259,8 @@ func (s *ClassificationAPIServer) handleBatchClassification(w http.ResponseWrite
 	}
 
 	if len(req.Texts) > maxBatchSize {
+		// Record validation error in metrics
+		metrics.RecordBatchClassificationError("validation", "batch_too_large")
 		s.writeErrorResponse(w, http.StatusBadRequest, "BATCH_TOO_LARGE",
 			fmt.Sprintf("batch size cannot exceed %d texts", maxBatchSize))
 		return
@@ -494,10 +513,26 @@ func (s *ClassificationAPIServer) getSystemInfo() SystemInfo {
 
 // processSequentially handles small batches with sequential processing
 func (s *ClassificationAPIServer) processSequentially(texts []string, options *ClassificationOptions) ([]services.Classification, error) {
+	start := time.Now()
+	processingType := "sequential"
+	batchSize := len(texts)
+
+	// Record request and batch size metrics
+	metrics.RecordBatchClassificationRequest(processingType)
+	metrics.RecordBatchSizeDistribution(processingType, batchSize)
+
+	// Defer recording processing time and text count
+	defer func() {
+		duration := time.Since(start).Seconds()
+		metrics.RecordBatchClassificationDuration(processingType, batchSize, duration)
+		metrics.RecordBatchClassificationTexts(processingType, batchSize)
+	}()
+
 	results := make([]services.Classification, len(texts))
 	for i, text := range texts {
 		result, err := s.classifySingleText(text, options)
 		if err != nil {
+			metrics.RecordBatchClassificationError(processingType, "classification_failed")
 			return nil, fmt.Errorf("failed to classify text at index %d: %w", i, err)
 		}
 		results[i] = result
@@ -507,6 +542,22 @@ func (s *ClassificationAPIServer) processSequentially(texts []string, options *C
 
 // processConcurrently handles large batches with concurrent processing
 func (s *ClassificationAPIServer) processConcurrently(texts []string, options *ClassificationOptions) ([]services.Classification, error) {
+	start := time.Now()
+	processingType := "concurrent"
+	batchSize := len(texts)
+	batchID := fmt.Sprintf("batch_%d", time.Now().UnixNano())
+
+	// Record request and batch size metrics
+	metrics.RecordBatchClassificationRequest(processingType)
+	metrics.RecordBatchSizeDistribution(processingType, batchSize)
+
+	// Defer recording processing time and text count
+	defer func() {
+		duration := time.Since(start).Seconds()
+		metrics.RecordBatchClassificationDuration(processingType, batchSize, duration)
+		metrics.RecordBatchClassificationTexts(processingType, batchSize)
+	}()
+
 	// Get max concurrency from config, default to 8
 	maxConcurrency := 8
 	if s.config != nil && s.config.API.BatchClassification.MaxConcurrency > 0 {
@@ -523,6 +574,18 @@ func (s *ClassificationAPIServer) processConcurrently(texts []string, options *C
 		wg.Add(1)
 		go func(index int, txt string) {
 			defer wg.Done()
+
+			// Record goroutine start (if detailed tracking is enabled)
+			metricsConfig := metrics.GetBatchMetricsConfig()
+			if metricsConfig.DetailedGoroutineTracking {
+				metrics.ConcurrentGoroutines.WithLabelValues(batchID).Inc()
+
+				defer func() {
+					// Record goroutine end
+					metrics.ConcurrentGoroutines.WithLabelValues(batchID).Dec()
+				}()
+			}
+
 			semaphore <- struct{}{}
 			defer func() { <-semaphore }()
@@ -532,6 +595,7 @@ func (s *ClassificationAPIServer) processConcurrently(texts []string, options *C
 			result, err := s.classifySingleText(txt, options)
 			if err != nil {
 				errors[index] = err
+				metrics.RecordBatchClassificationError(processingType, "classification_failed")
 				return
 			}
 			results[index] = result
diff --git a/src/semantic-router/pkg/api/server_test.go b/src/semantic-router/pkg/api/server_test.go
index bea2f817..243edcb8 100644
--- a/src/semantic-router/pkg/api/server_test.go
+++ b/src/semantic-router/pkg/api/server_test.go
@@ -217,13 +217,17 @@ func TestBatchClassificationConfiguration(t *testing.T) {
 			config: &config.RouterConfig{
 				API: config.APIConfig{
 					BatchClassification: struct {
-						MaxBatchSize         int `yaml:"max_batch_size,omitempty"`
-						ConcurrencyThreshold int `yaml:"concurrency_threshold,omitempty"`
-						MaxConcurrency       int `yaml:"max_concurrency,omitempty"`
+						MaxBatchSize         int                                     `yaml:"max_batch_size,omitempty"`
+						ConcurrencyThreshold int                                     `yaml:"concurrency_threshold,omitempty"`
+						MaxConcurrency       int                                     `yaml:"max_concurrency,omitempty"`
+						Metrics              config.BatchClassificationMetricsConfig `yaml:"metrics,omitempty"`
 					}{
 						MaxBatchSize:         3, // Custom small limit
 						ConcurrencyThreshold: 2,
 						MaxConcurrency:       4,
+						Metrics: config.BatchClassificationMetricsConfig{
+							Enabled: true,
+						},
 					},
 				},
 			},
@@ -253,13 +257,17 @@ func TestBatchClassificationConfiguration(t *testing.T) {
 			config: &config.RouterConfig{
 				API: config.APIConfig{
 					BatchClassification: struct {
-						MaxBatchSize         int `yaml:"max_batch_size,omitempty"`
-						ConcurrencyThreshold int `yaml:"concurrency_threshold,omitempty"`
-						MaxConcurrency       int `yaml:"max_concurrency,omitempty"`
+						MaxBatchSize         int                                     `yaml:"max_batch_size,omitempty"`
+						ConcurrencyThreshold int                                     `yaml:"concurrency_threshold,omitempty"`
+						MaxConcurrency       int                                     `yaml:"max_concurrency,omitempty"`
+						Metrics              config.BatchClassificationMetricsConfig `yaml:"metrics,omitempty"`
 					}{
 						MaxBatchSize:         10,
 						ConcurrencyThreshold: 3,
 						MaxConcurrency:       2,
+						Metrics: config.BatchClassificationMetricsConfig{
+							Enabled: true,
+						},
 					},
 				},
 			},
diff --git a/src/semantic-router/pkg/config/config.go b/src/semantic-router/pkg/config/config.go
index d95d921f..4a8aa580 100644
--- a/src/semantic-router/pkg/config/config.go
+++ b/src/semantic-router/pkg/config/config.go
@@ -94,9 +94,42 @@ type APIConfig struct {
 
 		// Maximum number of concurrent goroutines for batch processing
 		MaxConcurrency int `yaml:"max_concurrency,omitempty"`
+
+		// Metrics configuration for batch classification monitoring
+		Metrics BatchClassificationMetricsConfig `yaml:"metrics,omitempty"`
 	} `yaml:"batch_classification"`
 }
 
+// BatchClassificationMetricsConfig represents configuration for batch classification metrics
+type BatchClassificationMetricsConfig struct {
+	// Sample rate for metrics collection (0.0-1.0, 1.0 means collect all metrics)
+	SampleRate float64 `yaml:"sample_rate,omitempty"`
+
+	// Batch size range labels for metrics (optional - uses sensible defaults if not specified)
+	// Default ranges: "1", "2-5", "6-10", "11-20", "21-50", "50+"
+	BatchSizeRanges []BatchSizeRangeConfig `yaml:"batch_size_ranges,omitempty"`
+
+	// Histogram buckets for metrics (directly configured)
+	DurationBuckets []float64 `yaml:"duration_buckets,omitempty"`
+	SizeBuckets     []float64 `yaml:"size_buckets,omitempty"`
+
+	// Enable detailed metrics collection
+	Enabled bool `yaml:"enabled,omitempty"`
+
+	// Enable detailed goroutine tracking (may impact performance)
+	DetailedGoroutineTracking bool `yaml:"detailed_goroutine_tracking,omitempty"`
+
+	// Enable high-resolution timing (nanosecond precision)
+	HighResolutionTiming bool `yaml:"high_resolution_timing,omitempty"`
+}
+
+// BatchSizeRangeConfig defines a batch size range with its boundaries and label
+type BatchSizeRangeConfig struct {
+	Min   int    `yaml:"min"`
+	Max   int    `yaml:"max"` // -1 means no upper limit
+	Label string `yaml:"label"`
+}
+
 // PromptGuardConfig represents configuration for the prompt guard jailbreak detection
 type PromptGuardConfig struct {
 	// Enable prompt guard jailbreak detection
diff --git a/src/semantic-router/pkg/config/config_test.go b/src/semantic-router/pkg/config/config_test.go
index 856ecfde..36338c66 100644
--- a/src/semantic-router/pkg/config/config_test.go
+++ b/src/semantic-router/pkg/config/config_test.go
@@ -6,10 +6,12 @@ import (
 	"sync"
 	"testing"
 
-	. "github.com/onsi/ginkgo/v2"
-	. "github.com/onsi/gomega"
+	"gopkg.in/yaml.v3"
 
 	"github.com/vllm-project/semantic-router/semantic-router/pkg/config"
+
+	. "github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega" ) func TestConfig(t *testing.T) { @@ -130,7 +132,7 @@ tools: tools_db_path: "/path/to/tools.json" fallback_to_empty: true ` - err := os.WriteFile(configFile, []byte(validConfig), 0644) + err := os.WriteFile(configFile, []byte(validConfig), 0o644) Expect(err).NotTo(HaveOccurred()) }) @@ -228,7 +230,7 @@ bert_model: model_id: "test-model" invalid: [ unclosed array ` - err := os.WriteFile(configFile, []byte(invalidYAML), 0644) + err := os.WriteFile(configFile, []byte(invalidYAML), 0o644) Expect(err).NotTo(HaveOccurred()) }) @@ -242,7 +244,7 @@ bert_model: Context("with empty config file", func() { BeforeEach(func() { - err := os.WriteFile(configFile, []byte(""), 0644) + err := os.WriteFile(configFile, []byte(""), 0o644) Expect(err).NotTo(HaveOccurred()) }) @@ -263,7 +265,7 @@ bert_model: threshold: 0.8 default_model: "model-b" ` - err := os.WriteFile(configFile, []byte(validConfig), 0644) + err := os.WriteFile(configFile, []byte(validConfig), 0o644) Expect(err).NotTo(HaveOccurred()) }) @@ -308,7 +310,7 @@ bert_model: semantic_cache: similarity_threshold: 0.9 ` - err := os.WriteFile(configFile, []byte(configContent), 0644) + err := os.WriteFile(configFile, []byte(configContent), 0o644) Expect(err).NotTo(HaveOccurred()) }) @@ -329,7 +331,7 @@ bert_model: semantic_cache: enabled: true ` - err := os.WriteFile(configFile, []byte(configContent), 0644) + err := os.WriteFile(configFile, []byte(configContent), 0o644) Expect(err).NotTo(HaveOccurred()) }) @@ -359,7 +361,7 @@ categories: score: 0.95 default_model: "default-model" ` - err := os.WriteFile(configFile, []byte(configContent), 0644) + err := os.WriteFile(configFile, []byte(configContent), 0o644) Expect(err).NotTo(HaveOccurred()) }) @@ -402,7 +404,7 @@ categories: model_scores: [] default_model: "fallback-model" ` - err := os.WriteFile(configFile, []byte(configContent), 0644) + err := os.WriteFile(configFile, []byte(configContent), 0o644) Expect(err).NotTo(HaveOccurred()) }) @@ -430,7 +432,7 @@ model_config: "unconfigured-model": param_count: 1000000 ` - err := os.WriteFile(configFile, []byte(configContent), 0644) + err := os.WriteFile(configFile, []byte(configContent), 0o644) Expect(err).NotTo(HaveOccurred()) }) @@ -525,7 +527,7 @@ classifier: model_id: "pii-model" pii_mapping_path: "/path/to/pii.json" ` - err := os.WriteFile(configFile, []byte(configContent), 0644) + err := os.WriteFile(configFile, []byte(configContent), 0o644) Expect(err).NotTo(HaveOccurred()) cfg, err := config.LoadConfig(configFile) @@ -540,7 +542,7 @@ classifier: pii_model: pii_mapping_path: "/path/to/pii.json" ` - err := os.WriteFile(configFile, []byte(configContent), 0644) + err := os.WriteFile(configFile, []byte(configContent), 0o644) Expect(err).NotTo(HaveOccurred()) cfg, err := config.LoadConfig(configFile) @@ -555,7 +557,7 @@ classifier: pii_model: model_id: "pii-model" ` - err := os.WriteFile(configFile, []byte(configContent), 0644) + err := os.WriteFile(configFile, []byte(configContent), 0o644) Expect(err).NotTo(HaveOccurred()) cfg, err := config.LoadConfig(configFile) @@ -573,7 +575,7 @@ classifier: model_id: "category-model" category_mapping_path: "/path/to/category.json" ` - err := os.WriteFile(configFile, []byte(configContent), 0644) + err := os.WriteFile(configFile, []byte(configContent), 0o644) Expect(err).NotTo(HaveOccurred()) cfg, err := config.LoadConfig(configFile) @@ -584,7 +586,7 @@ classifier: It("should return false when not configured", func() { // Create an empty config file - err := os.WriteFile(configFile, []byte(""), 
-			err := os.WriteFile(configFile, []byte(""), 0644)
+			err := os.WriteFile(configFile, []byte(""), 0o644)
 			Expect(err).NotTo(HaveOccurred())
 
 			cfg, err := config.LoadConfig(configFile)
@@ -602,7 +604,7 @@ prompt_guard:
   model_id: "jailbreak-model"
   jailbreak_mapping_path: "/path/to/jailbreak.json"
 `
-			err := os.WriteFile(configFile, []byte(configContent), 0644)
+			err := os.WriteFile(configFile, []byte(configContent), 0o644)
 			Expect(err).NotTo(HaveOccurred())
 
 			cfg, err := config.LoadConfig(configFile)
@@ -618,7 +620,7 @@ prompt_guard:
   model_id: "jailbreak-model"
   jailbreak_mapping_path: "/path/to/jailbreak.json"
 `
-			err := os.WriteFile(configFile, []byte(configContent), 0644)
+			err := os.WriteFile(configFile, []byte(configContent), 0o644)
 			Expect(err).NotTo(HaveOccurred())
 
 			cfg, err := config.LoadConfig(configFile)
@@ -633,7 +635,7 @@ prompt_guard:
   enabled: true
   jailbreak_mapping_path: "/path/to/jailbreak.json"
 `
-			err := os.WriteFile(configFile, []byte(configContent), 0644)
+			err := os.WriteFile(configFile, []byte(configContent), 0o644)
 			Expect(err).NotTo(HaveOccurred())
 
 			cfg, err := config.LoadConfig(configFile)
@@ -653,7 +655,7 @@ model_config:
     batch_size: 32
     context_size: 4096
 `
-			err := os.WriteFile(configFile, []byte(configContent), 0644)
+			err := os.WriteFile(configFile, []byte(configContent), 0o644)
 			Expect(err).NotTo(HaveOccurred())
 		})
@@ -722,7 +724,7 @@ categories:
 - name: "category2"
   description: "Description for category 2"
 `
-			err := os.WriteFile(configFile, []byte(configContent), 0644)
+			err := os.WriteFile(configFile, []byte(configContent), 0o644)
 			Expect(err).NotTo(HaveOccurred())
 		})
@@ -748,7 +750,7 @@ categories:
 - name: "category2"
   # No description field
 `
-			err := os.WriteFile(configFile, []byte(configContent), 0644)
+			err := os.WriteFile(configFile, []byte(configContent), 0o644)
 			Expect(err).NotTo(HaveOccurred())
 		})
@@ -768,7 +770,7 @@ categories:
 		Context("with no categories", func() {
 			It("should return empty slice", func() {
 				// Create an empty config file
-				err := os.WriteFile(configFile, []byte(""), 0644)
+				err := os.WriteFile(configFile, []byte(""), 0o644)
 				Expect(err).NotTo(HaveOccurred())
 
 				cfg, err := config.LoadConfig(configFile)
@@ -789,7 +791,7 @@ semantic_cache:
   max_entries: 0
   ttl_seconds: 0
 `
-			err := os.WriteFile(configFile, []byte(configContent), 0644)
+			err := os.WriteFile(configFile, []byte(configContent), 0o644)
 			Expect(err).NotTo(HaveOccurred())
 
 			cfg, err := config.LoadConfig(configFile)
@@ -808,7 +810,7 @@ gpu_config:
   flops: 1e20
   hbm: 1e15
 `
-			err := os.WriteFile(configFile, []byte(configContent), 0644)
+			err := os.WriteFile(configFile, []byte(configContent), 0o644)
 			Expect(err).NotTo(HaveOccurred())
 
 			cfg, err := config.LoadConfig(configFile)
@@ -826,7 +828,7 @@ categories:
 - name: "category with spaces"
   description: "Description with special chars: @#$%^&*()"
 `
-			err := os.WriteFile(configFile, []byte(configContent), 0644)
+			err := os.WriteFile(configFile, []byte(configContent), 0o644)
 			Expect(err).NotTo(HaveOccurred())
 
 			cfg, err := config.LoadConfig(configFile)
@@ -881,7 +883,7 @@ categories:
 default_model: "model-b"
 `
 
-			err := os.WriteFile(configFile, []byte(configContent), 0644)
+			err := os.WriteFile(configFile, []byte(configContent), 0o644)
 			Expect(err).NotTo(HaveOccurred())
 		})
@@ -1017,7 +1019,7 @@ categories:
 default_model: "existing-model"
 `
 
-			err := os.WriteFile(configFile, []byte(configContent), 0644)
+			err := os.WriteFile(configFile, []byte(configContent), 0o644)
 			Expect(err).NotTo(HaveOccurred())
 
 			cfg, err := config.LoadConfig(configFile)
@@ -1041,7 +1043,7 @@ vllm_endpoints:
"missing-default-model" ` - err := os.WriteFile(configFile, []byte(configContent), 0644) + err := os.WriteFile(configFile, []byte(configContent), 0o644) Expect(err).NotTo(HaveOccurred()) cfg, err := config.LoadConfig(configFile) @@ -1087,4 +1089,90 @@ default_model: "missing-default-model" Expect(config.PIITypeEmailAddress).To(Equal("EMAIL_ADDRESS")) }) }) + + // Test batch classification metrics configuration + Describe("Batch Classification Metrics Configuration", func() { + It("should parse batch classification metrics configuration correctly", func() { + yamlContent := ` +api: + batch_classification: + max_batch_size: 50 + concurrency_threshold: 3 + max_concurrency: 6 + metrics: + enabled: true + detailed_goroutine_tracking: false + high_resolution_timing: true + sample_rate: 0.8 + duration_buckets: [0.01, 0.1, 1.0, 10.0] + size_buckets: [5, 15, 25, 75] +` + + var cfg config.RouterConfig + err := yaml.Unmarshal([]byte(yamlContent), &cfg) + Expect(err).NotTo(HaveOccurred()) + + // Verify batch classification configuration + batchConfig := cfg.API.BatchClassification + Expect(batchConfig.MaxBatchSize).To(Equal(50)) + Expect(batchConfig.ConcurrencyThreshold).To(Equal(3)) + Expect(batchConfig.MaxConcurrency).To(Equal(6)) + + // Verify metrics configuration + metricsConfig := batchConfig.Metrics + Expect(metricsConfig.Enabled).To(BeTrue()) + Expect(metricsConfig.DetailedGoroutineTracking).To(BeFalse()) + Expect(metricsConfig.HighResolutionTiming).To(BeTrue()) + Expect(metricsConfig.SampleRate).To(Equal(0.8)) + + // Verify custom buckets + Expect(metricsConfig.DurationBuckets).To(Equal([]float64{0.01, 0.1, 1.0, 10.0})) + Expect(metricsConfig.SizeBuckets).To(Equal([]float64{5, 15, 25, 75})) + }) + + It("should handle missing metrics configuration with defaults", func() { + yamlContent := ` +api: + batch_classification: + max_batch_size: 100 +` + + var cfg config.RouterConfig + err := yaml.Unmarshal([]byte(yamlContent), &cfg) + Expect(err).NotTo(HaveOccurred()) + + // Verify that missing metrics configuration doesn't cause errors + batchConfig := cfg.API.BatchClassification + Expect(batchConfig.MaxBatchSize).To(Equal(100)) + + // Metrics should have zero values (will be handled by defaults in application) + metricsConfig := batchConfig.Metrics + Expect(metricsConfig.Enabled).To(BeFalse()) // Default zero value + Expect(metricsConfig.SampleRate).To(Equal(0.0)) // Default zero value + }) + + It("should handle partial metrics configuration", func() { + yamlContent := ` +api: + batch_classification: + metrics: + enabled: true + sample_rate: 0.5 +` + + var cfg config.RouterConfig + err := yaml.Unmarshal([]byte(yamlContent), &cfg) + Expect(err).NotTo(HaveOccurred()) + + metricsConfig := cfg.API.BatchClassification.Metrics + Expect(metricsConfig.Enabled).To(BeTrue()) + Expect(metricsConfig.SampleRate).To(Equal(0.5)) + + // Other fields should have zero values + Expect(metricsConfig.DetailedGoroutineTracking).To(BeFalse()) + Expect(metricsConfig.HighResolutionTiming).To(BeFalse()) + Expect(len(metricsConfig.DurationBuckets)).To(Equal(0)) + Expect(len(metricsConfig.SizeBuckets)).To(Equal(0)) + }) + }) }) diff --git a/src/semantic-router/pkg/metrics/metrics.go b/src/semantic-router/pkg/metrics/metrics.go index c6d25891..ad3f9b41 100644 --- a/src/semantic-router/pkg/metrics/metrics.go +++ b/src/semantic-router/pkg/metrics/metrics.go @@ -1,10 +1,97 @@ package metrics import ( + "fmt" + "math/rand" + "sync" + "github.com/prometheus/client_golang/prometheus" 
"github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/vllm-project/semantic-router/semantic-router/pkg/config" ) +// Minimal fallback bucket configurations - used only when configuration is completely missing +var ( + // Basic fallback buckets for emergency use when config.yaml is unavailable + FallbackDurationBuckets = []float64{0.001, 0.01, 0.1, 1, 10} + FallbackSizeBuckets = []float64{1, 10, 100} +) + +// Configuration constants +const ( + DefaultSampleRate = 1.0 + MinSampleRate = 0.0 + MaxSampleRate = 1.0 +) + +// BatchMetricsConfig represents configuration for batch classification metrics +type BatchMetricsConfig struct { + SampleRate float64 `yaml:"sample_rate"` + DurationBuckets []float64 `yaml:"duration_buckets"` + SizeBuckets []float64 `yaml:"size_buckets"` + BatchSizeRanges []config.BatchSizeRangeConfig `yaml:"batch_size_ranges"` + + // Boolean fields grouped together to minimize padding + Enabled bool `yaml:"enabled"` + DetailedGoroutineTracking bool `yaml:"detailed_goroutine_tracking"` + HighResolutionTiming bool `yaml:"high_resolution_timing"` +} + +// Global configuration for batch metrics +var ( + batchMetricsConfig BatchMetricsConfig + configMutex sync.RWMutex + metricsInitOnce sync.Once +) + +// SetBatchMetricsConfig sets the configuration for batch classification metrics +func SetBatchMetricsConfig(config BatchMetricsConfig) { + configMutex.Lock() + defer configMutex.Unlock() + + batchMetricsConfig = config + + // Set default values if not provided + if batchMetricsConfig.SampleRate <= MinSampleRate { + batchMetricsConfig.SampleRate = DefaultSampleRate + } + if len(batchMetricsConfig.DurationBuckets) == 0 { + batchMetricsConfig.DurationBuckets = FallbackDurationBuckets + } + if len(batchMetricsConfig.SizeBuckets) == 0 { + batchMetricsConfig.SizeBuckets = FallbackSizeBuckets + } + + // Initialize metrics with the configuration + InitializeBatchMetrics(batchMetricsConfig) +} + +// GetBatchMetricsConfig returns the current batch metrics configuration +func GetBatchMetricsConfig() BatchMetricsConfig { + configMutex.RLock() + defer configMutex.RUnlock() + return batchMetricsConfig +} + +// shouldCollectMetric determines if a metric should be collected based on sample rate +func shouldCollectMetric() bool { + configMutex.RLock() + sampleRate := batchMetricsConfig.SampleRate + enabled := batchMetricsConfig.Enabled + configMutex.RUnlock() + + if !enabled { + return false + } + + if sampleRate >= 1.0 { + return true + } + + return rand.Float64() < sampleRate +} + var ( // ModelRequests tracks the number of requests made to each model ModelRequests = promauto.NewCounterVec( @@ -169,3 +256,210 @@ func RecordPIIViolations(model string, piiTypes []string) { func RecordClassifierLatency(classifier string, seconds float64) { ClassifierLatency.WithLabelValues(classifier).Observe(seconds) } + +// Batch Classification Metrics - Dynamically initialized based on configuration +var ( + BatchClassificationRequests *prometheus.CounterVec + BatchClassificationDuration *prometheus.HistogramVec + BatchClassificationTexts *prometheus.CounterVec + BatchClassificationErrors *prometheus.CounterVec + ConcurrentGoroutines *prometheus.GaugeVec + BatchSizeDistribution *prometheus.HistogramVec +) + +// Default batch size ranges - used only when configuration is missing +var DefaultBatchSizeRanges = []config.BatchSizeRangeConfig{ + {Min: 1, Max: 1, Label: "1"}, + {Min: 2, Max: 5, Label: "2-5"}, + {Min: 6, Max: 10, Label: "6-10"}, + {Min: 11, Max: 20, Label: "11-20"}, + {Min: 21, Max: 50, 
+
+// RecordBatchClassificationRequest increments the counter for batch classification requests
+func RecordBatchClassificationRequest(processingType string) {
+	if !shouldCollectMetric() {
+		return
+	}
+	BatchClassificationRequests.WithLabelValues(processingType).Inc()
+}
+
+// RecordBatchClassificationDuration records the duration of batch classification processing
+func RecordBatchClassificationDuration(processingType string, batchSize int, duration float64) {
+	if !shouldCollectMetric() {
+		return
+	}
+
+	// Use configured range labels from config.yaml
+	batchSizeRange := GetBatchSizeRange(batchSize)
+	BatchClassificationDuration.WithLabelValues(processingType, batchSizeRange).Observe(duration)
+}
+
+// RecordBatchClassificationTexts adds the number of texts processed in batch classification
+func RecordBatchClassificationTexts(processingType string, count int) {
+	if !shouldCollectMetric() {
+		return
+	}
+	BatchClassificationTexts.WithLabelValues(processingType).Add(float64(count))
+}
+
+// RecordBatchClassificationError increments the counter for batch classification errors
+func RecordBatchClassificationError(processingType, errorType string) {
+	if !shouldCollectMetric() {
+		return
+	}
+	BatchClassificationErrors.WithLabelValues(processingType, errorType).Inc()
+}
+
+// RecordBatchSizeDistribution records the distribution of batch sizes
+func RecordBatchSizeDistribution(processingType string, batchSize int) {
+	if !shouldCollectMetric() {
+		return
+	}
+	BatchSizeDistribution.WithLabelValues(processingType).Observe(float64(batchSize))
+}
+
+// GenerateExponentialBuckets creates exponential histogram buckets
+func GenerateExponentialBuckets(start, factor float64, count int) []float64 {
+	buckets := make([]float64, count)
+	buckets[0] = start
+	for i := 1; i < count; i++ {
+		buckets[i] = buckets[i-1] * factor
+	}
+	return buckets
+}
+
+// GenerateLinearBuckets creates linear histogram buckets
+func GenerateLinearBuckets(start, width float64, count int) []float64 {
+	buckets := make([]float64, count)
+	for i := 0; i < count; i++ {
+		buckets[i] = start + float64(i)*width
+	}
+	return buckets
+}
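+
+// Example (illustrative only): GenerateExponentialBuckets(0.001, 10, 5) yields
+// []float64{0.001, 0.01, 0.1, 1, 10}, the same values as FallbackDurationBuckets,
+// and GenerateLinearBuckets(10, 10, 5) yields []float64{10, 20, 30, 40, 50}.
+// Neither helper is called elsewhere in this change; both are exported for
+// callers that build custom bucket lists programmatically.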
+
+// GetBucketsFromConfig returns buckets from configuration
+func GetBucketsFromConfig(config BatchMetricsConfig) (durationBuckets, sizeBuckets []float64) {
+	// Use configured buckets or fallback to defaults
+	if len(config.DurationBuckets) > 0 {
+		durationBuckets = config.DurationBuckets
+	} else {
+		durationBuckets = FallbackDurationBuckets
+	}
+
+	if len(config.SizeBuckets) > 0 {
+		sizeBuckets = config.SizeBuckets
+	} else {
+		sizeBuckets = FallbackSizeBuckets
+	}
+
+	return durationBuckets, sizeBuckets
+}
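+
+// Note (behavioral detail, implied by metricsInitOnce): the histogram buckets
+// are fixed the first time InitializeBatchMetrics runs. Subsequent calls to
+// SetBatchMetricsConfig still update the sample rate and batch size range
+// labels, which are read dynamically, but cannot re-register the collectors
+// with different buckets.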
+
+// InitializeBatchMetrics initializes batch classification metrics with custom bucket configurations
+func InitializeBatchMetrics(config BatchMetricsConfig) {
+	metricsInitOnce.Do(func() {
+		// Get buckets from configuration
+		durationBuckets, sizeBuckets := GetBucketsFromConfig(config)
+
+		// Initialize metrics with configuration-driven buckets
+		BatchClassificationRequests = promauto.NewCounterVec(
+			prometheus.CounterOpts{
+				Name: "batch_classification_requests_total",
+				Help: "Total number of batch classification requests",
+			},
+			[]string{"processing_type"},
+		)
+
+		BatchClassificationDuration = promauto.NewHistogramVec(
+			prometheus.HistogramOpts{
+				Name:    "batch_classification_duration_seconds",
+				Help:    "Duration of batch classification processing",
+				Buckets: durationBuckets,
+			},
+			[]string{"processing_type", "batch_size_range"},
+		)
+
+		BatchClassificationTexts = promauto.NewCounterVec(
+			prometheus.CounterOpts{
+				Name: "batch_classification_texts_total",
+				Help: "Total number of texts processed in batch classification",
+			},
+			[]string{"processing_type"},
+		)
+
+		BatchClassificationErrors = promauto.NewCounterVec(
+			prometheus.CounterOpts{
+				Name: "batch_classification_errors_total",
+				Help: "Total number of batch classification errors",
+			},
+			[]string{"processing_type", "error_type"},
+		)
+
+		ConcurrentGoroutines = promauto.NewGaugeVec(
+			prometheus.GaugeOpts{
+				Name: "batch_classification_concurrent_goroutines",
+				Help: "Number of active goroutines in concurrent batch processing",
+			},
+			[]string{"batch_id"},
+		)
+
+		BatchSizeDistribution = promauto.NewHistogramVec(
+			prometheus.HistogramOpts{
+				Name:    "batch_classification_size_distribution",
+				Help:    "Distribution of batch sizes",
+				Buckets: sizeBuckets,
+			},
+			[]string{"processing_type"},
+		)
+	})
+}
diff --git a/src/semantic-router/pkg/metrics/metrics_test.go b/src/semantic-router/pkg/metrics/metrics_test.go
new file mode 100644
index 00000000..c84c8f18
--- /dev/null
+++ b/src/semantic-router/pkg/metrics/metrics_test.go
@@ -0,0 +1,257 @@
+package metrics
+
+import (
+	"testing"
+	"time"
+
+	"github.com/vllm-project/semantic-router/semantic-router/pkg/config"
+)
+
+// TestMain ensures metrics are initialized before running tests
+func TestMain(m *testing.M) {
+	// Initialize metrics with default configuration for testing
+	config := BatchMetricsConfig{
+		Enabled:                   true,
+		DetailedGoroutineTracking: true,
+		HighResolutionTiming:      false,
+		SampleRate:                1.0,
+		DurationBuckets:           FallbackDurationBuckets,
+		SizeBuckets:               FallbackSizeBuckets,
+		BatchSizeRanges: []config.BatchSizeRangeConfig{
+			{Min: 1, Max: 1, Label: "1"},
+			{Min: 2, Max: 5, Label: "2-5"},
+			{Min: 6, Max: 10, Label: "6-10"},
+			{Min: 11, Max: 20, Label: "11-20"},
+			{Min: 21, Max: 50, Label: "21-50"},
+			{Min: 51, Max: -1, Label: "50+"},
+		},
+	}
+
+	// Initialize batch metrics
+	InitializeBatchMetrics(config)
+	SetBatchMetricsConfig(config)
+
+	// Run tests
+	m.Run()
+}
+
+// TestBatchClassificationMetrics tests the batch classification metrics recording
+func TestBatchClassificationMetrics(t *testing.T) {
+	tests := []struct {
+		name           string
+		processingType string
+		batchSize      int
+		duration       float64
+		errorType      string
+		expectError    bool
+	}{
+		{
+			name:           "Sequential processing metrics",
+			processingType: "sequential",
+			batchSize:      3,
+			duration:       0.5,
+			errorType:      "",
+			expectError:    false,
+		},
+		{
+			name:           "Concurrent processing metrics",
+			processingType: "concurrent",
+			batchSize:      10,
+			duration:       1.2,
+			errorType:      "",
+			expectError:    false,
+		},
+		{
+			name:           "Error case metrics",
+			processingType: "sequential",
+			batchSize:      5,
+			duration:       0.3,
+			errorType:      "classification_failed",
+			expectError:    true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Record metrics
+			RecordBatchClassificationRequest(tt.processingType)
+			RecordBatchSizeDistribution(tt.processingType, tt.batchSize)
+			RecordBatchClassificationDuration(tt.processingType, tt.batchSize, tt.duration)
+			RecordBatchClassificationTexts(tt.processingType, tt.batchSize)
+
+			if tt.expectError {
+				RecordBatchClassificationError(tt.processingType, tt.errorType)
+			}
+		})
+	}
+}
+
+// TestGetBatchSizeRange tests the batch size range helper function
+func TestGetBatchSizeRange(t *testing.T) {
+	tests := []struct {
+		name     string
+		size     int
+		expected string
+	}{
+		{"Single text", 1, "1"},
+		{"Small batch", 3, "2-5"},
+		{"Medium batch", 8, "6-10"},
+		{"Large batch", 15, "11-20"},
+		{"Very large batch", 35, "21-50"},
+		{"Maximum batch", 100, "50+"},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := GetBatchSizeRange(tt.size)
+			if result != tt.expected {
+				t.Errorf("GetBatchSizeRange(%d) = %s, want %s", tt.size, result, tt.expected)
+			}
+		})
+	}
+}
+
+// TestConcurrentGoroutineTracking tests goroutine tracking functionality
+func TestConcurrentGoroutineTracking(t *testing.T) {
+	batchID := "test_batch_123"
+
+	// Simulate goroutine start
+	ConcurrentGoroutines.WithLabelValues(batchID).Inc()
+
+	// Simulate some work
+	time.Sleep(10 * time.Millisecond)
+
+	// Simulate goroutine end
+	ConcurrentGoroutines.WithLabelValues(batchID).Dec()
+}
+
+// BenchmarkBatchClassificationMetrics benchmarks the performance impact of metrics recording
+func BenchmarkBatchClassificationMetrics(b *testing.B) {
+	processingType := "concurrent"
+	batchSize := 10
+	duration := 0.5
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		RecordBatchClassificationRequest(processingType)
+		RecordBatchSizeDistribution(processingType, batchSize)
+		RecordBatchClassificationDuration(processingType, batchSize, duration)
+		RecordBatchClassificationTexts(processingType, batchSize)
+	}
+}
+
+// TestMetricsIntegration tests the integration of all batch classification metrics
+func TestMetricsIntegration(t *testing.T) {
+	// Simulate a complete batch processing scenario
+	processingType := "concurrent"
+	batchSize := 8
+	batchID := "integration_test_batch"
+
+	// Start of batch processing
+	RecordBatchClassificationRequest(processingType)
+	RecordBatchSizeDistribution(processingType, batchSize)
+
+	// Simulate concurrent goroutines
+	for i := 0; i < batchSize; i++ {
+		ConcurrentGoroutines.WithLabelValues(batchID).Inc()
+	}
+
+	// Simulate processing completion
+	duration := 1.5
+	RecordBatchClassificationDuration(processingType, batchSize, duration)
+	RecordBatchClassificationTexts(processingType, batchSize)
+
+	// Simulate goroutines completion
+	for i := 0; i < batchSize; i++ {
+		ConcurrentGoroutines.WithLabelValues(batchID).Dec()
+	}
+}
+
+// TestDefaultBatchSizeRanges tests that default batch size ranges work when configuration is empty
+func TestDefaultBatchSizeRanges(t *testing.T) {
+	// Save current config
+	originalConfig := GetBatchMetricsConfig()
+
+	// Set config with empty BatchSizeRanges to test defaults
+	emptyConfig := BatchMetricsConfig{
+		Enabled:                   true,
+		DetailedGoroutineTracking: true,
+		HighResolutionTiming:      false,
+		SampleRate:                1.0,
+		DurationBuckets:           FallbackDurationBuckets,
+		SizeBuckets:               FallbackSizeBuckets,
+		BatchSizeRanges:           []config.BatchSizeRangeConfig{}, // Empty - should use defaults
+	}
+	SetBatchMetricsConfig(emptyConfig)
+
+	// Test that default ranges are used
+	tests := []struct {
+		name     string
+		size     int
+		expected string
+	}{
+		{"Single text (default)", 1, "1"},
+		{"Small batch (default)", 3, "2-5"},
+		{"Medium batch (default)", 8, "6-10"},
+		{"Large batch (default)", 15, "11-20"},
+		{"Very large batch (default)", 35, "21-50"},
+		{"Maximum batch (default)", 100, "50+"},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := GetBatchSizeRange(tt.size)
+			if result != tt.expected {
+				t.Errorf("GetBatchSizeRange(%d) with empty config = %s, want %s", tt.size, result, tt.expected)
+			}
+		})
+	}
+
+	// Restore original config
+	SetBatchMetricsConfig(originalConfig)
+}
+
+// TestCustomBatchSizeRanges tests that custom batch size ranges override defaults
+func TestCustomBatchSizeRanges(t *testing.T) {
+	// Save current config
+	originalConfig := GetBatchMetricsConfig()
+
+	// Set config with custom BatchSizeRanges
+	customConfig := BatchMetricsConfig{
+		Enabled:                   true,
+		DetailedGoroutineTracking: true,
+		HighResolutionTiming:      false,
+		SampleRate:                1.0,
+		DurationBuckets:           FallbackDurationBuckets,
+		SizeBuckets:               FallbackSizeBuckets,
+		BatchSizeRanges: []config.BatchSizeRangeConfig{
+			{Min: 1, Max: 10, Label: "small"},
+			{Min: 11, Max: 100, Label: "medium"},
+			{Min: 101, Max: -1, Label: "large"},
+		},
+	}
+	SetBatchMetricsConfig(customConfig)
+
+	// Test that custom ranges are used
+	tests := []struct {
+		name     string
+		size     int
+		expected string
+	}{
+		{"Custom small range", 5, "small"},
+		{"Custom medium range", 50, "medium"},
+		{"Custom large range", 200, "large"},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := GetBatchSizeRange(tt.size)
+			if result != tt.expected {
+				t.Errorf("GetBatchSizeRange(%d) with custom config = %s, want %s", tt.size, result, tt.expected)
+			}
+		})
+	}
+
+	// Restore original config
+	SetBatchMetricsConfig(originalConfig)
+}
diff --git a/website/docs/getting-started/configuration.md b/website/docs/getting-started/configuration.md
index 56391ef8..dbba7606 100644
--- a/website/docs/getting-started/configuration.md
+++ b/website/docs/getting-started/configuration.md
@@ -218,8 +218,126 @@ bert_model:
   model_id: sentence-transformers/all-MiniLM-L12-v2
   threshold: 0.6 # Similarity threshold
   use_cpu: true # CPU-only inference
+
+# Batch Classification API Configuration
+api:
+  batch_classification:
+    max_batch_size: 100 # Maximum texts per batch request
+    concurrency_threshold: 5 # Switch to concurrent processing when batch size exceeds this value
+    max_concurrency: 8 # Maximum concurrent goroutines
+
+    # Metrics configuration for monitoring
+    metrics:
+      enabled: true # Enable Prometheus metrics collection
+      detailed_goroutine_tracking: true # Track individual goroutine lifecycle
+      high_resolution_timing: false # Use nanosecond precision timing
+      sample_rate: 1.0 # Collect metrics for all requests (1.0 = 100%)
+
+      # Batch size range labels for metrics (OPTIONAL - uses sensible defaults)
+      # Default ranges: "1", "2-5", "6-10", "11-20", "21-50", "50+"
+      # Only specify if you need custom ranges:
+      # batch_size_ranges:
+      #   - {min: 1, max: 1, label: "1"}
+      #   - {min: 2, max: 5, label: "2-5"}
+      #   - {min: 6, max: 10, label: "6-10"}
+      #   - {min: 11, max: 20, label: "11-20"}
+      #   - {min: 21, max: 50, label: "21-50"}
+      #   - {min: 51, max: -1, label: "50+"} # -1 means no upper limit
+
+      # Histogram buckets - choose from the presets below or customize
+      duration_buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30]
+      size_buckets: [1, 2, 5, 10, 20, 50, 100, 200]
+
+      # Preset examples for quick configuration (copy the values you need into the settings above)
+      preset_examples:
+        fast:
+          duration: [0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1]
+          size: [1, 2, 3, 5, 8, 10]
+        standard:
+          duration: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
+          size: [1, 2, 5, 10, 20, 50, 100]
+        slow:
+          duration: [0.1, 0.5, 1, 5, 10, 30, 60, 120]
+          size: [10, 50, 100, 500, 1000, 5000]
 ```
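+
+When the Classification API starts, these settings are handed to the metrics
+package (see `StartClassificationAPI` in `pkg/api/server.go`). A condensed
+sketch of that wiring, shown only to illustrate the data flow (the full
+version also copies the goroutine tracking, range, and timing fields):
+
+```go
+// Sketch: how the loaded config reaches the metrics package.
+if cfg != nil && cfg.API.BatchClassification.Metrics.Enabled {
+	metrics.SetBatchMetricsConfig(metrics.BatchMetricsConfig{
+		Enabled:         cfg.API.BatchClassification.Metrics.Enabled,
+		SampleRate:      cfg.API.BatchClassification.Metrics.SampleRate,
+		DurationBuckets: cfg.API.BatchClassification.Metrics.DurationBuckets,
+		SizeBuckets:     cfg.API.BatchClassification.Metrics.SizeBuckets,
+	})
+}
+```
+
+`SetBatchMetricsConfig` fills in safe defaults (a sample rate of 1.0 and the
+fallback buckets) for any field left unset.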
+
+### How to Use Preset Examples
+
+The configuration includes preset examples for quick setup. Here's how to use them:
+
+**Step 1: Choose your scenario**
+
+- `fast` - For real-time APIs (microsecond to millisecond response times)
+- `standard` - For typical web APIs (millisecond to second response times)
+- `slow` - For batch processing or heavy computation (seconds to minutes)
+
+**Step 2: Copy the preset values**
+
+```yaml
+# Example: Switch to fast API configuration
+# Copy from preset_examples.fast and paste into the actual config:
+duration_buckets: [0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1]
+size_buckets: [1, 2, 3, 5, 8, 10]
+```
+
+**Step 3: Restart the service**
+
+```bash
+pkill -f "router"
+make run-router
+```
+
+### Default Batch Size Ranges
+
+The system provides sensible default batch size ranges that work well for most use cases:
+
+- **"1"** - Single text requests
+- **"2-5"** - Small batch requests
+- **"6-10"** - Medium batch requests
+- **"11-20"** - Large batch requests
+- **"21-50"** - Very large batch requests
+- **"50+"** - Maximum batch requests
+
+**You don't need to configure `batch_size_ranges` unless you have specific requirements.** The defaults are used automatically when the setting is omitted.
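+
+As a quick illustration (the function and metric names below are the ones
+added in this change), recording a duration for an 8-text batch files it
+under the default "6-10" range label:
+
+```go
+// Sketch: which batch_size_range label an 8-text batch receives.
+metrics.RecordBatchClassificationDuration("concurrent", 8, 0.42)
+// Observed as:
+// batch_classification_duration_seconds{processing_type="concurrent",
+//                                       batch_size_range="6-10"}
+```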
+
+### Configuration Examples by Use Case
+
+**Real-time Chat API (fast preset)**
+
+```yaml
+# Copy these values to your config for sub-millisecond monitoring
+duration_buckets: [0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1]
+size_buckets: [1, 2, 3, 5, 8, 10]
+# batch_size_ranges: uses defaults (no configuration needed)
+```
+
+**E-commerce API (standard preset)**
+
+```yaml
+# Copy these values for typical web API response times
+duration_buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
+size_buckets: [1, 2, 5, 10, 20, 50, 100]
+# batch_size_ranges: uses defaults (no configuration needed)
+```
+
+**Data Processing Pipeline (slow preset)**
+
+```yaml
+# Copy these values for heavy computation workloads
+duration_buckets: [0.1, 0.5, 1, 5, 10, 30, 60, 120]
+size_buckets: [10, 50, 100, 500, 1000, 5000]
+# Custom batch size ranges for large-scale processing (overrides defaults)
+batch_size_ranges:
+  - {min: 1, max: 50, label: "1-50"}
+  - {min: 51, max: 200, label: "51-200"}
+  - {min: 201, max: 1000, label: "201-1000"}
+  - {min: 1001, max: -1, label: "1000+"}
+```
+
+**Available Metrics:**
+
+- `batch_classification_requests_total` - Total number of batch requests
+- `batch_classification_duration_seconds` - Processing duration histogram
+- `batch_classification_texts_total` - Total number of texts processed
+- `batch_classification_errors_total` - Error count by type
+- `batch_classification_concurrent_goroutines` - Active goroutine count
+- `batch_classification_size_distribution` - Batch size distribution
+
+Access metrics at: `http://localhost:9190/metrics`
+
 ## Common Configuration Examples
 
 ### Enable All Security Features