Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 26 additions & 119 deletions src/kafka/inflight_activation_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use super::consumer::{
ReduceConfig, ReduceShutdownBehaviour, ReduceShutdownCondition, Reducer,
ReducerWhenFullBehaviour,
};
use super::utils::tag_for_forwarding;

pub struct ActivationBatcherConfig {
pub kafka_config: ClientConfig,
pub kafka_topic: String,
pub kafka_long_topic: String,
pub send_timeout_ms: u64,
pub max_batch_time_ms: u64,
Expand All @@ -31,6 +31,7 @@ impl ActivationBatcherConfig {
pub fn from_config(config: &Config) -> Self {
Self {
kafka_config: config.kafka_producer_config(),
kafka_topic: config.kafka_topic.clone(),
kafka_long_topic: config.kafka_long_topic.clone(),
send_timeout_ms: config.kafka_send_timeout_ms,
max_batch_time_ms: config.db_insert_batch_max_time_ms,
Expand Down Expand Up @@ -78,6 +79,10 @@ impl Reducer for InflightActivationBatcher {

async fn reduce(&mut self, t: Self::Input) -> Result<(), anyhow::Error> {
let runtime_config = self.runtime_config_manager.read().await;
let forward_topic = runtime_config
.demoted_topic
.clone()
.unwrap_or(self.config.kafka_long_topic.clone());
let task_name = &t.taskname;
let namespace = &t.namespace;

Expand All @@ -95,36 +100,22 @@ impl Reducer for InflightActivationBatcher {
}

if runtime_config.demoted_namespaces.contains(namespace) {
match tag_for_forwarding(&t.activation) {
Ok(Some(tagged_activation)) => {
// Forward it
metrics::counter!(
"filter.forward_task_demoted_namespace",
"namespace" => namespace.clone(),
"taskname" => task_name.clone(),
)
.increment(1);
self.forward_batch.push(tagged_activation);
return Ok(());
}
Ok(None) => {
// Already forwarded, fall through to add to batch
metrics::counter!(
"filter.forward_task_demoted_namespace.skipped",
"namespace" => namespace.clone(),
"taskname" => task_name.clone(),
)
.increment(1);
}
Err(_) => {
// Decode error, fall through to add to batch to handle in upkeep
metrics::counter!(
"filter.forward_task_demoted_namespace.decode_error",
"namespace" => namespace.clone(),
"taskname" => task_name.clone(),
)
.increment(1);
}
if forward_topic == self.config.kafka_topic {
metrics::counter!(
"filter.forward_task_demoted_namespace.skipped",
"namespace" => namespace.clone(),
"taskname" => task_name.clone(),
)
.increment(1);
} else {
metrics::counter!(
"filter.forward_task_demoted_namespace",
"namespace" => namespace.clone(),
"taskname" => task_name.clone(),
)
.increment(1);
self.forward_batch.push(t.activation.clone());
return Ok(());
}
}

Expand All @@ -148,14 +139,13 @@ impl Reducer for InflightActivationBatcher {

// Send all forward batch in parallel
if !self.forward_batch.is_empty() {
// The default demoted topic to forward tasks to is config.kafka_long_topic if not set in runtime config.
let topic = runtime_config
let forward_topic = runtime_config
.demoted_topic
.clone()
.unwrap_or(self.config.kafka_long_topic.clone());
let sends = self.forward_batch.iter().map(|payload| {
self.producer.send(
FutureRecord::<(), Vec<u8>>::to(&topic).payload(payload),
FutureRecord::<(), Vec<u8>>::to(&forward_topic).payload(payload),
Timeout::After(Duration::from_millis(self.config.send_timeout_ms)),
)
});
Expand Down Expand Up @@ -206,7 +196,6 @@ mod tests {
ActivationBatcherConfig, Config, InflightActivation, InflightActivationBatcher, Reducer,
RuntimeConfigManager,
};
use crate::kafka::utils::tag_for_forwarding;
use chrono::Utc;
use std::collections::HashMap;
use tokio::fs;
Expand All @@ -217,90 +206,6 @@ mod tests {

use crate::store::inflight_activation::InflightActivationStatus;

#[test]
fn test_tag_for_forwarding() {
let original = TaskActivation {
id: "test-id".to_string(),
namespace: "bad_namespace".to_string(),
taskname: "test_task".to_string(),
parameters: r#"{"key":"value"}"#.to_string(),
headers: HashMap::new(),
received_at: None,
retry_state: None,
processing_deadline_duration: 60,
expires: Some(300),
delay: Some(10),
};

let encoded = original.encode_to_vec();
let tagged = tag_for_forwarding(&encoded).unwrap();

assert!(
tagged.is_some(),
"Should return Some for untagged activation"
);
let decoded = TaskActivation::decode(&tagged.unwrap() as &[u8]).unwrap();

// Namespace should be preserved (not modified)
assert_eq!(decoded.namespace, "bad_namespace");

// Should have forwarded header added
assert_eq!(
decoded.headers.get("forwarded"),
Some(&"true".to_string()),
"Should have forwarded header set to true"
);

// All other fields should be preserved
assert_eq!(decoded.id, original.id);
assert_eq!(decoded.taskname, original.taskname);
assert_eq!(decoded.parameters, original.parameters);
assert_eq!(decoded.received_at, original.received_at);
assert_eq!(decoded.retry_state, original.retry_state);
assert_eq!(
decoded.processing_deadline_duration,
original.processing_deadline_duration
);
assert_eq!(decoded.expires, original.expires);
assert_eq!(decoded.delay, original.delay);
}

#[test]
fn test_tag_for_forwarding_already_tagged() {
let mut original = TaskActivation {
id: "test-id".to_string(),
namespace: "bad_namespace".to_string(),
taskname: "test_task".to_string(),
parameters: r#"{"key":"value"}"#.to_string(),
headers: HashMap::new(),
received_at: None,
retry_state: None,
processing_deadline_duration: 60,
expires: Some(300),
delay: Some(10),
};

// Pre-tag it
original
.headers
.insert("forwarded".to_string(), "true".to_string());

let encoded = original.encode_to_vec();
let result = tag_for_forwarding(&encoded).unwrap();

assert!(
result.is_none(),
"Should return None for already tagged activation"
);
}

#[test]
fn test_tag_for_forwarding_decode_error() {
let invalid_bytes = vec![0xFF, 0xFF, 0xFF]; // Invalid protobuf
let result = tag_for_forwarding(&invalid_bytes);
assert!(result.is_err());
}

#[tokio::test]
async fn test_drop_task_due_to_killswitch() {
let test_yaml = r#"
Expand Down Expand Up @@ -551,6 +456,8 @@ demoted_namespaces:
ActivationBatcherConfig::from_config(&config),
runtime_config,
);
println!("kafka_topic: {:?}", config.kafka_topic);
println!("kafka_long_topic: {:?}", config.kafka_long_topic);

let inflight_activation_0 = InflightActivation {
id: "0".to_string(),
Expand Down
1 change: 0 additions & 1 deletion src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,3 @@ pub mod deserialize_activation;
pub mod inflight_activation_batcher;
pub mod inflight_activation_writer;
pub mod os_stream_writer;
pub mod utils;
24 changes: 0 additions & 24 deletions src/kafka/utils.rs

This file was deleted.

26 changes: 7 additions & 19 deletions src/upkeep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use uuid::Uuid;
use crate::{
SERVICE_NAME,
config::Config,
kafka::utils::tag_for_forwarding,
runtime_config::RuntimeConfigManager,
store::inflight_activation::{InflightActivationStatus, InflightActivationStore},
};
Expand Down Expand Up @@ -285,7 +284,11 @@ pub async fn do_upkeep(

// 12. Forward tasks from demoted namespaces to "long" namespace
let demoted_namespaces = runtime_config.demoted_namespaces.clone();
if !demoted_namespaces.is_empty() {
let forward_topic = runtime_config
.demoted_topic
.clone()
.unwrap_or(config.kafka_long_topic.clone());
if !demoted_namespaces.is_empty() && forward_topic != config.kafka_topic {
let forward_demoted_start = Instant::now();
if let Ok(tasks) = store
.get_pending_activations_from_namespaces(Some(&demoted_namespaces), None)
Expand All @@ -297,29 +300,14 @@ pub async fn do_upkeep(
.map(|inflight| {
let producer = producer.clone();
let config = config.clone();
// The default demoted topic to forward tasks to is config.kafka_long_topic if not set in runtime config.
let topic = runtime_config.demoted_topic.clone().unwrap_or(config.kafka_long_topic.clone());

let topic = forward_topic.clone();
async move {
metrics::counter!("upkeep.forward_task_demoted_namespace", "namespace" => inflight.namespace.clone(), "taskname" => inflight.taskname.clone()).increment(1);

let tagged_activation = match tag_for_forwarding(&inflight.activation) {
Ok(Some(tagged)) => tagged,
Ok(None) => {
metrics::counter!("upkeep.forward_task_demoted_namespace.skipped", "namespace" => inflight.namespace.clone(), "taskname" => inflight.taskname.clone()).increment(1);
return Ok(inflight.id);
}
Err(err) => {
metrics::counter!("upkeep.forward_task_demoted_namespace.decode_error", "namespace" => inflight.namespace.clone(), "taskname" => inflight.taskname.clone()).increment(1);
error!("forward_task_demoted_namespace.tag_failure: {}", err);
return Err(anyhow::anyhow!("failed to tag for forwarding: {}", err));
}
};

let delivery = producer
.send(
FutureRecord::<(), Vec<u8>>::to(&topic)
.payload(&tagged_activation),
.payload(&inflight.activation),
Timeout::After(Duration::from_millis(config.kafka_send_timeout_ms)),
)
.await;
Expand Down
Loading