Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions src/kafka/inflight_activation_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use super::consumer::{
};

pub struct ActivationBatcherConfig {
pub kafka_config: ClientConfig,
pub producer_config: ClientConfig,
pub kafka_cluster: String,
pub kafka_topic: String,
pub kafka_long_topic: String,
pub send_timeout_ms: u64,
Expand All @@ -30,7 +31,8 @@ impl ActivationBatcherConfig {
/// Convert from application configuration into ActivationBatcher config.
pub fn from_config(config: &Config) -> Self {
Self {
kafka_config: config.kafka_producer_config(),
producer_config: config.kafka_producer_config(),
kafka_cluster: config.kafka_cluster.clone(),
kafka_topic: config.kafka_topic.clone(),
kafka_long_topic: config.kafka_long_topic.clone(),
send_timeout_ms: config.kafka_send_timeout_ms,
Expand All @@ -48,6 +50,7 @@ pub struct InflightActivationBatcher {
config: ActivationBatcherConfig,
runtime_config_manager: Arc<RuntimeConfigManager>,
producer: Arc<FutureProducer>,
producer_cluster: String,
}

impl InflightActivationBatcher {
Expand All @@ -57,17 +60,23 @@ impl InflightActivationBatcher {
) -> Self {
let producer: Arc<FutureProducer> = Arc::new(
config
.kafka_config
.producer_config
.create()
.expect("Could not create kafka producer in inflight activation batcher"),
);
let producer_cluster = config
.producer_config
.get("bootstrap.servers")
.unwrap()
.to_owned();
Self {
batch: Vec::with_capacity(config.max_batch_len),
batch_size: 0,
forward_batch: Vec::with_capacity(config.max_batch_len),
config,
runtime_config_manager,
producer,
producer_cluster,
}
}
}
Expand Down Expand Up @@ -136,6 +145,20 @@ impl Reducer for InflightActivationBatcher {
// Send all forward batch in parallel
if !self.forward_batch.is_empty() {
let runtime_config = self.runtime_config_manager.read().await;
let forward_cluster = runtime_config
.demoted_topic_cluster
.clone()
.unwrap_or(self.config.kafka_cluster.clone());
if self.producer_cluster != forward_cluster {
let mut new_config = self.config.producer_config.clone();
new_config.set("bootstrap.servers", &forward_cluster);
self.producer = Arc::new(
new_config
.create()
.expect("Could not create kafka producer in inflight activation batcher"),
);
self.producer_cluster = forward_cluster;
}
let forward_topic = runtime_config
.demoted_topic
.clone()
Expand Down Expand Up @@ -442,7 +465,9 @@ demoted_namespaces:
drop_task_killswitch:
-
demoted_namespaces:
- bad_namespace"#;
- bad_namespace
demoted_topic_cluster: 0.0.0.0:9092
demoted_topic: taskworker-demoted"#;

let test_path = "test_forward_task_due_to_demoted_namespace.yaml";
fs::write(test_path, test_yaml).await.unwrap();
Expand All @@ -453,8 +478,8 @@ demoted_namespaces:
ActivationBatcherConfig::from_config(&config),
runtime_config,
);
println!("kafka_topic: {:?}", config.kafka_topic);
println!("kafka_long_topic: {:?}", config.kafka_long_topic);

assert_eq!(batcher.producer_cluster, config.kafka_cluster.clone());

let inflight_activation_0 = InflightActivation {
id: "0".to_string(),
Expand Down Expand Up @@ -534,6 +559,7 @@ demoted_namespaces:
assert_eq!(flush_result.as_ref().unwrap()[0].taskname, "good_task");
assert_eq!(batcher.batch.len(), 0);
assert_eq!(batcher.forward_batch.len(), 0);
assert_eq!(batcher.producer_cluster, "0.0.0.0:9092");

fs::remove_file(test_path).await.unwrap();
}
Expand Down
8 changes: 8 additions & 0 deletions src/runtime_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub struct RuntimeConfig {
/// Tasks from these namespaces will be automatically forwarded to the "long" namespace
/// to prevent them from blocking other tasks in shared namespaces.
pub demoted_namespaces: Vec<String>,
/// The cluster to forward tasks from demoted namespaces to.
/// If not set, the current cluster taskbroker is consuming from will be used.
pub demoted_topic_cluster: Option<String>,
/// The topic to forward tasks from demoted namespaces to.
/// If not set, the taskworker-long topic will be used
pub demoted_topic: Option<String>,
Expand Down Expand Up @@ -176,13 +179,18 @@ drop_task_killswitch:
-
demoted_namespaces:
- bad_namespace
demoted_topic_cluster: kafka:9092
demoted_topic: taskworker-demoted-topic"#;

let test_path = "test_demoted_namespaces.yaml";
fs::write(test_path, test_yaml).await.unwrap();

let runtime_config = RuntimeConfigManager::new(Some(test_path.to_string())).await;
let config = runtime_config.read().await;
assert_eq!(
config.demoted_topic_cluster.as_deref().unwrap(),
"kafka:9092"
);
assert_eq!(config.demoted_namespaces.len(), 1);
assert_eq!(config.demoted_namespaces[0], "bad_namespace");
assert_eq!(
Expand Down
19 changes: 16 additions & 3 deletions src/upkeep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,25 @@ pub async fn do_upkeep(

// 12. Forward tasks from demoted namespaces to "long" namespace
let demoted_namespaces = runtime_config.demoted_namespaces.clone();
let forward_cluster = runtime_config
.demoted_topic_cluster
.clone()
.unwrap_or(config.kafka_cluster.clone());
let forward_topic = runtime_config
.demoted_topic
.clone()
.unwrap_or(config.kafka_long_topic.clone());
if !demoted_namespaces.is_empty() && forward_topic != config.kafka_topic {
let same_cluster = forward_cluster == config.kafka_cluster;
let same_topic = forward_topic == config.kafka_topic;
if !(demoted_namespaces.is_empty() || (same_cluster && same_topic)) {
let forward_demoted_start = Instant::now();
let mut forward_producer_config = config.kafka_producer_config();
forward_producer_config.set("bootstrap.servers", &forward_cluster);
let forward_producer: Arc<FutureProducer> = Arc::new(
forward_producer_config
.create()
.expect("Could not create kafka producer in upkeep"),
);
if let Ok(tasks) = store
.get_pending_activations_from_namespaces(Some(&demoted_namespaces), None)
.await
Expand All @@ -298,13 +311,13 @@ pub async fn do_upkeep(
let deliveries = tasks
.into_iter()
.map(|inflight| {
let producer = producer.clone();
let forward_producer = forward_producer.clone();
let config = config.clone();
let topic = forward_topic.clone();
async move {
metrics::counter!("upkeep.forward_task_demoted_namespace", "namespace" => inflight.namespace.clone(), "taskname" => inflight.taskname.clone()).increment(1);

let delivery = producer
let delivery = forward_producer
.send(
FutureRecord::<(), Vec<u8>>::to(&topic)
.payload(&inflight.activation),
Expand Down
Loading