Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 98 additions & 47 deletions src/kafka/inflight_activation_batcher.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use super::consumer::{
ReduceConfig, ReduceShutdownBehaviour, ReduceShutdownCondition, Reducer,
ReducerWhenFullBehaviour,
};
use super::utils::tag_for_forwarding;
use crate::{
config::Config, runtime_config::RuntimeConfigManager,
store::inflight_activation::InflightActivation,
};
use chrono::Utc;
use futures::future::join_all;
use rdkafka::config::ClientConfig;
use rdkafka::{
producer::{FutureProducer, FutureRecord},
util::Timeout,
};
use std::{mem::replace, sync::Arc, time::Duration};

use super::consumer::{
ReduceConfig, ReduceShutdownBehaviour, ReduceShutdownCondition, Reducer,
ReducerWhenFullBehaviour,
};
use super::utils::modify_activation_namespace;

pub struct ActivationBatcherConfig {
pub kafka_config: ClientConfig,
pub kafka_long_topic: String,
Expand All @@ -42,7 +42,7 @@ impl ActivationBatcherConfig {
pub struct InflightActivationBatcher {
batch: Vec<InflightActivation>,
batch_size: usize,
forwarded_count: usize,
forward_batch: Vec<Vec<u8>>, // payload
config: ActivationBatcherConfig,
runtime_config_manager: Arc<RuntimeConfigManager>,
producer: Arc<FutureProducer>,
Expand All @@ -62,7 +62,7 @@ impl InflightActivationBatcher {
Self {
batch: Vec::with_capacity(config.max_batch_len),
batch_size: 0,
forwarded_count: 0,
forward_batch: Vec::with_capacity(config.max_batch_len),
config,
runtime_config_manager,
producer,
Expand Down Expand Up @@ -94,36 +94,24 @@ impl Reducer for InflightActivationBatcher {
}

if runtime_config.demoted_namespaces.contains(namespace) {
metrics::counter!(
"filter.forward_task_demoted_namespace",
"namespace" => namespace.clone(),
"taskname" => task_name.clone(),
)
.increment(1);

// The default demoted topic to forward tasks to is config.kafka_long_topic if not set in runtime config.
let topic = runtime_config
.demoted_topic
.clone()
.unwrap_or(self.config.kafka_long_topic.clone());

if let Ok(modified_activation) = modify_activation_namespace(&t.activation) {
let delivery = self
.producer
.send(
FutureRecord::<(), Vec<u8>>::to(&topic).payload(&modified_activation),
Timeout::After(Duration::from_millis(self.config.send_timeout_ms)),
)
.await;
if delivery.is_ok() {
metrics::counter!("filter.forward_task_demoted_namespace_success",
match tag_for_forwarding(&t.activation) {
Ok(Some(tagged_activation)) => {
// Forward it
metrics::counter!(
"filter.forward_task_demoted_namespace",
"namespace" => namespace.clone(),
"taskname" => task_name.clone(),
)
.increment(1);
self.forwarded_count += 1;
self.forward_batch.push(tagged_activation);
return Ok(());
}
Ok(None) => {
// Already forwarded, fall through to add to batch
}
Err(_) => {
// Decode error, fall through to add to batch to handle in upkeep
}
}
}

Expand All @@ -134,19 +122,42 @@ impl Reducer for InflightActivationBatcher {
}

async fn flush(&mut self) -> Result<Option<Self::Output>, anyhow::Error> {
if self.batch.is_empty() && self.forwarded_count == 0 {
if self.batch.is_empty() && self.forward_batch.is_empty() {
return Ok(None);
}

if self.batch.is_empty() {
metrics::histogram!("consumer.forwarded_rows").record(self.forwarded_count as f64);
} else {
let runtime_config = self.runtime_config_manager.read().await;

if !self.batch.is_empty() {
metrics::histogram!("consumer.batch_rows").record(self.batch.len() as f64);
metrics::histogram!("consumer.batch_bytes").record(self.batch_size as f64);
}

// Send every payload in the forward batch concurrently
if !self.forward_batch.is_empty() {
// Forward to the demoted topic from the runtime config, falling back to config.kafka_long_topic when it is not set.
let topic = runtime_config
.demoted_topic
.clone()
.unwrap_or(self.config.kafka_long_topic.clone());
let sends = self.forward_batch.iter().map(|payload| {
self.producer.send(
FutureRecord::<(), Vec<u8>>::to(&topic).payload(payload),
Timeout::After(Duration::from_millis(self.config.send_timeout_ms)),
)
});

let results = join_all(sends).await;
let success_count = results.iter().filter(|r| r.is_ok()).count();

metrics::histogram!("consumer.forwarded_rows").record(success_count as f64);
metrics::counter!("filter.forward_task_demoted_namespace_success")
.increment(success_count as u64);

self.forward_batch.clear();
}

self.batch_size = 0;
self.forwarded_count = 0;

Ok(Some(replace(
&mut self.batch,
Expand All @@ -156,7 +167,7 @@ impl Reducer for InflightActivationBatcher {

fn reset(&mut self) {
self.batch_size = 0;
self.forwarded_count = 0;
self.forward_batch.clear();
self.batch.clear();
}

Expand All @@ -181,7 +192,7 @@ mod tests {
ActivationBatcherConfig, Config, InflightActivation, InflightActivationBatcher, Reducer,
RuntimeConfigManager,
};
use crate::kafka::utils::modify_activation_namespace;
use crate::kafka::utils::tag_for_forwarding;
use chrono::Utc;
use std::collections::HashMap;
use tokio::fs;
Expand All @@ -193,7 +204,7 @@ mod tests {
use crate::store::inflight_activation::InflightActivationStatus;

#[test]
fn test_modify_namespace_for_forwarding() {
fn test_tag_for_forwarding() {
let original = TaskActivation {
id: "test-id".to_string(),
namespace: "bad_namespace".to_string(),
Expand All @@ -208,17 +219,28 @@ mod tests {
};

let encoded = original.encode_to_vec();
let modified = modify_activation_namespace(&encoded).unwrap();
let decoded = TaskActivation::decode(&modified as &[u8]).unwrap();
let tagged = tag_for_forwarding(&encoded).unwrap();

// Namespace should be changed to "long"
assert_eq!(decoded.namespace, "bad_namespace_long");
assert!(
tagged.is_some(),
"Should return Some for untagged activation"
);
let decoded = TaskActivation::decode(&tagged.unwrap() as &[u8]).unwrap();

// Namespace should be preserved (not modified)
assert_eq!(decoded.namespace, "bad_namespace");

// Should have forwarded header added
assert_eq!(
decoded.headers.get("forwarded"),
Some(&"true".to_string()),
"Should have forwarded header set to true"
);

// All other fields should be preserved
assert_eq!(decoded.id, original.id);
assert_eq!(decoded.taskname, original.taskname);
assert_eq!(decoded.parameters, original.parameters);
assert_eq!(decoded.headers, original.headers);
assert_eq!(decoded.received_at, original.received_at);
assert_eq!(decoded.retry_state, original.retry_state);
assert_eq!(
Expand All @@ -230,9 +252,38 @@ mod tests {
}

#[test]
fn test_modify_namespace_for_forwarding_decode_error() {
fn test_tag_for_forwarding_already_tagged() {
    // Build the header map first so the activation already carries the
    // "forwarded" marker at the moment it is encoded.
    let mut pre_tagged_headers = HashMap::new();
    pre_tagged_headers.insert("forwarded".to_string(), "true".to_string());

    let activation = TaskActivation {
        id: "test-id".to_string(),
        namespace: "bad_namespace".to_string(),
        taskname: "test_task".to_string(),
        parameters: r#"{"key":"value"}"#.to_string(),
        headers: pre_tagged_headers,
        received_at: None,
        retry_state: None,
        processing_deadline_duration: 60,
        expires: Some(300),
        delay: Some(10),
    };

    // Tagging an activation that is already marked must succeed but yield
    // no payload, which tells the caller not to forward it again.
    let payload = activation.encode_to_vec();
    let outcome = tag_for_forwarding(&payload).unwrap();

    assert!(
        outcome.is_none(),
        "Should return None for already tagged activation"
    );
}

#[test]
fn test_tag_for_forwarding_decode_error() {
let invalid_bytes = vec![0xFF, 0xFF, 0xFF]; // Invalid protobuf
let result = modify_activation_namespace(&invalid_bytes);
let result = tag_for_forwarding(&invalid_bytes);
assert!(result.is_err());
}

Expand Down
21 changes: 12 additions & 9 deletions src/kafka/utils.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
use prost::Message;
use sentry_protos::taskbroker::v1::TaskActivation;

/// The default namespace suffix for demoted/forwarded tasks
pub const DEFAULT_DEMOTED_NAMESPACE_SUFFIX: &str = "_long";

/// Modifies a TaskActivation's namespace for forwarding to demoted topic.
/// Tags a TaskActivation for forwarding to the demoted topic by adding a header marker.
/// This prevents infinite forwarding loops when tasks are received by another
/// broker with the same demoted_namespaces configuration.
///
/// # Arguments
/// * `activation_bytes` - The serialized TaskActivation protobuf
/// * `activation_bytes` - The protobuf-encoded bytes of a TaskActivation
///
/// # Returns
/// * `Ok(Vec<u8>)` - The protobuf of the TaskActivation with namespace modified
/// * `Ok(Some(Vec<u8>))` - The protobuf of the TaskActivation with forwarding header added
/// * `Ok(None)` - Already tagged for forwarding (skip forwarding)
/// * `Err(DecodeError)` - If the protobuf cannot be decoded
pub fn modify_activation_namespace(activation_bytes: &[u8]) -> Result<Vec<u8>, prost::DecodeError> {
pub fn tag_for_forwarding(activation_bytes: &[u8]) -> Result<Option<Vec<u8>>, prost::DecodeError> {
let mut activation = TaskActivation::decode(activation_bytes)?;
activation.namespace += DEFAULT_DEMOTED_NAMESPACE_SUFFIX;
Ok(activation.encode_to_vec())
if activation.headers.get("forwarded").map(|v| v.as_str()) == Some("true") {
return Ok(None);
}
activation
.headers
.insert("forwarded".to_string(), "true".to_string());
Ok(Some(activation.encode_to_vec()))
}
17 changes: 10 additions & 7 deletions src/upkeep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use uuid::Uuid;
use crate::{
SERVICE_NAME,
config::Config,
kafka::utils::modify_activation_namespace,
kafka::utils::tag_for_forwarding,
runtime_config::RuntimeConfigManager,
store::inflight_activation::{InflightActivationStatus, InflightActivationStore},
};
Expand Down Expand Up @@ -301,20 +301,23 @@ pub async fn do_upkeep(
let topic = runtime_config.demoted_topic.clone().unwrap_or(config.kafka_long_topic.clone());

async move {
metrics::counter!("upkeep.forward_demoted_namespace", "namespace" => inflight.namespace, "taskname" => inflight.taskname).increment(1);
metrics::counter!("upkeep.forward_demoted_namespace", "namespace" => inflight.namespace.clone(), "taskname" => inflight.taskname.clone()).increment(1);

let modified_activation = match modify_activation_namespace(&inflight.activation) {
Ok(modified_activation) => modified_activation,
let tagged_activation = match tag_for_forwarding(&inflight.activation) {
Ok(Some(tagged)) => tagged,
Ok(None) => {
return Ok(inflight.id);
}
Err(err) => {
error!("forward_demoted_namespace.modify_namespace.failure: {}", err);
return Err(anyhow::anyhow!("failed to modify namespace: {}", err));
error!("forward_demoted_namespace.tag_failure: {}", err);
return Err(anyhow::anyhow!("failed to tag for forwarding: {}", err));
}
};

let delivery = producer
.send(
FutureRecord::<(), Vec<u8>>::to(&topic)
.payload(&modified_activation),
.payload(&tagged_activation),
Timeout::After(Duration::from_millis(config.kafka_send_timeout_ms)),
)
.await;
Expand Down
Loading