Skip to content

Commit 4a99d0e

Browse files
committed
feat(): add more worker metrics
1 parent 71ba03c commit 4a99d0e

File tree

3 files changed

+89
-52
lines changed

3 files changed

+89
-52
lines changed

gql/workers_query.graphql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ query GetWorkersAnalyticsQuery($accountTag: string!, $datetimeStart: Time!, $dat
1515
requests
1616
errors
1717
duration
18+
wallTime
19+
subrequests
1820
}
1921

2022
quantiles {

src/gql.rs

Lines changed: 58 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use graphql_client::{GraphQLQuery, Response};
44
use opentelemetry_proto::tonic::metrics::v1::Metric;
55
use prometheus::{CounterVec, GaugeVec, Opts, Registry};
66
use crate::metrics::prometheus_registry_to_opentelemetry_metrics;
7-
use web_time::SystemTime;
87
use worker::console_log;
98

109
// The paths are relative to the directory where your `Cargo.toml` is located.
@@ -72,9 +71,11 @@ type float64 = f64;
7271
#[allow(non_camel_case_types)]
7372
type uint16 = u16;
7473

75-
pub async fn do_get_workers_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_workers_analytics_query::Variables) -> Result<Vec<Metric>, Box<dyn Error>> {
74+
pub async fn do_get_workers_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_workers_analytics_query::Variables, debug_logging: bool, fallback_timestamp_nanos: u64) -> Result<Vec<Metric>, Box<dyn Error>> {
7675
let request_body = GetWorkersAnalyticsQuery::build_query(variables);
77-
console_log!("[Workers] GraphQL request: {}", serde_json::to_string_pretty(&request_body).unwrap_or_default());
76+
if debug_logging {
77+
console_log!("[Workers] GraphQL request: {}", serde_json::to_string_pretty(&request_body).unwrap_or_default());
78+
}
7879
let client = reqwest::Client::new();
7980
let res = client.post(cloudflare_api_url)
8081
.bearer_auth(cloudflare_api_key)
@@ -86,7 +87,9 @@ pub async fn do_get_workers_analytics_query(cloudflare_api_url: &String, cloudfl
8687
}
8788

8889
let response_text = res.text().await?;
89-
console_log!("[Workers] GraphQL response: {}", response_text);
90+
if debug_logging {
91+
console_log!("[Workers] GraphQL response: {}", response_text);
92+
}
9093
let response_body: Response<get_workers_analytics_query::ResponseData> = serde_json::from_str(&response_text)?;
9194
if response_body.errors.is_some() {
9295
console_log!("[Workers] GraphQL query failed: {:?}", response_body.errors);
@@ -111,6 +114,14 @@ pub async fn do_get_workers_analytics_query(cloudflare_api_url: &String, cloudfl
111114
let worker_duration = GaugeVec::new(worker_duration_opts, &["script_name", "quantile"]).unwrap();
112115
registry.register(Box::new(worker_duration.clone())).unwrap();
113116

117+
let worker_wall_time_opts = Opts::new("cloudflare_worker_wall_time", "Sum of wall time - microseconds");
118+
let worker_wall_time = CounterVec::new(worker_wall_time_opts, &["script_name"]).unwrap();
119+
registry.register(Box::new(worker_wall_time.clone())).unwrap();
120+
121+
let worker_subrequests_opts = Opts::new("cloudflare_worker_subrequests", "Sum of subrequests");
122+
let worker_subrequests = CounterVec::new(worker_subrequests_opts, &["script_name"]).unwrap();
123+
registry.register(Box::new(worker_subrequests.clone())).unwrap();
124+
114125
let mut last_datetime: Option<Time> = None;
115126
for account in response_data.viewer.unwrap().accounts.iter() {
116127
for worker in account.workers_invocations_adaptive.iter() {
@@ -122,6 +133,8 @@ pub async fn do_get_workers_analytics_query(cloudflare_api_url: &String, cloudfl
122133

123134
worker_requests.with_label_values(&[script_name.as_str()]).inc_by(sum.requests as f64);
124135
worker_errors.with_label_values(&[script_name.as_str()]).inc_by(sum.errors as f64);
136+
worker_wall_time.with_label_values(&[script_name.as_str()]).inc_by(sum.wall_time as f64);
137+
worker_subrequests.with_label_values(&[script_name.as_str()]).inc_by(sum.subrequests as f64);
125138
worker_cpu_time.with_label_values(&[script_name.as_str(), "P50"]).set(quantiles.cpu_time_p50 as f64);
126139
worker_cpu_time.with_label_values(&[script_name.as_str(), "P75"]).set(quantiles.cpu_time_p75 as f64);
127140
worker_cpu_time.with_label_values(&[script_name.as_str(), "P99"]).set(quantiles.cpu_time_p99 as f64);
@@ -136,16 +149,16 @@ pub async fn do_get_workers_analytics_query(cloudflare_api_url: &String, cloudfl
136149
let timestamp_nanos: u64 = last_datetime.map(|datetime| {
137150
let datetime: NaiveDateTime = NaiveDateTime::parse_from_str(&datetime, "%+").unwrap();
138151
datetime.and_utc().timestamp_nanos_opt().unwrap_or(0) as u64
139-
}).unwrap_or_else(|| {
140-
systemtime_to_nanos(SystemTime::now())
141-
});
152+
}).unwrap_or(fallback_timestamp_nanos);
142153

143154
Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp_nanos))
144155
}
145156

146-
pub async fn do_get_d1_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_d1_analytics_query::Variables) -> Result<Vec<Metric>, Box<dyn Error>> {
157+
pub async fn do_get_d1_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_d1_analytics_query::Variables, debug_logging: bool, fallback_timestamp_nanos: u64) -> Result<Vec<Metric>, Box<dyn Error>> {
147158
let request_body = GetD1AnalyticsQuery::build_query(variables);
148-
console_log!("[D1] GraphQL request: {}", serde_json::to_string_pretty(&request_body).unwrap_or_default());
159+
if debug_logging {
160+
console_log!("[D1] GraphQL request: {}", serde_json::to_string_pretty(&request_body).unwrap_or_default());
161+
}
149162
let client = reqwest::Client::new();
150163
let res = client.post(cloudflare_api_url)
151164
.bearer_auth(cloudflare_api_key)
@@ -157,7 +170,9 @@ pub async fn do_get_d1_analytics_query(cloudflare_api_url: &String, cloudflare_a
157170
}
158171

159172
let response_text = res.text().await?;
160-
console_log!("[D1] GraphQL response: {}", response_text);
173+
if debug_logging {
174+
console_log!("[D1] GraphQL response: {}", response_text);
175+
}
161176
let response_body: Response<get_d1_analytics_query::ResponseData> = serde_json::from_str(&response_text)?;
162177
if response_body.errors.is_some() {
163178
console_log!("[D1] GraphQL query failed: {:?}", response_body.errors);
@@ -214,16 +229,16 @@ pub async fn do_get_d1_analytics_query(cloudflare_api_url: &String, cloudflare_a
214229
let timestamp_nanos: u64 = last_datetime.map(|datetime| {
215230
let datetime: NaiveDateTime = NaiveDateTime::parse_from_str(&datetime, "%+").unwrap();
216231
datetime.and_utc().timestamp_nanos_opt().unwrap_or(0) as u64
217-
}).unwrap_or_else(|| {
218-
systemtime_to_nanos(SystemTime::now())
219-
});
232+
}).unwrap_or(fallback_timestamp_nanos);
220233

221234
Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp_nanos))
222235
}
223236

