diff --git a/src/config.rs b/src/config.rs index 61e4335f..0b8c7ff5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -79,7 +79,7 @@ pub struct Config { /// to allow temporary worker deaths to be resolved. pub deadletter_deadline: usize, - // The frequency at which upkeep tasks are spawned. + /// The frequency at which upkeep tasks are spawned. pub upkeep_task_interval_ms: u64, } diff --git a/src/consumer/deserialize_activation.rs b/src/consumer/deserialize_activation.rs index cfad6513..409963be 100644 --- a/src/consumer/deserialize_activation.rs +++ b/src/consumer/deserialize_activation.rs @@ -1,7 +1,8 @@ +use std::ops::Add; use std::{sync::Arc, time::Duration}; use anyhow::{anyhow, Error}; -use chrono::Utc; +use chrono::{MappedLocalTime, TimeZone, Utc}; use prost::Message as _; use rdkafka::{message::OwnedMessage, Message}; use sentry_protos::sentry::v1::TaskActivation; @@ -12,14 +13,14 @@ use crate::{ }; pub struct DeserializeConfig { - pub deadletter_deadline: Option, + pub deadletter_deadline: Duration, } impl DeserializeConfig { /// Convert from application into service configuration pub fn from_config(config: &Config) -> Self { Self { - deadletter_deadline: Some(Duration::from_secs(config.deadletter_deadline as u64)), + deadletter_deadline: Duration::from_secs(config.deadletter_deadline as u64), } } } @@ -40,17 +41,140 @@ pub fn new( .or(Some(false)) .expect("could not access at_most_once"); } + let now = Utc::now(); + + // Determine the deadletter_at time using config and activation expires time. + let mut deadletter_at = now.add(config.deadletter_deadline); + if let Some(expires) = activation.expires { + let expires_duration = Duration::from_secs(expires); + if expires_duration < config.deadletter_deadline { + // Expiry times are based on the time the task was received + // not the time it was dequeued from Kafka. + let activation_received = activation.received_at.map_or(now, |ts| { + match Utc.timestamp_opt(ts.seconds, ts.nanos as u32) { + MappedLocalTime::Single(ts) => ts, + _ => now, + } + }); + deadletter_at = activation_received + expires_duration; + } + } + Ok(InflightActivation { activation, status: InflightActivationStatus::Pending, partition: msg.partition(), offset: msg.offset(), added_at: Utc::now(), - deadletter_at: config - .deadletter_deadline - .map(|duration| Utc::now() + duration), + deadletter_at: Some(deadletter_at), processing_deadline: None, at_most_once, }) } } + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, sync::Arc, time::Duration}; + + use chrono::Utc; + use prost::Message as _; + use rdkafka::{message::OwnedMessage, Timestamp}; + use sentry_protos::sentry::v1::TaskActivation; + + use super::{new, DeserializeConfig}; + + #[test] + fn test_deadletter_from_config() { + let config = DeserializeConfig { + deadletter_deadline: Duration::from_secs(900), + }; + let deserializer = new(config); + let now = Utc::now(); + let the_past = now - Duration::from_secs(60 * 10); + + #[allow(deprecated)] + let activation = TaskActivation { + id: "id_0".into(), + namespace: "namespace".into(), + taskname: "taskname".into(), + parameters: "{}".into(), + headers: HashMap::new(), + // not used when the activation doesn't have expires. + received_at: Some(prost_types::Timestamp { + seconds: the_past.timestamp(), + nanos: 0, + }), + deadline: None, + retry_state: None, + processing_deadline_duration: 10, + expires: None, + }; + let message = OwnedMessage::new( + Some(activation.encode_to_vec()), + None, + "taskworker".into(), + Timestamp::now(), + 0, + 0, + None, + ); + let arc_message = Arc::new(message); + let inflight_opt = deserializer(arc_message); + + assert!(inflight_opt.is_ok()); + let inflight = inflight_opt.unwrap(); + let delta = inflight.deadletter_at.unwrap() - now; + assert!( + delta.num_seconds() >= 900, + "Should have at least 900 seconds of delay from now" + ); + } + + #[test] + fn test_expires_deadletter() { + let config = DeserializeConfig { + deadletter_deadline: Duration::from_secs(900), + }; + let deserializer = new(config); + let now = Utc::now(); + let the_past = now - Duration::from_secs(60 * 10); + + #[allow(deprecated)] + let activation = TaskActivation { + id: "id_0".into(), + namespace: "namespace".into(), + taskname: "taskname".into(), + parameters: "{}".into(), + headers: HashMap::new(), + // used because the activation has expires + received_at: Some(prost_types::Timestamp { + seconds: the_past.timestamp(), + nanos: 0, + }), + deadline: None, + retry_state: None, + processing_deadline_duration: 10, + expires: Some(100), + }; + let message = OwnedMessage::new( + Some(activation.encode_to_vec()), + None, + "taskworker".into(), + Timestamp::now(), + 0, + 0, + None, + ); + let arc_message = Arc::new(message); + let inflight_opt = deserializer(arc_message); + + assert!(inflight_opt.is_ok()); + let inflight = inflight_opt.unwrap(); + let delta = inflight.deadletter_at.unwrap() - the_past; + assert!( + delta.num_seconds() >= 99, + "Should have ~100 seconds of delay from received_at" + ); + } +} diff --git a/src/inflight_activation_store.rs b/src/inflight_activation_store.rs index 0a470089..79b74dda 100644 --- a/src/inflight_activation_store.rs +++ b/src/inflight_activation_store.rs @@ -54,13 +54,30 @@ impl From for InflightActivationStatus { #[derive(Clone, Debug, PartialEq)] pub struct InflightActivation { + /// The protobuf activation that was received from kafka pub activation: TaskActivation, + + /// The current status of the activation pub status: InflightActivationStatus, + + /// The partition the activation was received from pub partition: i32, + + /// The offset the activation had pub offset: i64, + + /// The timestamp when the activation was stored in activation store. pub added_at: DateTime, + + /// The timestamp after which a task should be deadlettered/discarded pub deadletter_at: Option>, + + /// The timestamp for when processing should be complete pub processing_deadline: Option>, + + /// Whether or not the activation uses at_most_once. + /// When enabled activations are not retried when processing_deadlines + /// are exceeded. pub at_most_once: bool, } @@ -403,6 +420,8 @@ impl InflightActivationStore { to_discard.push(activation.id); continue; } + // We could be deadlettering because of activation.expires + // when a task expires we still deadletter if configured. let retry_state = &activation.retry_state.as_ref().unwrap(); if retry_state.discard_after_attempt.is_some() { to_discard.push(activation.id);