Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub struct Config {
/// to allow temporary worker deaths to be resolved.
pub deadletter_deadline: usize,

// The frequency at which upkeep tasks are spawned.
/// The frequency at which upkeep tasks are spawned.
pub upkeep_task_interval_ms: u64,
}

Expand Down
136 changes: 130 additions & 6 deletions src/consumer/deserialize_activation.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::ops::Add;
use std::{sync::Arc, time::Duration};

use anyhow::{anyhow, Error};
use chrono::Utc;
use chrono::{MappedLocalTime, TimeZone, Utc};
use prost::Message as _;
use rdkafka::{message::OwnedMessage, Message};
use sentry_protos::sentry::v1::TaskActivation;
Expand All @@ -12,14 +13,14 @@ use crate::{
};

pub struct DeserializeConfig {
pub deadletter_deadline: Option<Duration>,
pub deadletter_deadline: Duration,
}

impl DeserializeConfig {
/// Convert from application into service configuration
pub fn from_config(config: &Config) -> Self {
Self {
deadletter_deadline: Some(Duration::from_secs(config.deadletter_deadline as u64)),
deadletter_deadline: Duration::from_secs(config.deadletter_deadline as u64),
}
}
}
Expand All @@ -40,17 +41,140 @@ pub fn new(
.or(Some(false))
.expect("could not access at_most_once");
}
let now = Utc::now();

// Determine the deadletter_at time using config and activation expires time.
let mut deadletter_at = now.add(config.deadletter_deadline);
if let Some(expires) = activation.expires {
let expires_duration = Duration::from_secs(expires);
if expires_duration < config.deadletter_deadline {
// Expiry times are based on the time the task was received
// not the time it was dequeued from Kafka.
let activation_received = activation.received_at.map_or(now, |ts| {
match Utc.timestamp_opt(ts.seconds, ts.nanos as u32) {
MappedLocalTime::Single(ts) => ts,
_ => now,
}
});
deadletter_at = activation_received + expires_duration;
}
}

Ok(InflightActivation {
activation,
status: InflightActivationStatus::Pending,
partition: msg.partition(),
offset: msg.offset(),
added_at: Utc::now(),
deadletter_at: config
.deadletter_deadline
.map(|duration| Utc::now() + duration),
deadletter_at: Some(deadletter_at),
processing_deadline: None,
at_most_once,
})
}
}

#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::Arc, time::Duration};

use chrono::Utc;
use prost::Message as _;
use rdkafka::{message::OwnedMessage, Timestamp};
use sentry_protos::sentry::v1::TaskActivation;

use super::{new, DeserializeConfig};

#[test]
fn test_deadletter_from_config() {
let config = DeserializeConfig {
deadletter_deadline: Duration::from_secs(900),
};
let deserializer = new(config);
let now = Utc::now();
let the_past = now - Duration::from_secs(60 * 10);

#[allow(deprecated)]
let activation = TaskActivation {
id: "id_0".into(),
namespace: "namespace".into(),
taskname: "taskname".into(),
parameters: "{}".into(),
headers: HashMap::new(),
// not used when the activation doesn't have expires.
received_at: Some(prost_types::Timestamp {
seconds: the_past.timestamp(),
nanos: 0,
}),
deadline: None,
retry_state: None,
processing_deadline_duration: 10,
expires: None,
};
let message = OwnedMessage::new(
Some(activation.encode_to_vec()),
None,
"taskworker".into(),
Timestamp::now(),
0,
0,
None,
);
let arc_message = Arc::new(message);
let inflight_opt = deserializer(arc_message);

assert!(inflight_opt.is_ok());
let inflight = inflight_opt.unwrap();
let delta = inflight.deadletter_at.unwrap() - now;
assert!(
delta.num_seconds() >= 900,
"Should have at least 900 seconds of delay from now"
);
}

#[test]
fn test_expires_deadletter() {
let config = DeserializeConfig {
deadletter_deadline: Duration::from_secs(900),
};
let deserializer = new(config);
let now = Utc::now();
let the_past = now - Duration::from_secs(60 * 10);

#[allow(deprecated)]
let activation = TaskActivation {
id: "id_0".into(),
namespace: "namespace".into(),
taskname: "taskname".into(),
parameters: "{}".into(),
headers: HashMap::new(),
// used because the activation has expires
received_at: Some(prost_types::Timestamp {
seconds: the_past.timestamp(),
nanos: 0,
}),
deadline: None,
retry_state: None,
processing_deadline_duration: 10,
expires: Some(100),
};
let message = OwnedMessage::new(
Some(activation.encode_to_vec()),
None,
"taskworker".into(),
Timestamp::now(),
0,
0,
None,
);
let arc_message = Arc::new(message);
let inflight_opt = deserializer(arc_message);

assert!(inflight_opt.is_ok());
let inflight = inflight_opt.unwrap();
let delta = inflight.deadletter_at.unwrap() - the_past;
assert!(
delta.num_seconds() >= 99,
"Should have ~100 seconds of delay from received_at"
);
}
}
19 changes: 19 additions & 0 deletions src/inflight_activation_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,30 @@ impl From<TaskActivationStatus> for InflightActivationStatus {

#[derive(Clone, Debug, PartialEq)]
pub struct InflightActivation {
/// The protobuf activation that was received from kafka
pub activation: TaskActivation,

/// The current status of the activation
pub status: InflightActivationStatus,

/// The partition the activation was received from
pub partition: i32,

/// The offset the activation had
pub offset: i64,

/// The timestamp when the activation was stored in activation store.
pub added_at: DateTime<Utc>,

/// The timestamp after which a task should be deadlettered/discarded
pub deadletter_at: Option<DateTime<Utc>>,

/// The timestamp for when processing should be complete
pub processing_deadline: Option<DateTime<Utc>>,

/// Whether or not the activation uses at_most_once.
/// When enabled activations are not retried when processing_deadlines
/// are exceeded.
pub at_most_once: bool,
}

Expand Down Expand Up @@ -403,6 +420,8 @@ impl InflightActivationStore {
to_discard.push(activation.id);
continue;
}
// We could be deadlettering because of activation.expires
// when a task expires we still deadletter if configured.
let retry_state = &activation.retry_state.as_ref().unwrap();
if retry_state.discard_after_attempt.is_some() {
to_discard.push(activation.id);
Expand Down
Loading