@@ -11,6 +11,7 @@ import (
1111 "time"
1212
1313 "github.com/vllm-project/semantic-router/semantic-router/pkg/config"
14+ "github.com/vllm-project/semantic-router/semantic-router/pkg/metrics"
1415 "github.com/vllm-project/semantic-router/semantic-router/pkg/services"
1516)
1617
@@ -91,6 +92,20 @@ func StartClassificationAPI(configPath string, port int) error {
9192 classificationSvc = services .NewPlaceholderClassificationService ()
9293 }
9394
95+ // Apply the batch metrics configuration from cfg; nothing is set when cfg is nil or metrics are disabled
96+ if cfg != nil && cfg .API .BatchClassification .Metrics .Enabled {
97+ metricsConfig := metrics.BatchMetricsConfig {
98+ Enabled : cfg .API .BatchClassification .Metrics .Enabled ,
99+ DetailedGoroutineTracking : cfg .API .BatchClassification .Metrics .DetailedGoroutineTracking ,
100+ DurationBuckets : cfg .API .BatchClassification .Metrics .DurationBuckets ,
101+ SizeBuckets : cfg .API .BatchClassification .Metrics .SizeBuckets ,
102+ BatchSizeRanges : cfg .API .BatchClassification .Metrics .BatchSizeRanges ,
103+ HighResolutionTiming : cfg .API .BatchClassification .Metrics .HighResolutionTiming ,
104+ SampleRate : cfg .API .BatchClassification .Metrics .SampleRate ,
105+ }
106+ metrics .SetBatchMetricsConfig (metricsConfig )
107+ }
108+
94109 // Create server instance
95110 apiServer := & ClassificationAPIServer {
96111 classificationSvc : classificationSvc ,
@@ -231,6 +246,8 @@ func (s *ClassificationAPIServer) handleBatchClassification(w http.ResponseWrite
231246
232247 // Input validation
233248 if len (req .Texts ) == 0 {
249+ // Record validation error in metrics
250+ metrics .RecordBatchClassificationError ("validation" , "empty_texts" )
234251 s .writeErrorResponse (w , http .StatusBadRequest , "INVALID_INPUT" , "texts array cannot be empty" )
235252 return
236253 }
@@ -242,6 +259,8 @@ func (s *ClassificationAPIServer) handleBatchClassification(w http.ResponseWrite
242259 }
243260
244261 if len (req .Texts ) > maxBatchSize {
262+ // Record validation error in metrics
263+ metrics .RecordBatchClassificationError ("validation" , "batch_too_large" )
245264 s .writeErrorResponse (w , http .StatusBadRequest , "BATCH_TOO_LARGE" ,
246265 fmt .Sprintf ("batch size cannot exceed %d texts" , maxBatchSize ))
247266 return
@@ -494,10 +513,26 @@ func (s *ClassificationAPIServer) getSystemInfo() SystemInfo {
494513
495514// processSequentially handles small batches with sequential processing
496515func (s * ClassificationAPIServer ) processSequentially (texts []string , options * ClassificationOptions ) ([]services.Classification , error ) {
516+ start := time .Now ()
517+ processingType := "sequential"
518+ batchSize := len (texts )
519+
520+ // Record request and batch size metrics
521+ metrics .RecordBatchClassificationRequest (processingType )
522+ metrics .RecordBatchSizeDistribution (processingType , batchSize )
523+
524+ // Deferred so duration and text count (always the full batch size) are recorded on every return path, including the error return
525+ defer func () {
526+ duration := time .Since (start ).Seconds ()
527+ metrics .RecordBatchClassificationDuration (processingType , batchSize , duration )
528+ metrics .RecordBatchClassificationTexts (processingType , batchSize )
529+ }()
530+
497531 results := make ([]services.Classification , len (texts ))
498532 for i , text := range texts {
499533 result , err := s .classifySingleText (text , options )
500534 if err != nil {
535+ metrics .RecordBatchClassificationError (processingType , "classification_failed" )
501536 return nil , fmt .Errorf ("failed to classify text at index %d: %w" , i , err )
502537 }
503538 results [i ] = result
@@ -507,6 +542,22 @@ func (s *ClassificationAPIServer) processSequentially(texts []string, options *C
507542
508543// processConcurrently handles large batches with concurrent processing
509544func (s * ClassificationAPIServer ) processConcurrently (texts []string , options * ClassificationOptions ) ([]services.Classification , error ) {
545+ start := time .Now ()
546+ processingType := "concurrent"
547+ batchSize := len (texts )
548+ batchID := fmt .Sprintf ("batch_%d" , time .Now ().UnixNano ())
549+
550+ // Record request and batch size metrics
551+ metrics .RecordBatchClassificationRequest (processingType )
552+ metrics .RecordBatchSizeDistribution (processingType , batchSize )
553+
554+ // Defer recording processing time and text count
555+ defer func () {
556+ duration := time .Since (start ).Seconds ()
557+ metrics .RecordBatchClassificationDuration (processingType , batchSize , duration )
558+ metrics .RecordBatchClassificationTexts (processingType , batchSize )
559+ }()
560+
510561 // Get max concurrency from config, default to 8
511562 maxConcurrency := 8
512563 if s .config != nil && s .config .API .BatchClassification .MaxConcurrency > 0 {
@@ -523,6 +574,18 @@ func (s *ClassificationAPIServer) processConcurrently(texts []string, options *C
523574 wg .Add (1 )
524575 go func (index int , txt string ) {
525576 defer wg .Done ()
577+
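578+ // Count this goroutine from launch (including time spent waiting on the semaphore) when detailed tracking is enabled; NOTE(review): batchID is unique per call, so each batch adds a new label value — confirm stale series are removed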
579+ metricsConfig := metrics .GetBatchMetricsConfig ()
580+ if metricsConfig .DetailedGoroutineTracking {
581+ metrics .ConcurrentGoroutines .WithLabelValues (batchID ).Inc ()
582+
583+ defer func () {
584+ // Record goroutine end
585+ metrics .ConcurrentGoroutines .WithLabelValues (batchID ).Dec ()
586+ }()
587+ }
588+
526589 semaphore <- struct {}{}
527590 defer func () { <- semaphore }()
528591
@@ -532,6 +595,7 @@ func (s *ClassificationAPIServer) processConcurrently(texts []string, options *C
532595 result , err := s .classifySingleText (txt , options )
533596 if err != nil {
534597 errors [index ] = err
598+ metrics .RecordBatchClassificationError (processingType , "classification_failed" )
535599 return
536600 }
537601 results [index ] = result
0 commit comments