224-
pub async fn do_get_durableobjects_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_durable_objects_analytics_query::Variables) -> Result<Vec<Metric>, Box<dyn Error>> {
237+
pub async fn do_get_durableobjects_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_durable_objects_analytics_query::Variables, debug_logging: bool, fallback_timestamp_nanos: u64) -> Result<Vec<Metric>, Box<dyn Error>> {
225238
let request_body = GetDurableObjectsAnalyticsQuery::build_query(variables);
226-
console_log!("[DurableObjects] GraphQL request: {}", serde_json::to_string_pretty(&request_body).unwrap_or_default());
239+
if debug_logging {
240+
console_log!("[DurableObjects] GraphQL request: {}", serde_json::to_string_pretty(&request_body).unwrap_or_default());
241+
}
227242
let client = reqwest::Client::new();
228243
let res = client.post(cloudflare_api_url)
229244
.bearer_auth(cloudflare_api_key)
@@ -235,7 +250,9 @@ pub async fn do_get_durableobjects_analytics_query(cloudflare_api_url: &String,
235250
}
236251

237252
let response_text = res.text().await?;
238-
console_log!("[DurableObjects] GraphQL response: {}", response_text);
253+
if debug_logging {
254+
console_log!("[DurableObjects] GraphQL response: {}", response_text);
255+
}
239256
let response_body: Response<get_durable_objects_analytics_query::ResponseData> = serde_json::from_str(&response_text)?;
240257
if response_body.errors.is_some() {
241258
console_log!("[DurableObjects] GraphQL query failed: {:?}", response_body.errors);
@@ -291,16 +308,16 @@ pub async fn do_get_durableobjects_analytics_query(cloudflare_api_url: &String,
291308
let timestamp_nanos: u64 = last_datetime.map(|datetime| {
292309
let datetime: NaiveDateTime = NaiveDateTime::parse_from_str(&datetime, "%+").unwrap();
293310
datetime.and_utc().timestamp_nanos_opt().unwrap_or(0) as u64
294-
}).unwrap_or_else(|| {
295-
systemtime_to_nanos(SystemTime::now())
296-
});
311+
}).unwrap_or(fallback_timestamp_nanos);
297312

298313
Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp_nanos))
299314
}
300315

