Skip to content

Commit a0c9b30

Browse files
committed
SD-11577: add queue health metrics
1 parent 3d905da commit a0c9b30

File tree

4 files changed

+274
-0
lines changed

4 files changed

+274
-0
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,13 @@ Note: `ZONE_<name>` configuration is not supported as flag.
133133
# HELP cloudflare_kv_latency KV operation latency quantiles (milliseconds)
134134
# HELP cloudflare_worker_subrequests_count Number of subrequests by script name
135135
# HELP cloudflare_worker_subrequest_time Subrequest response time quantiles (microseconds)
136+
# HELP cloudflare_queue_backlog_messages Average number of messages in queue backlog
137+
# HELP cloudflare_queue_backlog_bytes Average backlog size in bytes
138+
# HELP cloudflare_queue_consumer_concurrency Average number of concurrent queue consumers
139+
# HELP cloudflare_queue_operations_count Number of queue message operations
140+
# HELP cloudflare_queue_operations_bytes Total bytes processed by queue message operations
141+
# HELP cloudflare_queue_operations_lag_time Average lag time between write and read/delete (milliseconds)
142+
# HELP cloudflare_queue_operations_retry_count Average retry count for queue message operations
136143
```
137144

138145
## Helm chart repository

cloudflare.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
cfaccounts "github.com/cloudflare/cloudflare-go/v4/accounts"
1010
cfload_balancers "github.com/cloudflare/cloudflare-go/v4/load_balancers"
1111
cfpagination "github.com/cloudflare/cloudflare-go/v4/packages/pagination"
12+
cfqueues "github.com/cloudflare/cloudflare-go/v4/queues"
1213
cfrulesets "github.com/cloudflare/cloudflare-go/v4/rulesets"
1314
cfzero_trust "github.com/cloudflare/cloudflare-go/v4/zero_trust"
1415
cfzones "github.com/cloudflare/cloudflare-go/v4/zones"
@@ -346,6 +347,47 @@ type subrequestsAccountResp struct {
346347
} `json:"workersSubrequestsAdaptiveGroups"`
347348
}
348349

350+
// cloudflareResponseQueues is the top-level shape that the queue metrics
// GraphQL response is decoded into: a viewer holding one entry per account.
type cloudflareResponseQueues struct {
351+
Viewer struct {
352+
Accounts []queueAccountResp `json:"accounts"`
353+
} `json:"viewer"`
354+
}
355+
356+
// queueAccountResp holds the per-account queue datasets returned by the queue
// metrics GraphQL query. Each slice element is one group, keyed by the values
// in its Dimensions struct.
type queueAccountResp struct {
357+
// Backlog averages (message count and byte size), grouped by queue ID.
QueueBacklogAdaptiveGroups []struct {
358+
Dimensions struct {
359+
QueueID string `json:"queueId"`
360+
} `json:"dimensions"`
361+
Avg struct {
362+
Messages float64 `json:"messages"`
363+
Bytes float64 `json:"bytes"`
364+
} `json:"avg"`
365+
} `json:"queueBacklogAdaptiveGroups"`
366+
// Average consumer concurrency, grouped by queue ID.
QueueConsumerMetricsAdaptiveGroups []struct {
367+
Dimensions struct {
368+
QueueID string `json:"queueId"`
369+
} `json:"dimensions"`
370+
Avg struct {
371+
Concurrency float64 `json:"concurrency"`
372+
} `json:"avg"`
373+
} `json:"queueConsumerMetricsAdaptiveGroups"`
374+
// Message operation totals and averages, grouped by queue ID, action type
// and outcome.
QueueMessageOperationsAdaptiveGroups []struct {
375+
Dimensions struct {
376+
QueueID string `json:"queueId"`
377+
ActionType string `json:"actionType"`
378+
Outcome string `json:"outcome"`
379+
} `json:"dimensions"`
380+
Sum struct {
381+
BillableOperations uint64 `json:"billableOperations"`
382+
Bytes uint64 `json:"bytes"`
383+
} `json:"sum"`
384+
Avg struct {
385+
// NOTE(review): exported under a metric whose help text says
// milliseconds; the unit is not established by this struct — confirm.
LagTime float64 `json:"lagTime"`
386+
RetryCount float64 `json:"retryCount"`
387+
} `json:"avg"`
388+
} `json:"queueMessageOperationsAdaptiveGroups"`
389+
}
390+
349391
type cloudflareResponseDNSFirewall struct {
350392
Viewer struct {
351393
Accounts []dnsFirewallAccountResp `json:"accounts"`
@@ -1221,6 +1263,98 @@ func fetchWorkerSubrequests(accountID string) (*cloudflareResponseSubrequests, e
12211263
return &resp, nil
12221264
}
12231265

1266+
func fetchQueueNames(accountID string) (map[string]string, error) {
1267+
ctx, cancel := context.WithTimeout(context.Background(), cftimeout)
1268+
defer cancel()
1269+
page := cfclient.Queues.ListAutoPaging(ctx,
1270+
cfqueues.QueueListParams{
1271+
AccountID: cf.F(accountID),
1272+
})
1273+
if page.Err() != nil {
1274+
return nil, page.Err()
1275+
}
1276+
1277+
names := make(map[string]string)
1278+
seenIDs := make(map[string]struct{})
1279+
for page.Next() {
1280+
if page.Err() != nil {
1281+
log.Errorf("error during paging queues: %v", page.Err())
1282+
break
1283+
}
1284+
q := page.Current()
1285+
if _, exists := seenIDs[q.QueueID]; exists {
1286+
log.Errorf("fetchQueueNames: duplicate queue ID detected (%s), breaking loop", q.QueueID)
1287+
break
1288+
}
1289+
seenIDs[q.QueueID] = struct{}{}
1290+
names[q.QueueID] = q.QueueName
1291+
}
1292+
1293+
return names, nil
1294+
}
1295+
1296+
// fetchQueueMetrics runs a single GraphQL query against the shared gql client
// for the given account and decodes the result into cloudflareResponseQueues.
// The query requests three datasets: queue backlog averages, consumer
// concurrency averages, and message operation sums/averages.
//
// On failure the error is logged and returned with a nil response.
func fetchQueueMetrics(accountID string) (*cloudflareResponseQueues, error) {
1297+
request := graphql.NewRequest(`
1298+
query ($accountID: String!, $mintime: Time!, $maxtime: Time!, $limit: Int!) {
1299+
viewer {
1300+
accounts(filter: {accountTag: $accountID}) {
1301+
queueBacklogAdaptiveGroups(limit: $limit, filter: {datetime_geq: $mintime, datetime_lt: $maxtime}) {
1302+
dimensions {
1303+
queueId
1304+
}
1305+
avg {
1306+
messages
1307+
bytes
1308+
}
1309+
}
1310+
queueConsumerMetricsAdaptiveGroups(limit: $limit, filter: {datetime_geq: $mintime, datetime_lt: $maxtime}) {
1311+
dimensions {
1312+
queueId
1313+
}
1314+
avg {
1315+
concurrency
1316+
}
1317+
}
1318+
queueMessageOperationsAdaptiveGroups(limit: $limit, filter: {datetime_geq: $mintime, datetime_lt: $maxtime}) {
1319+
dimensions {
1320+
queueId
1321+
actionType
1322+
outcome
1323+
}
1324+
sum {
1325+
billableOperations
1326+
bytes
1327+
}
1328+
avg {
1329+
lagTime
1330+
retryCount
1331+
}
1332+
}
1333+
}
1334+
}
1335+
}`)
1336+
1337+
// Bind the query variables. The [mintime, maxtime) window comes from
// GetTimeRange — presumably the last minute, judging by the variable name;
// confirm against GetTimeRange.
now, now1mAgo := GetTimeRange()
1338+
request.Var("limit", gqlQueryLimit)
1339+
request.Var("maxtime", now)
1340+
request.Var("mintime", now1mAgo)
1341+
request.Var("accountID", accountID)
1342+
1343+
// Hold the gql read lock until the function returns, i.e. for the whole
// duration of the request.
gql.Mu.RLock()
1344+
defer gql.Mu.RUnlock()
1345+
1346+
// Bound the request by cftimeout.
ctx, cancel := context.WithTimeout(context.Background(), cftimeout)
1347+
defer cancel()
1348+
1349+
var resp cloudflareResponseQueues
1350+
if err := gql.Client.Run(ctx, request, &resp); err != nil {
1351+
log.Errorf("error fetching queue metrics, err:%v", err)
1352+
return nil, err
1353+
}
1354+
1355+
return &resp, nil
1356+
}
1357+
12241358
func fetchDNSFirewallTotals(accountID string) (*cloudflareResponseDNSFirewall, error) {
12251359
request := graphql.NewRequest(`
12261360
query ($accountID: string, $mintime: Time!, $maxtime: Time!, $limit: Int!) {

main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ func fetchMetrics(deniedMetricsSet MetricsSet) {
116116
go fetchWorkerAnalytics(a, &wg)
117117
go fetchKVAnalytics(a, &wg, deniedMetricsSet)
118118
go fetchWorkerSubrequestAnalytics(a, &wg, deniedMetricsSet)
119+
go fetchQueueAnalytics(a, &wg, deniedMetricsSet)
119120
go fetchLogpushAnalyticsForAccount(a, &wg)
120121
go fetchR2StorageForAccount(a, &wg)
121122
go fetchLoadblancerPoolsHealth(a, &wg)

prometheus.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ const (
6868
kvLatencyMetricName MetricName = "cloudflare_kv_latency"
6969
workerSubrequestsCountMetricName MetricName = "cloudflare_worker_subrequests_count"
7070
workerSubrequestTimeMetricName MetricName = "cloudflare_worker_subrequest_time"
71+
queueBacklogMessagesMetricName MetricName = "cloudflare_queue_backlog_messages"
72+
queueBacklogBytesMetricName MetricName = "cloudflare_queue_backlog_bytes"
73+
queueConsumerConcurrencyMetricName MetricName = "cloudflare_queue_consumer_concurrency"
74+
queueOperationsMetricName MetricName = "cloudflare_queue_operations_count"
75+
queueOperationBytesMetricName MetricName = "cloudflare_queue_operations_bytes"
76+
queueOperationLagTimeMetricName MetricName = "cloudflare_queue_operations_lag_time"
77+
queueOperationRetryCountMetricName MetricName = "cloudflare_queue_operations_retry_count"
7178
)
7279

7380
type MetricsSet map[MetricName]struct{}
@@ -360,6 +367,41 @@ var (
360367
Help: "Subrequest response time quantiles (microseconds)",
361368
}, []string{"script_name", "account", "quantile"})
362369

370+
queueBacklogMessages = prometheus.NewGaugeVec(prometheus.GaugeOpts{
371+
Name: queueBacklogMessagesMetricName.String(),
372+
Help: "Average number of messages in queue backlog",
373+
}, []string{"queue_name", "account"})
374+
375+
queueBacklogBytes = prometheus.NewGaugeVec(prometheus.GaugeOpts{
376+
Name: queueBacklogBytesMetricName.String(),
377+
Help: "Average backlog size in bytes",
378+
}, []string{"queue_name", "account"})
379+
380+
queueConsumerConcurrency = prometheus.NewGaugeVec(prometheus.GaugeOpts{
381+
Name: queueConsumerConcurrencyMetricName.String(),
382+
Help: "Average number of concurrent queue consumers",
383+
}, []string{"queue_name", "account"})
384+
385+
queueOperations = prometheus.NewGaugeVec(prometheus.GaugeOpts{
386+
Name: queueOperationsMetricName.String(),
387+
Help: "Number of queue message operations",
388+
}, []string{"queue_name", "account", "action_type", "outcome"})
389+
390+
queueOperationBytes = prometheus.NewGaugeVec(prometheus.GaugeOpts{
391+
Name: queueOperationBytesMetricName.String(),
392+
Help: "Total bytes processed by queue message operations",
393+
}, []string{"queue_name", "account", "action_type", "outcome"})
394+
395+
queueOperationLagTime = prometheus.NewGaugeVec(prometheus.GaugeOpts{
396+
Name: queueOperationLagTimeMetricName.String(),
397+
Help: "Average lag time between write and read/delete (milliseconds)",
398+
}, []string{"queue_name", "account", "action_type", "outcome"})
399+
400+
queueOperationRetryCount = prometheus.NewGaugeVec(prometheus.GaugeOpts{
401+
Name: queueOperationRetryCountMetricName.String(),
402+
Help: "Average retry count for queue message operations",
403+
}, []string{"queue_name", "account", "action_type", "outcome"})
404+
363405
dnsFirewallQueryCount = prometheus.NewGaugeVec(prometheus.GaugeOpts{
364406
Name: dnsFirewallQueryCountMetricName.String(),
365407
Help: "DNS Firewall query count by query type and response code",
@@ -416,6 +458,13 @@ func buildAllMetricsSet() MetricsSet {
416458
allMetricsSet.Add(kvLatencyMetricName)
417459
allMetricsSet.Add(workerSubrequestsCountMetricName)
418460
allMetricsSet.Add(workerSubrequestTimeMetricName)
461+
allMetricsSet.Add(queueBacklogMessagesMetricName)
462+
allMetricsSet.Add(queueBacklogBytesMetricName)
463+
allMetricsSet.Add(queueConsumerConcurrencyMetricName)
464+
allMetricsSet.Add(queueOperationsMetricName)
465+
allMetricsSet.Add(queueOperationBytesMetricName)
466+
allMetricsSet.Add(queueOperationLagTimeMetricName)
467+
allMetricsSet.Add(queueOperationRetryCountMetricName)
419468
return allMetricsSet
420469
}
421470

@@ -577,6 +626,27 @@ func mustRegisterMetrics(deniedMetrics MetricsSet) {
577626
if !deniedMetrics.Has(workerSubrequestTimeMetricName) {
578627
prometheus.MustRegister(workerSubrequestTime)
579628
}
629+
if !deniedMetrics.Has(queueBacklogMessagesMetricName) {
630+
prometheus.MustRegister(queueBacklogMessages)
631+
}
632+
if !deniedMetrics.Has(queueBacklogBytesMetricName) {
633+
prometheus.MustRegister(queueBacklogBytes)
634+
}
635+
if !deniedMetrics.Has(queueConsumerConcurrencyMetricName) {
636+
prometheus.MustRegister(queueConsumerConcurrency)
637+
}
638+
if !deniedMetrics.Has(queueOperationsMetricName) {
639+
prometheus.MustRegister(queueOperations)
640+
}
641+
if !deniedMetrics.Has(queueOperationBytesMetricName) {
642+
prometheus.MustRegister(queueOperationBytes)
643+
}
644+
if !deniedMetrics.Has(queueOperationLagTimeMetricName) {
645+
prometheus.MustRegister(queueOperationLagTime)
646+
}
647+
if !deniedMetrics.Has(queueOperationRetryCountMetricName) {
648+
prometheus.MustRegister(queueOperationRetryCount)
649+
}
580650
}
581651

582652
func fetchLoadblancerPoolsHealth(account cfaccounts.Account, wg *sync.WaitGroup) {
@@ -706,6 +776,68 @@ func fetchWorkerSubrequestAnalytics(account cfaccounts.Account, wg *sync.WaitGro
706776
}
707777
}
708778

779+
func fetchQueueAnalytics(account cfaccounts.Account, wg *sync.WaitGroup, deniedMetricsSet MetricsSet) {
780+
wg.Add(1)
781+
defer wg.Done()
782+
783+
names, err := fetchQueueNames(account.ID)
784+
if err != nil {
785+
log.Error("failed to fetch queue names for account ", account.ID, ": ", err)
786+
return
787+
}
788+
789+
r, err := fetchQueueMetrics(account.ID)
790+
if err != nil {
791+
log.Error("failed to fetch queue metrics for account ", account.ID, ": ", err)
792+
return
793+
}
794+
795+
accountName := strings.ToLower(strings.ReplaceAll(account.Name, " ", "-"))
796+
797+
resolveQueueName := func(queueID string) string {
798+
if name, ok := names[queueID]; ok {
799+
return name
800+
}
801+
return queueID
802+
}
803+
804+
for _, a := range r.Viewer.Accounts {
805+
for _, b := range a.QueueBacklogAdaptiveGroups {
806+
qName := resolveQueueName(b.Dimensions.QueueID)
807+
if !deniedMetricsSet.Has(queueBacklogMessagesMetricName) {
808+
queueBacklogMessages.With(prometheus.Labels{"queue_name": qName, "account": accountName}).Set(b.Avg.Messages)
809+
}
810+
if !deniedMetricsSet.Has(queueBacklogBytesMetricName) {
811+
queueBacklogBytes.With(prometheus.Labels{"queue_name": qName, "account": accountName}).Set(b.Avg.Bytes)
812+
}
813+
}
814+
815+
for _, c := range a.QueueConsumerMetricsAdaptiveGroups {
816+
qName := resolveQueueName(c.Dimensions.QueueID)
817+
if !deniedMetricsSet.Has(queueConsumerConcurrencyMetricName) {
818+
queueConsumerConcurrency.With(prometheus.Labels{"queue_name": qName, "account": accountName}).Set(c.Avg.Concurrency)
819+
}
820+
}
821+
822+
for _, op := range a.QueueMessageOperationsAdaptiveGroups {
823+
qName := resolveQueueName(op.Dimensions.QueueID)
824+
labels := prometheus.Labels{"queue_name": qName, "account": accountName, "action_type": op.Dimensions.ActionType, "outcome": op.Dimensions.Outcome}
825+
if !deniedMetricsSet.Has(queueOperationsMetricName) {
826+
queueOperations.With(labels).Add(float64(op.Sum.BillableOperations))
827+
}
828+
if !deniedMetricsSet.Has(queueOperationBytesMetricName) {
829+
queueOperationBytes.With(labels).Add(float64(op.Sum.Bytes))
830+
}
831+
if !deniedMetricsSet.Has(queueOperationLagTimeMetricName) {
832+
queueOperationLagTime.With(labels).Set(op.Avg.LagTime)
833+
}
834+
if !deniedMetricsSet.Has(queueOperationRetryCountMetricName) {
835+
queueOperationRetryCount.With(labels).Set(op.Avg.RetryCount)
836+
}
837+
}
838+
}
839+
}
840+
709841
func fetchLogpushAnalyticsForAccount(account cfaccounts.Account, wg *sync.WaitGroup) {
710842
wg.Add(1)
711843
defer wg.Done()

0 commit comments

Comments
 (0)