Skip to content

Commit 3d905da

Browse files
authored
SD-11577: add worker subrequest support (#12)
1 parent a9d9406 commit 3d905da

File tree

4 files changed

+117
-0
lines changed

4 files changed

+117
-0
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ Note: `ZONE_<name>` configuration is not supported as flag.
131131
# HELP cloudflare_r2_storage_total_bytes Total storage used by R2
132132
# HELP cloudflare_kv_requests_count Number of KV operations by namespace and action type
133133
# HELP cloudflare_kv_latency KV operation latency quantiles (milliseconds)
134+
# HELP cloudflare_worker_subrequests_count Number of subrequests by script name
135+
# HELP cloudflare_worker_subrequest_time Subrequest response time quantiles (microseconds)
134136
```
135137

136138
## Helm chart repository

cloudflare.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,29 @@ type kvAccountResp struct {
323323
} `json:"kvOperationsAdaptiveGroups"`
324324
}
325325

326+
type cloudflareResponseSubrequests struct {
327+
Viewer struct {
328+
Accounts []subrequestsAccountResp `json:"accounts"`
329+
} `json:"viewer"`
330+
}
331+
332+
type subrequestsAccountResp struct {
333+
WorkersSubrequestsAdaptiveGroups []struct {
334+
Dimensions struct {
335+
ScriptName string `json:"scriptName"`
336+
} `json:"dimensions"`
337+
Sum struct {
338+
Subrequests uint64 `json:"subrequests"`
339+
} `json:"sum"`
340+
Quantiles struct {
341+
TimeToResponseUsP50 float32 `json:"timeToResponseUsP50"`
342+
TimeToResponseUsP75 float32 `json:"timeToResponseUsP75"`
343+
TimeToResponseUsP99 float32 `json:"timeToResponseUsP99"`
344+
TimeToResponseUsP999 float32 `json:"timeToResponseUsP999"`
345+
} `json:"quantiles"`
346+
} `json:"workersSubrequestsAdaptiveGroups"`
347+
}
348+
326349
type cloudflareResponseDNSFirewall struct {
327350
Viewer struct {
328351
Accounts []dnsFirewallAccountResp `json:"accounts"`
@@ -1154,6 +1177,50 @@ func fetchKVOperations(accountID string) (*cloudflareResponseKV, error) {
11541177
return &resp, nil
11551178
}
11561179

1180+
func fetchWorkerSubrequests(accountID string) (*cloudflareResponseSubrequests, error) {
1181+
request := graphql.NewRequest(`
1182+
query ($accountID: String!, $mintime: Time!, $maxtime: Time!, $limit: Int!) {
1183+
viewer {
1184+
accounts(filter: {accountTag: $accountID}) {
1185+
workersSubrequestsAdaptiveGroups(limit: $limit, filter: {datetime_geq: $mintime, datetime_lt: $maxtime}) {
1186+
dimensions {
1187+
scriptName
1188+
}
1189+
sum {
1190+
subrequests
1191+
}
1192+
quantiles {
1193+
timeToResponseUsP50
1194+
timeToResponseUsP75
1195+
timeToResponseUsP99
1196+
timeToResponseUsP999
1197+
}
1198+
}
1199+
}
1200+
}
1201+
}`)
1202+
1203+
now, now1mAgo := GetTimeRange()
1204+
request.Var("limit", gqlQueryLimit)
1205+
request.Var("maxtime", now)
1206+
request.Var("mintime", now1mAgo)
1207+
request.Var("accountID", accountID)
1208+
1209+
gql.Mu.RLock()
1210+
defer gql.Mu.RUnlock()
1211+
1212+
ctx, cancel := context.WithTimeout(context.Background(), cftimeout)
1213+
defer cancel()
1214+
1215+
var resp cloudflareResponseSubrequests
1216+
if err := gql.Client.Run(ctx, request, &resp); err != nil {
1217+
log.Errorf("error fetching worker subrequests, err:%v", err)
1218+
return nil, err
1219+
}
1220+
1221+
return &resp, nil
1222+
}
1223+
11571224
func fetchDNSFirewallTotals(accountID string) (*cloudflareResponseDNSFirewall, error) {
11581225
request := graphql.NewRequest(`
11591226
query ($accountID: string, $mintime: Time!, $maxtime: Time!, $limit: Int!) {

main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ func fetchMetrics(deniedMetricsSet MetricsSet) {
115115
for _, a := range accounts {
116116
go fetchWorkerAnalytics(a, &wg)
117117
go fetchKVAnalytics(a, &wg, deniedMetricsSet)
118+
go fetchWorkerSubrequestAnalytics(a, &wg, deniedMetricsSet)
118119
go fetchLogpushAnalyticsForAccount(a, &wg)
119120
go fetchR2StorageForAccount(a, &wg)
120121
go fetchLoadblancerPoolsHealth(a, &wg)

prometheus.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ const (
6666
dnsFirewallQueryCountMetricName MetricName = "cloudflare_dns_firewall_query_count"
6767
kvRequestsMetricName MetricName = "cloudflare_kv_requests_count"
6868
kvLatencyMetricName MetricName = "cloudflare_kv_latency"
69+
workerSubrequestsCountMetricName MetricName = "cloudflare_worker_subrequests_count"
70+
workerSubrequestTimeMetricName MetricName = "cloudflare_worker_subrequest_time"
6971
)
7072

7173
type MetricsSet map[MetricName]struct{}
@@ -348,6 +350,16 @@ var (
348350
Help: "KV operation latency quantiles (milliseconds)",
349351
}, []string{"namespace_id", "action_type", "account", "quantile"})
350352

353+
workerSubrequestsCount = prometheus.NewGaugeVec(prometheus.GaugeOpts{
354+
Name: workerSubrequestsCountMetricName.String(),
355+
Help: "Number of subrequests by script name",
356+
}, []string{"script_name", "account"})
357+
358+
workerSubrequestTime = prometheus.NewGaugeVec(prometheus.GaugeOpts{
359+
Name: workerSubrequestTimeMetricName.String(),
360+
Help: "Subrequest response time quantiles (microseconds)",
361+
}, []string{"script_name", "account", "quantile"})
362+
351363
dnsFirewallQueryCount = prometheus.NewGaugeVec(prometheus.GaugeOpts{
352364
Name: dnsFirewallQueryCountMetricName.String(),
353365
Help: "DNS Firewall query count by query type and response code",
@@ -402,6 +414,8 @@ func buildAllMetricsSet() MetricsSet {
402414
allMetricsSet.Add(dnsFirewallQueryCountMetricName)
403415
allMetricsSet.Add(kvRequestsMetricName)
404416
allMetricsSet.Add(kvLatencyMetricName)
417+
allMetricsSet.Add(workerSubrequestsCountMetricName)
418+
allMetricsSet.Add(workerSubrequestTimeMetricName)
405419
return allMetricsSet
406420
}
407421

@@ -557,6 +571,12 @@ func mustRegisterMetrics(deniedMetrics MetricsSet) {
557571
if !deniedMetrics.Has(kvLatencyMetricName) {
558572
prometheus.MustRegister(kvLatency)
559573
}
574+
if !deniedMetrics.Has(workerSubrequestsCountMetricName) {
575+
prometheus.MustRegister(workerSubrequestsCount)
576+
}
577+
if !deniedMetrics.Has(workerSubrequestTimeMetricName) {
578+
prometheus.MustRegister(workerSubrequestTime)
579+
}
560580
}
561581

562582
func fetchLoadblancerPoolsHealth(account cfaccounts.Account, wg *sync.WaitGroup) {
@@ -659,6 +679,33 @@ func fetchKVAnalytics(account cfaccounts.Account, wg *sync.WaitGroup, deniedMetr
659679
}
660680
}
661681

682+
func fetchWorkerSubrequestAnalytics(account cfaccounts.Account, wg *sync.WaitGroup, deniedMetricsSet MetricsSet) {
683+
wg.Add(1)
684+
defer wg.Done()
685+
686+
r, err := fetchWorkerSubrequests(account.ID)
687+
if err != nil {
688+
log.Error("failed to fetch worker subrequests for account ", account.ID, ": ", err)
689+
return
690+
}
691+
692+
accountName := strings.ToLower(strings.ReplaceAll(account.Name, " ", "-"))
693+
694+
for _, a := range r.Viewer.Accounts {
695+
for _, sr := range a.WorkersSubrequestsAdaptiveGroups {
696+
if !deniedMetricsSet.Has(workerSubrequestsCountMetricName) {
697+
workerSubrequestsCount.With(prometheus.Labels{"script_name": sr.Dimensions.ScriptName, "account": accountName}).Add(float64(sr.Sum.Subrequests))
698+
}
699+
if !deniedMetricsSet.Has(workerSubrequestTimeMetricName) {
700+
workerSubrequestTime.With(prometheus.Labels{"script_name": sr.Dimensions.ScriptName, "account": accountName, "quantile": "P50"}).Set(float64(sr.Quantiles.TimeToResponseUsP50))
701+
workerSubrequestTime.With(prometheus.Labels{"script_name": sr.Dimensions.ScriptName, "account": accountName, "quantile": "P75"}).Set(float64(sr.Quantiles.TimeToResponseUsP75))
702+
workerSubrequestTime.With(prometheus.Labels{"script_name": sr.Dimensions.ScriptName, "account": accountName, "quantile": "P99"}).Set(float64(sr.Quantiles.TimeToResponseUsP99))
703+
workerSubrequestTime.With(prometheus.Labels{"script_name": sr.Dimensions.ScriptName, "account": accountName, "quantile": "P999"}).Set(float64(sr.Quantiles.TimeToResponseUsP999))
704+
}
705+
}
706+
}
707+
}
708+
662709
func fetchLogpushAnalyticsForAccount(account cfaccounts.Account, wg *sync.WaitGroup) {
663710
wg.Add(1)
664711
defer wg.Done()

0 commit comments

Comments
 (0)