@@ -68,6 +68,13 @@ const (
6868 kvLatencyMetricName MetricName = "cloudflare_kv_latency"
6969 workerSubrequestsCountMetricName MetricName = "cloudflare_worker_subrequests_count"
7070 workerSubrequestTimeMetricName MetricName = "cloudflare_worker_subrequest_time"
71+ queueBacklogMessagesMetricName MetricName = "cloudflare_queue_backlog_messages"
72+ queueBacklogBytesMetricName MetricName = "cloudflare_queue_backlog_bytes"
73+ queueConsumerConcurrencyMetricName MetricName = "cloudflare_queue_consumer_concurrency"
74+ queueOperationsMetricName MetricName = "cloudflare_queue_operations_count"
75+ queueOperationBytesMetricName MetricName = "cloudflare_queue_operations_bytes"
76+ queueOperationLagTimeMetricName MetricName = "cloudflare_queue_operations_lag_time"
77+ queueOperationRetryCountMetricName MetricName = "cloudflare_queue_operations_retry_count"
7178)
7279
7380type MetricsSet map [MetricName ]struct {}
@@ -360,6 +367,41 @@ var (
360367 Help : "Subrequest response time quantiles (microseconds)" ,
361368 }, []string {"script_name" , "account" , "quantile" })
362369
370+ queueBacklogMessages = prometheus .NewGaugeVec (prometheus.GaugeOpts {
371+ Name : queueBacklogMessagesMetricName .String (),
372+ Help : "Average number of messages in queue backlog" ,
373+ }, []string {"queue_name" , "account" })
374+
375+ queueBacklogBytes = prometheus .NewGaugeVec (prometheus.GaugeOpts {
376+ Name : queueBacklogBytesMetricName .String (),
377+ Help : "Average backlog size in bytes" ,
378+ }, []string {"queue_name" , "account" })
379+
380+ queueConsumerConcurrency = prometheus .NewGaugeVec (prometheus.GaugeOpts {
381+ Name : queueConsumerConcurrencyMetricName .String (),
382+ Help : "Average number of concurrent queue consumers" ,
383+ }, []string {"queue_name" , "account" })
384+
385+ queueOperations = prometheus .NewGaugeVec (prometheus.GaugeOpts {
386+ Name : queueOperationsMetricName .String (),
387+ Help : "Number of queue message operations" ,
388+ }, []string {"queue_name" , "account" , "action_type" , "outcome" })
389+
390+ queueOperationBytes = prometheus .NewGaugeVec (prometheus.GaugeOpts {
391+ Name : queueOperationBytesMetricName .String (),
392+ Help : "Total bytes processed by queue message operations" ,
393+ }, []string {"queue_name" , "account" , "action_type" , "outcome" })
394+
395+ queueOperationLagTime = prometheus .NewGaugeVec (prometheus.GaugeOpts {
396+ Name : queueOperationLagTimeMetricName .String (),
397+ Help : "Average lag time between write and read/delete (milliseconds)" ,
398+ }, []string {"queue_name" , "account" , "action_type" , "outcome" })
399+
400+ queueOperationRetryCount = prometheus .NewGaugeVec (prometheus.GaugeOpts {
401+ Name : queueOperationRetryCountMetricName .String (),
402+ Help : "Average retry count for queue message operations" ,
403+ }, []string {"queue_name" , "account" , "action_type" , "outcome" })
404+
363405 dnsFirewallQueryCount = prometheus .NewGaugeVec (prometheus.GaugeOpts {
364406 Name : dnsFirewallQueryCountMetricName .String (),
365407 Help : "DNS Firewall query count by query type and response code" ,
@@ -416,6 +458,13 @@ func buildAllMetricsSet() MetricsSet {
416458 allMetricsSet .Add (kvLatencyMetricName )
417459 allMetricsSet .Add (workerSubrequestsCountMetricName )
418460 allMetricsSet .Add (workerSubrequestTimeMetricName )
461+ allMetricsSet .Add (queueBacklogMessagesMetricName )
462+ allMetricsSet .Add (queueBacklogBytesMetricName )
463+ allMetricsSet .Add (queueConsumerConcurrencyMetricName )
464+ allMetricsSet .Add (queueOperationsMetricName )
465+ allMetricsSet .Add (queueOperationBytesMetricName )
466+ allMetricsSet .Add (queueOperationLagTimeMetricName )
467+ allMetricsSet .Add (queueOperationRetryCountMetricName )
419468 return allMetricsSet
420469}
421470
@@ -577,6 +626,27 @@ func mustRegisterMetrics(deniedMetrics MetricsSet) {
577626 if ! deniedMetrics .Has (workerSubrequestTimeMetricName ) {
578627 prometheus .MustRegister (workerSubrequestTime )
579628 }
629+ if ! deniedMetrics .Has (queueBacklogMessagesMetricName ) {
630+ prometheus .MustRegister (queueBacklogMessages )
631+ }
632+ if ! deniedMetrics .Has (queueBacklogBytesMetricName ) {
633+ prometheus .MustRegister (queueBacklogBytes )
634+ }
635+ if ! deniedMetrics .Has (queueConsumerConcurrencyMetricName ) {
636+ prometheus .MustRegister (queueConsumerConcurrency )
637+ }
638+ if ! deniedMetrics .Has (queueOperationsMetricName ) {
639+ prometheus .MustRegister (queueOperations )
640+ }
641+ if ! deniedMetrics .Has (queueOperationBytesMetricName ) {
642+ prometheus .MustRegister (queueOperationBytes )
643+ }
644+ if ! deniedMetrics .Has (queueOperationLagTimeMetricName ) {
645+ prometheus .MustRegister (queueOperationLagTime )
646+ }
647+ if ! deniedMetrics .Has (queueOperationRetryCountMetricName ) {
648+ prometheus .MustRegister (queueOperationRetryCount )
649+ }
580650}
581651
582652func fetchLoadblancerPoolsHealth (account cfaccounts.Account , wg * sync.WaitGroup ) {
@@ -706,6 +776,68 @@ func fetchWorkerSubrequestAnalytics(account cfaccounts.Account, wg *sync.WaitGro
706776 }
707777}
708778
779+ func fetchQueueAnalytics (account cfaccounts.Account , wg * sync.WaitGroup , deniedMetricsSet MetricsSet ) {
780+ wg .Add (1 )
781+ defer wg .Done ()
782+
783+ names , err := fetchQueueNames (account .ID )
784+ if err != nil {
785+ log .Error ("failed to fetch queue names for account " , account .ID , ": " , err )
786+ return
787+ }
788+
789+ r , err := fetchQueueMetrics (account .ID )
790+ if err != nil {
791+ log .Error ("failed to fetch queue metrics for account " , account .ID , ": " , err )
792+ return
793+ }
794+
795+ accountName := strings .ToLower (strings .ReplaceAll (account .Name , " " , "-" ))
796+
797+ resolveQueueName := func (queueID string ) string {
798+ if name , ok := names [queueID ]; ok {
799+ return name
800+ }
801+ return queueID
802+ }
803+
804+ for _ , a := range r .Viewer .Accounts {
805+ for _ , b := range a .QueueBacklogAdaptiveGroups {
806+ qName := resolveQueueName (b .Dimensions .QueueID )
807+ if ! deniedMetricsSet .Has (queueBacklogMessagesMetricName ) {
808+ queueBacklogMessages .With (prometheus.Labels {"queue_name" : qName , "account" : accountName }).Set (b .Avg .Messages )
809+ }
810+ if ! deniedMetricsSet .Has (queueBacklogBytesMetricName ) {
811+ queueBacklogBytes .With (prometheus.Labels {"queue_name" : qName , "account" : accountName }).Set (b .Avg .Bytes )
812+ }
813+ }
814+
815+ for _ , c := range a .QueueConsumerMetricsAdaptiveGroups {
816+ qName := resolveQueueName (c .Dimensions .QueueID )
817+ if ! deniedMetricsSet .Has (queueConsumerConcurrencyMetricName ) {
818+ queueConsumerConcurrency .With (prometheus.Labels {"queue_name" : qName , "account" : accountName }).Set (c .Avg .Concurrency )
819+ }
820+ }
821+
822+ for _ , op := range a .QueueMessageOperationsAdaptiveGroups {
823+ qName := resolveQueueName (op .Dimensions .QueueID )
824+ labels := prometheus.Labels {"queue_name" : qName , "account" : accountName , "action_type" : op .Dimensions .ActionType , "outcome" : op .Dimensions .Outcome }
825+ if ! deniedMetricsSet .Has (queueOperationsMetricName ) {
826+ queueOperations .With (labels ).Set (float64 (op .Sum .BillableOperations ))
827+ }
828+ if ! deniedMetricsSet .Has (queueOperationBytesMetricName ) {
829+ queueOperationBytes .With (labels ).Set (float64 (op .Sum .Bytes ))
830+ }
831+ if ! deniedMetricsSet .Has (queueOperationLagTimeMetricName ) {
832+ queueOperationLagTime .With (labels ).Set (op .Avg .LagTime )
833+ }
834+ if ! deniedMetricsSet .Has (queueOperationRetryCountMetricName ) {
835+ queueOperationRetryCount .With (labels ).Set (op .Avg .RetryCount )
836+ }
837+ }
838+ }
839+ }
840+
709841func fetchLogpushAnalyticsForAccount (account cfaccounts.Account , wg * sync.WaitGroup ) {
710842 wg .Add (1 )
711843 defer wg .Done ()
0 commit comments