301-
pub async fn do_get_queue_backlog_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_queue_backlog_analytics_query::Variables) -> Result<Vec<Metric>, Box<dyn Error>> {
316+
pub async fn do_get_queue_backlog_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_queue_backlog_analytics_query::Variables, debug_logging: bool, fallback_timestamp_nanos: u64) -> Result<Vec<Metric>, Box<dyn Error>> {
302317
let request_body = GetQueueBacklogAnalyticsQuery::build_query(variables);
303-
console_log!("[QueueBacklog] GraphQL request: {}", serde_json::to_string_pretty(&request_body).unwrap_or_default());
318+
if debug_logging {
319+
console_log!("[QueueBacklog] GraphQL request: {}", serde_json::to_string_pretty(&request_body).unwrap_or_default());
320+
}
304321
let client = reqwest::Client::new();
305322
let res = client.post(cloudflare_api_url)
306323
.bearer_auth(cloudflare_api_key)
@@ -312,7 +329,9 @@ pub async fn do_get_queue_backlog_analytics_query(cloudflare_api_url: &String, c
312329
}
313330

314331
let response_text = res.text().await?;
315-
console_log!("[QueueBacklog] GraphQL response: {}", response_text);
332+
if debug_logging {
333+
console_log!("[QueueBacklog] GraphQL response: {}", response_text);
334+
}
316335
let response_body: Response<get_queue_backlog_analytics_query::ResponseData> = serde_json::from_str(&response_text)?;
317336
if response_body.errors.is_some() {
318337
console_log!("[QueueBacklog] GraphQL query failed: {:?}", response_body.errors);
@@ -350,16 +369,16 @@ pub async fn do_get_queue_backlog_analytics_query(cloudflare_api_url: &String, c
350369
let timestamp_nanos: u64 = last_datetime.map(|datetime| {
351370
let datetime: NaiveDateTime = NaiveDateTime::parse_from_str(&datetime, "%+").unwrap();
352371
datetime.and_utc().timestamp_nanos_opt().unwrap_or(0) as u64
353-
}).unwrap_or_else(|| {
354-
systemtime_to_nanos(SystemTime::now())
355-
});
372+
}).unwrap_or(fallback_timestamp_nanos);
356373

357374
Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp_nanos))
358375
}
359376

360-
pub async fn do_get_queue_operations_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_queue_operations_analytics_query::Variables) -> Result<Vec<Metric>, Box<dyn Error>> {
377+
pub async fn do_get_queue_operations_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_queue_operations_analytics_query::Variables, debug_logging: bool, fallback_timestamp_nanos: u64) -> Result<Vec<Metric>, Box<dyn Error>> {
361378
let request_body = GetQueueOperationsAnalyticsQuery::build_query(variables);
362-
console_log!("[QueueOperations] GraphQL request: {}", serde_json::to_string_pretty(&request_body).unwrap_or_default());
379+
if debug_logging {
380+
console_log!("[QueueOperations] GraphQL request: {}", serde_json::to_string_pretty(&request_body).unwrap_or_default());
381+
}
363382
let client = reqwest::Client::new();
364383
let res = client.post(cloudflare_api_url)
365384
.bearer_auth(cloudflare_api_key)
@@ -371,7 +390,9 @@ pub async fn do_get_queue_operations_analytics_query(cloudflare_api_url: &String
371390
}
372391

373392
let response_text = res.text().await?;
374-
console_log!("[QueueOperations] GraphQL response: {}", response_text);
393+
if debug_logging {
394+
console_log!("[QueueOperations] GraphQL response: {}", response_text);
395+
}
375396
let response_body: Response<get_queue_operations_analytics_query::ResponseData> = serde_json::from_str(&response_text)?;
376397
if response_body.errors.is_some() {
377398
console_log!("[QueueOperations] GraphQL query failed: {:?}", response_body.errors);
@@ -424,16 +445,16 @@ pub async fn do_get_queue_operations_analytics_query(cloudflare_api_url: &String
424445
let timestamp_nanos: u64 = last_datetime.map(|datetime| {
425446
let datetime: NaiveDateTime = NaiveDateTime::parse_from_str(&datetime, "%+").unwrap();
426447
datetime.and_utc().timestamp_nanos_opt().unwrap_or(0) as u64
427-
}).unwrap_or_else(|| {
428-
systemtime_to_nanos(SystemTime::now())
429-
});
448+
}).unwrap_or(fallback_timestamp_nanos);
430449

431450
Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp_nanos))
432451
}
433452

