Skip to content

Commit 475902e

Browse files
nipunn1313Convex, Inc.
authored andcommitted
Move cron job span inward (#41123)
So it covers different spans on failures - so the span completes on failure. Also add some tags so we can query effectively. I would love to tag on success/failure, but could not figure that out. GitOrigin-RevId: 1a5d85efcc23d183b300a8524395b0ccc29de66b
1 parent 1ff1ea5 commit 475902e

File tree

2 files changed

+23
-15
lines changed

2 files changed

+23
-15
lines changed

crates/application/src/cron_jobs/mod.rs

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
use std::{
2-
collections::{
3-
BTreeMap,
4-
HashSet,
5-
},
2+
collections::HashSet,
63
sync::Arc,
74
time::Duration,
85
};
@@ -56,6 +53,7 @@ use futures::{
5653
TryStreamExt,
5754
};
5855
use keybroker::Identity;
56+
use maplit::btreemap;
5957
use model::{
6058
backend_state::BackendStateModel,
6159
cron_jobs::{
@@ -102,7 +100,6 @@ const CRON_LOG_MAX_LOG_LINE_LENGTH: usize = 1000;
102100
// refactored later.
103101
pub struct CronJobExecutor<RT: Runtime> {
104102
context: CronJobContext<RT>,
105-
instance_name: String,
106103
running_job_ids: HashSet<ResolvedDocumentId>,
107104
/// Some if there's at least one pending job. May be in the past!
108105
next_job_ready_time: Option<Timestamp>,
@@ -113,6 +110,7 @@ pub struct CronJobExecutor<RT: Runtime> {
113110
#[derive(Clone)]
114111
pub struct CronJobContext<RT: Runtime> {
115112
rt: RT,
113+
instance_name: String,
116114
database: Database<RT>,
117115
runner: Arc<ApplicationFunctionRunner<RT>>,
118116
function_log: FunctionExecutionLog<RT>,
@@ -131,11 +129,11 @@ impl<RT: Runtime> CronJobExecutor<RT> {
131129
let mut executor = Self {
132130
context: CronJobContext {
133131
rt,
132+
instance_name,
134133
database,
135134
runner,
136135
function_log,
137136
},
138-
instance_name,
139137
running_job_ids: HashSet::new(),
140138
next_job_ready_time: None,
141139
job_finished_tx,
@@ -226,12 +224,6 @@ impl<RT: Runtime> CronJobExecutor<RT> {
226224
if next_ts > now || self.running_job_ids.len() == *SCHEDULED_JOB_EXECUTION_PARALLELISM {
227225
return Ok(Some(next_ts));
228226
}
229-
let root = get_sampled_span(
230-
&self.instance_name,
231-
"crons/execute_job",
232-
&mut self.context.rt.rng(),
233-
BTreeMap::new(),
234-
);
235227
let sentry_hub = sentry::Hub::with(|hub| sentry::Hub::new_from_top(hub));
236228
let context = self.context.clone();
237229
let tx = self.job_finished_tx.clone();
@@ -248,7 +240,6 @@ impl<RT: Runtime> CronJobExecutor<RT> {
248240
},
249241
}
250242
}
251-
.in_span(root)
252243
.bind_hub(sentry_hub),
253244
);
254245
self.running_job_ids.insert(job_id);
@@ -261,12 +252,14 @@ impl<RT: Runtime> CronJobContext<RT> {
261252
#[cfg(any(test, feature = "testing"))]
262253
pub fn new(
263254
rt: RT,
255+
instance_name: String,
264256
database: Database<RT>,
265257
runner: Arc<ApplicationFunctionRunner<RT>>,
266258
function_log: FunctionExecutionLog<RT>,
267259
) -> Self {
268260
Self {
269261
rt,
262+
instance_name,
270263
database,
271264
runner,
272265
function_log,
@@ -279,7 +272,20 @@ impl<RT: Runtime> CronJobContext<RT> {
279272
let mut function_backoff = Backoff::new(INITIAL_BACKOFF, MAX_BACKOFF);
280273
loop {
281274
let mutation_retry_count = function_backoff.failures() as usize;
282-
let result = self.run_function(job.clone(), mutation_retry_count).await;
275+
let root = get_sampled_span(
276+
&self.instance_name,
277+
"crons/run_function",
278+
&mut self.rt.rng(),
279+
btreemap! {
280+
"job_id".to_string() => job.id.to_string(),
281+
"component".to_string() => format!("{:?}", job.component),
282+
"job_name".to_string() => job.name.to_string(),
283+
},
284+
);
285+
let result = self
286+
.run_function(job.clone(), mutation_retry_count)
287+
.in_span(root)
288+
.await;
283289
match result {
284290
Ok(result) => {
285291
metrics::log_cron_job_success(function_backoff.failures());
@@ -288,8 +294,9 @@ impl<RT: Runtime> CronJobContext<RT> {
288294
Err(mut e) => {
289295
let delay = function_backoff.fail(&mut self.rt.rng());
290296
tracing::error!(
291-
"System error executing job {}: {}, sleeping {delay:?}",
297+
"System error executing job {} in {:?}: {}, sleeping {delay:?}",
292298
job.id,
299+
job.component,
293300
job.name
294301
);
295302
report_error(&mut e).await;

crates/application/src/test_helpers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ impl<RT: Runtime> ApplicationTestExt<RT> for Application<RT> {
319319
async fn test_one_off_cron_job_executor_run(&self, job: CronJob) -> anyhow::Result<()> {
320320
let test_executor = CronJobContext::new(
321321
self.runtime.clone(),
322+
self.instance_name.clone(),
322323
self.database.clone(),
323324
self.runner.clone(),
324325
self.function_log.clone(),

0 commit comments

Comments
 (0)