diff --git a/engine/packages/gasoline/src/builder/workflow/lupe.rs b/engine/packages/gasoline/src/builder/workflow/lupe.rs index 8b45b79615..174736782b 100644 --- a/engine/packages/gasoline/src/builder/workflow/lupe.rs +++ b/engine/packages/gasoline/src/builder/workflow/lupe.rs @@ -117,7 +117,7 @@ impl<'a, S: Serialize + DeserializeOwned> LoopBuilder<'a, S> { // Used to defer loop upsertion for parallelization let mut loop_event_upsert_fut = None; - let mut iteration_dt = Duration::ZERO; + let mut iteration_dt: Option = None; loop { ctx.check_stop()?; @@ -155,7 +155,7 @@ impl<'a, S: Serialize + DeserializeOwned> LoopBuilder<'a, S> { loop_event_commit_res, loop_event_upsert_res, branch_commit_res, - (loop_res, cb_dt), + (cb_res, cb_dt), ) = tokio::join!( async { if let Some(loop_event_init_fut) = loop_event_init_fut.take() { @@ -186,10 +186,12 @@ impl<'a, S: Serialize + DeserializeOwned> LoopBuilder<'a, S> { ) .await?; - // Only record iteration duration if its not a replay - metrics::LOOP_ITERATION_DURATION - .with_label_values(&[&ctx.name().to_string()]) - .observe(iteration_dt.as_secs_f64()); + if let Some(iteration_dt) = &iteration_dt { + // Only record iteration duration if its not a replay + metrics::LOOP_ITERATION_DURATION + .with_label_values(&[&ctx.name().to_string()]) + .observe(iteration_dt.as_secs_f64()); + } } anyhow::Ok(()) @@ -197,10 +199,10 @@ impl<'a, S: Serialize + DeserializeOwned> LoopBuilder<'a, S> { async { let iteration_start_instant = Instant::now(); - ( - cb(&mut iteration_branch, &mut state).await, - iteration_start_instant.elapsed(), - ) + let cb_res = cb(&mut iteration_branch, &mut state).await; + let cb_dt = cb_res.is_ok().then(|| iteration_start_instant.elapsed()); + + (cb_res, cb_dt) } ); @@ -210,8 +212,7 @@ impl<'a, S: Serialize + DeserializeOwned> LoopBuilder<'a, S> { iteration_dt = cb_dt; - // Run loop - match loop_res? { + match cb_res? { Loop::Continue => { iteration += 1;