434-
pub async fn do_get_zone_http_requests_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_zone_http_requests_query::Variables) -> Result<Vec<Metric>, Box<dyn Error>> {
453+
pub async fn do_get_zone_http_requests_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_zone_http_requests_query::Variables, debug_logging: bool, fallback_timestamp_nanos: u64) -> Result<Vec<Metric>, Box<dyn Error>> {
435454
let request_body = GetZoneHttpRequestsQuery::build_query(variables);
436-
console_log!("[ZoneHttpRequests] GraphQL request: {}", serde_json::to_string_pretty(&request_body).unwrap_or_default());
455+
if debug_logging {
456+
console_log!("[ZoneHttpRequests] GraphQL request: {}", serde_json::to_string_pretty(&request_body).unwrap_or_default());
457+
}
437458
let client = reqwest::Client::new();
438459
let res = client.post(cloudflare_api_url)
439460
.bearer_auth(cloudflare_api_key)
@@ -445,7 +466,9 @@ pub async fn do_get_zone_http_requests_query(cloudflare_api_url: &String, cloudf
445466
}
446467

447468
let response_text = res.text().await?;
448-
console_log!("[ZoneHttpRequests] GraphQL response: {}", response_text);
469+
if debug_logging {
470+
console_log!("[ZoneHttpRequests] GraphQL response: {}", response_text);
471+
}
449472
let response_body: Response<get_zone_http_requests_query::ResponseData> = serde_json::from_str(&response_text)?;
450473
if response_body.errors.is_some() {
451474
console_log!("[ZoneHttpRequests] GraphQL query failed: {:?}", response_body.errors);
@@ -487,14 +510,8 @@ pub async fn do_get_zone_http_requests_query(cloudflare_api_url: &String, cloudf
487510
let timestamp_nanos: u64 = last_datetime.map(|datetime| {
488511
let datetime: NaiveDateTime = NaiveDateTime::parse_from_str(&datetime, "%+").unwrap();
489512
datetime.and_utc().timestamp_nanos_opt().unwrap_or(0) as u64
490-
}).unwrap_or_else(|| {
491-
systemtime_to_nanos(SystemTime::now())
492-
});
513+
}).unwrap_or(fallback_timestamp_nanos);
493514

494515
Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp_nanos))
495516
}
496517

497-
fn systemtime_to_nanos(time: web_time::SystemTime) -> u64 {
498-
let duration = time.duration_since(web_time::SystemTime::UNIX_EPOCH).unwrap();
499-
duration.as_nanos() as u64
500-
}

