Skip to content
Merged
213 changes: 184 additions & 29 deletions src/kafka/inflight_activation_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
store::inflight_activation::InflightActivation,
};
use chrono::Utc;
use futures::future::join_all;
use rdkafka::config::ClientConfig;
use rdkafka::{
producer::{FutureProducer, FutureRecord},
Expand All @@ -14,6 +15,7 @@ use super::consumer::{
ReduceConfig, ReduceShutdownBehaviour, ReduceShutdownCondition, Reducer,
ReducerWhenFullBehaviour,
};
use super::utils::tag_for_forwarding;

pub struct ActivationBatcherConfig {
pub kafka_config: ClientConfig,
Expand Down Expand Up @@ -41,6 +43,7 @@ impl ActivationBatcherConfig {
pub struct InflightActivationBatcher {
batch: Vec<InflightActivation>,
batch_size: usize,
forward_batch: Vec<Vec<u8>>, // payload
config: ActivationBatcherConfig,
runtime_config_manager: Arc<RuntimeConfigManager>,
producer: Arc<FutureProducer>,
Expand All @@ -60,6 +63,7 @@ impl InflightActivationBatcher {
Self {
batch: Vec::with_capacity(config.max_batch_len),
batch_size: 0,
forward_batch: Vec::with_capacity(config.max_batch_len),
config,
runtime_config_manager,
producer,
Expand Down Expand Up @@ -91,32 +95,24 @@ impl Reducer for InflightActivationBatcher {
}

if runtime_config.demoted_namespaces.contains(namespace) {
metrics::counter!(
"filter.forward_task_demoted_namespace",
"namespace" => namespace.clone(),
"taskname" => task_name.clone(),
)
.increment(1);

// The default demoted topic to forward tasks to is config.kafka_long_topic if not set in runtime config.
let topic = runtime_config
.demoted_topic
.clone()
.unwrap_or(self.config.kafka_long_topic.clone());
let delivery = self
.producer
.send(
FutureRecord::<(), Vec<u8>>::to(&topic).payload(&t.activation),
Timeout::After(Duration::from_millis(self.config.send_timeout_ms)),
)
.await;
if delivery.is_ok() {
metrics::counter!("filter.forward_task_demoted_namespace_success",
"namespace" => namespace.clone(),
"taskname" => task_name.clone(),
)
.increment(1);
return Ok(());
match tag_for_forwarding(&t.activation) {
Ok(Some(tagged_activation)) => {
// Forward it
metrics::counter!(
"filter.forward_task_demoted_namespace",
"namespace" => namespace.clone(),
"taskname" => task_name.clone(),
)
.increment(1);
self.forward_batch.push(tagged_activation);
return Ok(());
}
Ok(None) => {
// Already forwarded, fall through to add to batch
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have a metric here as well, as this shouldn't happen in normal scenarios.

}
Err(_) => {
// Decode error, fall through to add to batch to handle in upkeep
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have a metric at least for this case, so we can tell if something weird is happening.

}
}
}

Expand All @@ -127,12 +123,40 @@ impl Reducer for InflightActivationBatcher {
}

async fn flush(&mut self) -> Result<Option<Self::Output>, anyhow::Error> {
if self.batch.is_empty() {
if self.batch.is_empty() && self.forward_batch.is_empty() {
return Ok(None);
}

metrics::histogram!("consumer.batch_rows").record(self.batch.len() as f64);
metrics::histogram!("consumer.batch_bytes").record(self.batch_size as f64);
let runtime_config = self.runtime_config_manager.read().await;

if !self.batch.is_empty() {
metrics::histogram!("consumer.batch_rows").record(self.batch.len() as f64);
metrics::histogram!("consumer.batch_bytes").record(self.batch_size as f64);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not always make the metrics? The zeros are meaningful.


// Send all forward batch in parallel
if !self.forward_batch.is_empty() {
// The default demoted topic to forward tasks to is config.kafka_long_topic if not set in runtime config.
let topic = runtime_config
.demoted_topic
.clone()
.unwrap_or(self.config.kafka_long_topic.clone());
let sends = self.forward_batch.iter().map(|payload| {
self.producer.send(
FutureRecord::<(), Vec<u8>>::to(&topic).payload(payload),
Timeout::After(Duration::from_millis(self.config.send_timeout_ms)),
)
});
Comment on lines 143 to 148
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One concern I have with demoted namespaces is that the producer cluster won't always have a long topic deployed on it. For example, if a broker is connected to kafka-events and we need to demote to the long/limited topic, that topic only exists on kafka-small. The current logic will result in the broker failing to produce.

I think we would need to expand the application configuration to have broker configuration for the demoted namespace flow. That would also imply that the topic we demote to is also fixed and could be removed from runtime config.


let results = join_all(sends).await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much better 👏

let success_count = results.iter().filter(|r| r.is_ok()).count();

metrics::histogram!("consumer.forwarded_rows").record(success_count as f64);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a metric for failed produces as well please.

metrics::counter!("filter.forward_task_demoted_namespace_success")
.increment(success_count as u64);

self.forward_batch.clear();
}

self.batch_size = 0;

Expand All @@ -144,6 +168,7 @@ impl Reducer for InflightActivationBatcher {

/// Drops everything buffered so far: the pending activations, the payloads
/// queued for forwarding, and the running byte count of the batch.
fn reset(&mut self) {
    self.batch.clear();
    self.forward_batch.clear();
    self.batch_size = 0;
}

Expand All @@ -168,6 +193,7 @@ mod tests {
ActivationBatcherConfig, Config, InflightActivation, InflightActivationBatcher, Reducer,
RuntimeConfigManager,
};
use crate::kafka::utils::tag_for_forwarding;
use chrono::Utc;
use std::collections::HashMap;
use tokio::fs;
Expand All @@ -178,6 +204,90 @@ mod tests {

use crate::store::inflight_activation::InflightActivationStatus;

/// Tagging an untagged activation must add the `forwarded` header and leave
/// every other field exactly as it was.
#[test]
fn test_tag_for_forwarding() {
    let original = TaskActivation {
        id: String::from("test-id"),
        namespace: String::from("bad_namespace"),
        taskname: String::from("test_task"),
        parameters: String::from(r#"{"key":"value"}"#),
        headers: HashMap::new(),
        received_at: None,
        retry_state: None,
        processing_deadline_duration: 60,
        expires: Some(300),
        delay: Some(10),
    };

    let tagged = tag_for_forwarding(&original.encode_to_vec()).unwrap();
    assert!(
        tagged.is_some(),
        "Should return Some for untagged activation"
    );

    let tagged_bytes = tagged.unwrap();
    let decoded = TaskActivation::decode(tagged_bytes.as_slice()).unwrap();

    // The namespace is carried over unchanged.
    assert_eq!(decoded.namespace, "bad_namespace");

    // The marker header is present with the value "true".
    assert_eq!(
        decoded.headers.get("forwarded").map(String::as_str),
        Some("true"),
        "Should have forwarded header set to true"
    );

    // Every remaining field round-trips untouched.
    assert_eq!(decoded.id, original.id);
    assert_eq!(decoded.taskname, original.taskname);
    assert_eq!(decoded.parameters, original.parameters);
    assert_eq!(decoded.received_at, original.received_at);
    assert_eq!(decoded.retry_state, original.retry_state);
    assert_eq!(
        decoded.processing_deadline_duration,
        original.processing_deadline_duration
    );
    assert_eq!(decoded.expires, original.expires);
    assert_eq!(decoded.delay, original.delay);
}

/// An activation that already carries `forwarded: true` must yield `None`.
#[test]
fn test_tag_for_forwarding_already_tagged() {
    // Build the activation with the marker header in place from the start.
    let pre_tagged = TaskActivation {
        id: String::from("test-id"),
        namespace: String::from("bad_namespace"),
        taskname: String::from("test_task"),
        parameters: String::from(r#"{"key":"value"}"#),
        headers: HashMap::from([("forwarded".to_string(), "true".to_string())]),
        received_at: None,
        retry_state: None,
        processing_deadline_duration: 60,
        expires: Some(300),
        delay: Some(10),
    };

    let outcome = tag_for_forwarding(&pre_tagged.encode_to_vec()).unwrap();

    assert!(
        outcome.is_none(),
        "Should return None for already tagged activation"
    );
}

/// Bytes that are not a valid protobuf must surface as an error.
#[test]
fn test_tag_for_forwarding_decode_error() {
    // Invalid protobuf
    let garbage: [u8; 3] = [0xFF; 3];
    assert!(tag_for_forwarding(&garbage).is_err());
}

#[tokio::test]
async fn test_drop_task_due_to_killswitch() {
let test_yaml = r#"
Expand Down Expand Up @@ -460,8 +570,53 @@ demoted_namespaces:
on_attempts_exceeded: OnAttemptsExceeded::Discard,
};

let inflight_activation_1 = InflightActivation {
id: "1".to_string(),
activation: TaskActivation {
id: "1".to_string(),
namespace: "good_namespace".to_string(),
taskname: "good_task".to_string(),
parameters: "{}".to_string(),
headers: HashMap::new(),
received_at: None,
retry_state: None,
processing_deadline_duration: 0,
expires: Some(0),
delay: None,
}
.encode_to_vec(),
status: InflightActivationStatus::Pending,
partition: 0,
offset: 0,
added_at: Utc::now(),
received_at: Utc::now(),
processing_attempts: 0,
processing_deadline_duration: 0,
expires_at: None,
delay_until: None,
processing_deadline: None,
at_most_once: false,
namespace: "good_namespace".to_string(),
taskname: "good_task".to_string(),
on_attempts_exceeded: OnAttemptsExceeded::Discard,
};

batcher.reduce(inflight_activation_0).await.unwrap();
batcher.reduce(inflight_activation_1).await.unwrap();

assert_eq!(batcher.batch.len(), 1);
assert_eq!(batcher.forward_batch.len(), 1);

let flush_result = batcher.flush().await.unwrap();
assert!(flush_result.is_some());
assert_eq!(flush_result.as_ref().unwrap().len(), 1);
assert_eq!(
flush_result.as_ref().unwrap()[0].namespace,
"good_namespace"
);
assert_eq!(flush_result.as_ref().unwrap()[0].taskname, "good_task");
assert_eq!(batcher.batch.len(), 0);
assert_eq!(batcher.forward_batch.len(), 0);

fs::remove_file(test_path).await.unwrap();
}
Expand Down
30 changes: 30 additions & 0 deletions src/kafka/inflight_activation_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ impl Reducer for InflightActivationWriter {
return Ok(None);
};

// If batch is empty (all tasks were forwarded), just mark as complete
if batch.is_empty() {
self.batch.take();
return Ok(Some(()));
}

// Check if writing the batch would exceed the limits
let exceeded_pending_limit = self
.store
Expand Down Expand Up @@ -851,4 +857,28 @@ mod tests {
let count_pending = writer.store.count_pending_activations().await.unwrap();
assert_eq!(count_pending, 200);
}

/// Flushing after reducing an empty batch must still report completion
/// (`Some`) rather than `None`.
#[tokio::test]
async fn test_writer_flush_empty_batch() {
    let limits = ActivationWriterConfig {
        db_max_size: None,
        max_buf_len: 100,
        max_pending_activations: 10,
        max_processing_activations: 10,
        max_delay_activations: 10,
        write_failure_backoff_ms: 4000,
    };

    let db_path = generate_temp_filename();
    let store_config = InflightActivationStoreConfig::from_config(&create_integration_config());
    let store = Arc::new(
        InflightActivationStore::new(&db_path, store_config)
            .await
            .unwrap(),
    );

    let mut writer = InflightActivationWriter::new(Arc::clone(&store), limits);
    writer.reduce(Vec::new()).await.unwrap();

    assert!(writer.flush().await.unwrap().is_some());
}
}
1 change: 1 addition & 0 deletions src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ pub mod deserialize_activation;
pub mod inflight_activation_batcher;
pub mod inflight_activation_writer;
pub mod os_stream_writer;
pub mod utils;
24 changes: 24 additions & 0 deletions src/kafka/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use prost::Message;
use sentry_protos::taskbroker::v1::TaskActivation;

/// Tags a TaskActivation for forwarding to demoted topic by adding a header marker.
/// This prevents infinite forwarding loops when tasks are received by another
/// broker with the same demoted_namespaces configuration.
///
/// # Arguments
/// * `activation_bytes` - the activation protobuf of a TaskActivation
///
/// # Returns
/// * `Ok(Some(Vec<u8>))` - The protobuf of the TaskActivation with forwarding header added
/// * `Ok(None)` - Already tagged for forwarding (skip forwarding)
/// * `Err(DecodeError)` - If the protobuf cannot be decoded
pub fn tag_for_forwarding(activation_bytes: &[u8]) -> Result<Option<Vec<u8>>, prost::DecodeError> {
// Decode up front; a payload that fails to decode is returned to the caller as an error via `?`.
let mut activation = TaskActivation::decode(activation_bytes)?;
// Only the exact header value "true" counts as already tagged. Any other value
// (or a missing header) falls through to the insert further down, which sets
// the header to "true" before the activation is re-encoded.
if activation.headers.get("forwarded").map(|v| v.as_str()) == Some("true") {
return Ok(None);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the safety property that this provides. Would it be possible to simplify this and avoid deserializing and mutating the activation? We'll have the namespace on the InflightActivation, wouldn't skipping forwarding when the task's namespace matches the demoted namespace option work provide the same safety against loops?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that wouldn't provide safety against loops since all brokers share the same runtime config, however something similar I think would work is to compare between TASKBROKER_LONG_TOPIC and the topic we are trying to forward tasks to. If they are the same, we skip the forward.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that wouldn't provide safety against loops since all brokers share the same runtime config

Each replica in a processing pool does have the same runtime config, but each pool has configuration that is separate from other pools. We could have config to demote tasks in the default pool, and not have that config present on the long or limited pools. Comparing the topic names as we talked about today offline is another solution that would prevent loops.

activation
.headers
.insert("forwarded".to_string(), "true".to_string());
Ok(Some(activation.encode_to_vec()))
}
Loading
Loading