Merged
114 changes: 93 additions & 21 deletions src/kafka/inflight_activation_batcher.rs
@@ -3,6 +3,7 @@ use crate::{
store::inflight_activation::InflightActivation,
};
use chrono::Utc;
use futures::future::join_all;
use rdkafka::config::ClientConfig;
use rdkafka::{
producer::{FutureProducer, FutureRecord},
@@ -17,6 +18,7 @@ use super::consumer::{

pub struct ActivationBatcherConfig {
pub kafka_config: ClientConfig,
pub kafka_topic: String,
pub kafka_long_topic: String,
pub send_timeout_ms: u64,
pub max_batch_time_ms: u64,
@@ -29,6 +31,7 @@ impl ActivationBatcherConfig {
pub fn from_config(config: &Config) -> Self {
Self {
kafka_config: config.kafka_producer_config(),
kafka_topic: config.kafka_topic.clone(),
kafka_long_topic: config.kafka_long_topic.clone(),
send_timeout_ms: config.kafka_send_timeout_ms,
max_batch_time_ms: config.db_insert_batch_max_time_ms,
@@ -41,6 +44,7 @@
pub struct InflightActivationBatcher {
batch: Vec<InflightActivation>,
batch_size: usize,
forward_batch: Vec<Vec<u8>>, // payload
config: ActivationBatcherConfig,
runtime_config_manager: Arc<RuntimeConfigManager>,
producer: Arc<FutureProducer>,
@@ -60,6 +64,7 @@
Self {
batch: Vec::with_capacity(config.max_batch_len),
batch_size: 0,
forward_batch: Vec::with_capacity(config.max_batch_len),
config,
runtime_config_manager,
producer,
@@ -74,6 +79,10 @@

async fn reduce(&mut self, t: Self::Input) -> Result<(), anyhow::Error> {
let runtime_config = self.runtime_config_manager.read().await;
let forward_topic = runtime_config
.demoted_topic
.clone()
.unwrap_or(self.config.kafka_long_topic.clone());
let task_name = &t.taskname;
let namespace = &t.namespace;

@@ -91,31 +100,21 @@
}

if runtime_config.demoted_namespaces.contains(namespace) {
-metrics::counter!(
-"filter.forward_task_demoted_namespace",
-"namespace" => namespace.clone(),
-"taskname" => task_name.clone(),
-)
-.increment(1);
-
-// The default demoted topic to forward tasks to is config.kafka_long_topic if not set in runtime config.
-let topic = runtime_config
-.demoted_topic
-.clone()
-.unwrap_or(self.config.kafka_long_topic.clone());
-let delivery = self
-.producer
-.send(
-FutureRecord::<(), Vec<u8>>::to(&topic).payload(&t.activation),
-Timeout::After(Duration::from_millis(self.config.send_timeout_ms)),
-)
-.await;
-if delivery.is_ok() {
-metrics::counter!("filter.forward_task_demoted_namespace_success",
+if forward_topic == self.config.kafka_topic {
+metrics::counter!(
+"filter.forward_task_demoted_namespace.skipped",
+"namespace" => namespace.clone(),
+"taskname" => task_name.clone(),
+)
+.increment(1);
+} else {
+metrics::counter!(
+"filter.forward_task_demoted_namespace",
+"namespace" => namespace.clone(),
+"taskname" => task_name.clone(),
+)
+.increment(1);
+self.forward_batch.push(t.activation.clone());
+return Ok(());
}
}
@@ -127,13 +126,38 @@
}

async fn flush(&mut self) -> Result<Option<Self::Output>, anyhow::Error> {
-if self.batch.is_empty() {
+if self.batch.is_empty() && self.forward_batch.is_empty() {
return Ok(None);
}

metrics::histogram!("consumer.batch_rows").record(self.batch.len() as f64);
metrics::histogram!("consumer.batch_bytes").record(self.batch_size as f64);

// Send the whole forward batch in parallel
if !self.forward_batch.is_empty() {
let runtime_config = self.runtime_config_manager.read().await;
let forward_topic = runtime_config
.demoted_topic
.clone()
.unwrap_or(self.config.kafka_long_topic.clone());
let sends = self.forward_batch.iter().map(|payload| {
self.producer.send(
FutureRecord::<(), Vec<u8>>::to(&forward_topic).payload(payload),
Timeout::After(Duration::from_millis(self.config.send_timeout_ms)),
)
});

let results = join_all(sends).await;
Review comment (Member): Much better 👏

let success_count = results.iter().filter(|r| r.is_ok()).count();

metrics::histogram!("consumer.forward_attempts").record(results.len() as f64);
metrics::histogram!("consumer.forward_successes").record(success_count as f64);
metrics::histogram!("consumer.forward_failures")
.record((results.len() - success_count) as f64);

self.forward_batch.clear();
}

self.batch_size = 0;

Ok(Some(replace(
@@ -144,6 +168,7 @@

fn reset(&mut self) {
self.batch_size = 0;
self.forward_batch.clear();
self.batch.clear();
}

@@ -428,6 +453,8 @@
ActivationBatcherConfig::from_config(&config),
runtime_config,
);
println!("kafka_topic: {:?}", config.kafka_topic);
println!("kafka_long_topic: {:?}", config.kafka_long_topic);
Review comment (Member) on lines +456 to +457: Some stray prints.

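A follow-up along these lines would drop them (hypothetical suggestion, not part of the merged change):

-println!("kafka_topic: {:?}", config.kafka_topic);
-println!("kafka_long_topic: {:?}", config.kafka_long_topic);
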
let inflight_activation_0 = InflightActivation {
id: "0".to_string(),
@@ -460,8 +487,53 @@
on_attempts_exceeded: OnAttemptsExceeded::Discard,
};

let inflight_activation_1 = InflightActivation {
id: "1".to_string(),
activation: TaskActivation {
id: "1".to_string(),
namespace: "good_namespace".to_string(),
taskname: "good_task".to_string(),
parameters: "{}".to_string(),
headers: HashMap::new(),
received_at: None,
retry_state: None,
processing_deadline_duration: 0,
expires: Some(0),
delay: None,
}
.encode_to_vec(),
status: InflightActivationStatus::Pending,
partition: 0,
offset: 0,
added_at: Utc::now(),
received_at: Utc::now(),
processing_attempts: 0,
processing_deadline_duration: 0,
expires_at: None,
delay_until: None,
processing_deadline: None,
at_most_once: false,
namespace: "good_namespace".to_string(),
taskname: "good_task".to_string(),
on_attempts_exceeded: OnAttemptsExceeded::Discard,
};

batcher.reduce(inflight_activation_0).await.unwrap();
batcher.reduce(inflight_activation_1).await.unwrap();

assert_eq!(batcher.batch.len(), 1);
assert_eq!(batcher.forward_batch.len(), 1);

let flush_result = batcher.flush().await.unwrap();
assert!(flush_result.is_some());
assert_eq!(flush_result.as_ref().unwrap().len(), 1);
assert_eq!(
flush_result.as_ref().unwrap()[0].namespace,
"good_namespace"
);
assert_eq!(flush_result.as_ref().unwrap()[0].taskname, "good_task");
assert_eq!(batcher.batch.len(), 0);
assert_eq!(batcher.forward_batch.len(), 0);

fs::remove_file(test_path).await.unwrap();
}
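
For context on the review comment above: the join_all pattern builds every producer send future first and then awaits them together so the sends overlap instead of running one at a time. A minimal standalone sketch, assuming the futures and tokio crates, with fake_send standing in for the Kafka producer call:

use futures::future::join_all;

async fn fake_send(payload: &[u8]) -> Result<usize, String> {
    // Stand-in for producer.send(...); always succeeds in this sketch.
    Ok(payload.len())
}

#[tokio::main]
async fn main() {
    let forward_batch: Vec<Vec<u8>> = vec![vec![1], vec![2, 3]];
    // Build all futures up front, then await them together.
    let sends = forward_batch.iter().map(|payload| fake_send(payload));
    let results = join_all(sends).await;
    let success_count = results.iter().filter(|r| r.is_ok()).count();
    println!("{success_count} of {} sends succeeded", results.len());
}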
30 changes: 30 additions & 0 deletions src/kafka/inflight_activation_writer.rs
@@ -75,6 +75,12 @@
return Ok(None);
};

// If batch is empty (all tasks were forwarded), just mark as complete
if batch.is_empty() {
self.batch.take();
return Ok(Some(()));
}

// Check if writing the batch would exceed the limits
let exceeded_pending_limit = self
.store
@@ -851,4 +857,28 @@
let count_pending = writer.store.count_pending_activations().await.unwrap();
assert_eq!(count_pending, 200);
}

#[tokio::test]
async fn test_writer_flush_empty_batch() {
let writer_config = ActivationWriterConfig {
db_max_size: None,
max_buf_len: 100,
max_pending_activations: 10,
max_processing_activations: 10,
max_delay_activations: 10,
write_failure_backoff_ms: 4000,
};
let store = Arc::new(
InflightActivationStore::new(
&generate_temp_filename(),
InflightActivationStoreConfig::from_config(&create_integration_config()),
)
.await
.unwrap(),
);
let mut writer = InflightActivationWriter::new(store.clone(), writer_config);
writer.reduce(vec![]).await.unwrap();
let flush_result = writer.flush().await.unwrap();
assert!(flush_result.is_some());
}
}
23 changes: 12 additions & 11 deletions src/upkeep.rs
@@ -284,7 +284,11 @@

// 12. Forward tasks from demoted namespaces to "long" namespace
let demoted_namespaces = runtime_config.demoted_namespaces.clone();
-if !demoted_namespaces.is_empty() {
+let forward_topic = runtime_config
+.demoted_topic
+.clone()
+.unwrap_or(config.kafka_long_topic.clone());
+if !demoted_namespaces.is_empty() && forward_topic != config.kafka_topic {
let forward_demoted_start = Instant::now();
if let Ok(tasks) = store
.get_pending_activations_from_namespaces(Some(&demoted_namespaces), None)
@@ -296,14 +300,10 @@
.map(|inflight| {
let producer = producer.clone();
let config = config.clone();
-// The default demoted topic to forward tasks to is config.kafka_long_topic if not set in runtime config.
-let topic = runtime_config.demoted_topic.clone().unwrap_or(config.kafka_long_topic.clone());

+let topic = forward_topic.clone();
async move {
if inflight.status == InflightActivationStatus::Complete {
return Ok(inflight.id);
}
metrics::counter!("upkeep.forward_demoted_namespace", "namespace" => inflight.namespace, "taskname" => inflight.taskname).increment(1);
metrics::counter!("upkeep.forward_task_demoted_namespace", "namespace" => inflight.namespace.clone(), "taskname" => inflight.taskname.clone()).increment(1);

let delivery = producer
.send(
FutureRecord::<(), Vec<u8>>::to(&topic)
@@ -314,8 +314,9 @@
match delivery {
Ok(_) => Ok(inflight.id),
Err((err, _msg)) => {
error!("forward_demoted_namespace.publish.failure: {}", err);
Err(err)
metrics::counter!("upkeep.forward_task_demoted_namespace.publish_failure", "namespace" => inflight.namespace.clone(), "taskname" => inflight.taskname.clone()).increment(1);
error!("forward_task_demoted_namespace.publish.failure: {}", err);
Err(anyhow::anyhow!("failed to publish activation: {}", err))
}
}
}
@@ -333,7 +334,7 @@
result_context.forwarded = forwarded_count;
}
}
metrics::histogram!("upkeep.forward_demoted_namespaces")
metrics::histogram!("upkeep.forward_task_demoted_namespaces")
.record(forward_demoted_start.elapsed());
}
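
Both the batcher and the upkeep path now resolve the forward topic the same way and skip forwarding when it matches the topic this consumer reads from, so demoted tasks are not republished to the very topic they came from. A minimal sketch of that resolution logic, with illustrative names rather than the project's API:

fn resolve_forward_topic(demoted_topic: Option<&str>, long_topic: &str) -> String {
    // A runtime override wins; otherwise fall back to the "long" topic.
    demoted_topic.unwrap_or(long_topic).to_string()
}

fn main() {
    let consumer_topic = "taskworker";
    let forward_topic = resolve_forward_topic(None, "taskworker-long");
    if forward_topic != consumer_topic {
        println!("forward demoted tasks to {forward_topic}");
    } else {
        println!("skip forwarding: it would republish to our own topic");
    }
}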