src/lib.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,16 @@ use crate::gql::{get_workers_analytics_query, do_get_workers_analytics_query, do
1212
mod gql;
1313
mod metrics;
1414

15+
const DEFAULT_SCRAPE_DELAY_SECONDS: i64 = 300;
16+
17+
fn get_scrape_delay_seconds(env: &Env) -> i64 {
18+
env.var("SCRAPE_DELAY")
19+
.ok()
20+
.and_then(|val| val.to_string().parse::<i64>().ok())
21+
.filter(|val| *val >= 0)
22+
.unwrap_or(DEFAULT_SCRAPE_DELAY_SECONDS)
23+
}
24+
1525
#[worker::send]
1626
pub async fn do_fetch(
1727
url: String,
@@ -60,9 +70,15 @@ async fn do_trigger(env: Env) -> Result<()> {
6070
let cloudflare_api_url = env.var("CLOUDFLARE_API_URL")?.to_string();
6171
let cloudflare_api_key = env.var("CLOUDFLARE_API_KEY")?.to_string();
6272
let cloudflare_account_id = env.var("CLOUDFLARE_ACCOUNT_ID")?.to_string();
73+
let debug_logging: bool = match env.var("DEBUG_LOGGING") {
74+
Ok(val) => matches!(val.to_string().to_lowercase().as_str(), "true" | "1" | "yes"),
75+
Err(_) => false,
76+
};
6377

64-
let end = chrono::Utc::now().round_subsecs(0);
78+
let scrape_delay_seconds = get_scrape_delay_seconds(&env);
79+
let end = (chrono::Utc::now() - chrono::Duration::seconds(scrape_delay_seconds)).round_subsecs(0);
6580
let start = (end - chrono::Duration::minutes(1)).round_subsecs(0);
81+
let fallback_timestamp_nanos = end.timestamp_nanos_opt().unwrap_or(0) as u64;
6682

6783
console_log!("Fetching!");
6884
let mut all_metrics = Vec::new();
@@ -72,7 +88,7 @@ async fn do_trigger(env: Env) -> Result<()> {
7288
datetime_start: start.to_rfc3339(),
7389
datetime_end: end.to_rfc3339(),
7490
limit: 9999,
75-
}).await;
91+
}, debug_logging, fallback_timestamp_nanos).await;
7692
match result {
7793
Ok(metrics) => {
7894
for metric in metrics {
@@ -90,7 +106,7 @@ async fn do_trigger(env: Env) -> Result<()> {
90106
datetime_start: start.to_rfc3339(),
91107
datetime_end: end.to_rfc3339(),
92108
limit: 9999,
93-
}).await;
109+
}, debug_logging, fallback_timestamp_nanos).await;
94110
match result {
95111
Ok(metrics) => {
96112
for metric in metrics {
@@ -108,7 +124,7 @@ async fn do_trigger(env: Env) -> Result<()> {
108124
datetime_start: start.to_rfc3339(),
109125
datetime_end: end.to_rfc3339(),
110126
limit: 9999,
111-
}).await;
127+
}, debug_logging, fallback_timestamp_nanos).await;
112128
match result {
113129
Ok(metrics) => {
114130
for metric in metrics {
@@ -126,7 +142,7 @@ async fn do_trigger(env: Env) -> Result<()> {
126142
datetime_start: start.to_rfc3339(),
127143
datetime_end: end.to_rfc3339(),
128144
limit: 9999,
129-
}).await;
145+
}, debug_logging, fallback_timestamp_nanos).await;
130146
match result {
131147
Ok(metrics) => {
132148
for metric in metrics {
@@ -144,7 +160,7 @@ async fn do_trigger(env: Env) -> Result<()> {
144160
datetime_start: start.to_rfc3339(),
145161
datetime_end: end.to_rfc3339(),
146162
limit: 9999,
147-
}).await;
163+
}, debug_logging, fallback_timestamp_nanos).await;
148164
match result {
149165
Ok(metrics) => {
150166
for metric in metrics {
@@ -171,7 +187,7 @@ async fn do_trigger(env: Env) -> Result<()> {
171187
datetime_start: start.to_rfc3339(),
172188
datetime_end: end.to_rfc3339(),
173189
limit: 9999,
174-
}).await;
190+
}, debug_logging, fallback_timestamp_nanos).await;
175191
match result {
176192
Ok(metrics) => {
177193
for metric in metrics {
@@ -188,10 +204,10 @@ async fn do_trigger(env: Env) -> Result<()> {
188204

189205
console_log!("Done fetching!");
190206

191-
do_push_metrics(env, all_metrics).await
207+
do_push_metrics(env, all_metrics, debug_logging).await
192208
}
193209

194-
async fn do_push_metrics(env: Env, metrics: Vec<Metric>) -> Result<()> {
210+
async fn do_push_metrics(env: Env, metrics: Vec<Metric>, debug_logging: bool) -> Result<()> {
195211
let metrics_url = env.var("METRICS_URL")?.to_string();
196212
let otlp_headers = match env.var("OTLP_HEADERS") {
197213
Ok(val) => val.to_string(),
@@ -225,8 +241,10 @@ async fn do_push_metrics(env: Env, metrics: Vec<Metric>) -> Result<()> {
225241
};
226242

227243
// Log the OTLP payload as JSON for debugging
228-
let metrics_json_for_logging = serde_json::to_string_pretty(&export_request).unwrap();
229-
console_log!("OTLP metrics payload:\n{}", metrics_json_for_logging);
244+
if debug_logging {
245+
let metrics_json_for_logging = serde_json::to_string_pretty(&export_request).unwrap();
246+
console_log!("OTLP metrics payload:\n{}", metrics_json_for_logging);
247+
}
230248

231249
let js_value: JsValue;
232250
let content_type: String;

0 commit comments

Comments
 (0)