Skip to content

Commit 82d2512

Browse files
authored
fix: Reset received_at for retry tasks (#75)
We base latency metrics off of `activation.received_at`. Retry activations should have their received_at reset so that when they are exeucted their latency doesn't include the latency from the original activation. Refs #68
1 parent b77db7c commit 82d2512

File tree

2 files changed

+25
-3
lines changed

2 files changed

+25
-3
lines changed

src/test_utils.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::{
1515
InflightActivation, InflightActivationStatus, InflightActivationStore,
1616
},
1717
};
18-
use chrono::Utc;
18+
use chrono::{Timelike, Utc};
1919
use sentry_protos::sentry::v1::TaskActivation;
2020

2121
/// Generate a unique filename for isolated SQLite databases.
@@ -28,6 +28,7 @@ pub fn generate_temp_filename() -> String {
2828
pub fn make_activations(count: u32) -> Vec<InflightActivation> {
2929
let mut records: Vec<InflightActivation> = vec![];
3030
for i in 0..count {
31+
let now = Utc::now();
3132
#[allow(deprecated)]
3233
let item = InflightActivation {
3334
activation: TaskActivation {
@@ -37,8 +38,8 @@ pub fn make_activations(count: u32) -> Vec<InflightActivation> {
3738
parameters: "{}".into(),
3839
headers: HashMap::new(),
3940
received_at: Some(prost_types::Timestamp {
40-
seconds: Utc::now().timestamp(),
41-
nanos: 0,
41+
seconds: now.timestamp(),
42+
nanos: now.nanosecond() as i32,
4243
}),
4344
deadline: None,
4445
retry_state: None,

src/upkeep.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
use chrono::{Timelike, Utc};
12
use prost::Message;
3+
use prost_types::Timestamp;
24
use rdkafka::{
35
producer::{FutureProducer, FutureRecord},
46
util::Timeout,
@@ -192,7 +194,12 @@ pub async fn do_upkeep(
192194
fn create_retry_activation(inflight_activation: &InflightActivation) -> TaskActivation {
193195
let mut new_activation = inflight_activation.activation.clone();
194196

197+
let now = Utc::now();
195198
new_activation.id = Uuid::new_v4().into();
199+
new_activation.received_at = Some(Timestamp {
200+
seconds: now.timestamp(),
201+
nanos: now.nanosecond() as i32,
202+
});
196203
if new_activation.retry_state.is_some() {
197204
new_activation.retry_state.as_mut().unwrap().attempts += 1;
198205
}
@@ -206,6 +213,7 @@ mod tests {
206213
use std::sync::Arc;
207214

208215
use chrono::{TimeDelta, TimeZone, Utc};
216+
use prost_types::Timestamp;
209217
use sentry_protos::sentry::v1::RetryState;
210218

211219
use crate::{
@@ -231,6 +239,12 @@ mod tests {
231239
let store = create_inflight_store().await;
232240
let producer = create_producer(config.clone());
233241
let mut records = make_activations(2);
242+
243+
let old = Utc.with_ymd_and_hms(2024, 12, 1, 0, 0, 0).unwrap();
244+
records[0].activation.received_at = Some(Timestamp {
245+
seconds: old.timestamp(),
246+
nanos: 0,
247+
});
234248
records[0].activation.parameters = r#"{"a":"b"}"#.into();
235249
records[0].status = InflightActivationStatus::Retry;
236250
records[0].activation.retry_state = Some(RetryState {
@@ -255,10 +269,17 @@ mod tests {
255269
assert_ne!(activation.id, records[0].activation.id);
256270
// Should increment the attempt counter
257271
assert_eq!(activation.retry_state.as_ref().unwrap().attempts, 2);
272+
258273
// Retry should retain task and parameters of original task
259274
assert_eq!(activation.taskname, records[0].activation.taskname);
260275
assert_eq!(activation.namespace, records[0].activation.namespace);
261276
assert_eq!(activation.parameters, records[0].activation.parameters);
277+
// received_at should be set be later than the original activation
278+
assert!(
279+
activation.received_at.unwrap().seconds
280+
> records[0].activation.received_at.unwrap().seconds,
281+
"retry activation should have a later timestamp"
282+
);
262283
}
263284

264285
#[tokio::test]

0 commit comments

Comments
 (0)