Skip to content

Commit ffbaa20

Browse files
lyradcDaryl Coburnvruello
authored
Round-robin multiple Kafka topics (#299)
round-robin multiple topics when topic contains comma --------- Co-authored-by: Daryl Coburn <daryl_coburn@cargill.com> Co-authored-by: Vincent Ruello <5345986+vruello@users.noreply.github.com>
1 parent d38846e commit ffbaa20

File tree

2 files changed

+16
-4
lines changed

2 files changed

+16
-4
lines changed

doc/outputs.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,9 @@ $ openwec subscriptions edit <subscription> outputs add --format <format> files
7373

7474
The Kafka driver sends events in a Kafka topic.
7575

76-
For a given subscription, all events will be sent in the configured Kafka topic. You may want to add additionnal options to the inner Kafka client, such as `bootstrap.servers`. This options will be directly given to `librdkafka` (available options are listed here: https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html).
76+
For a given subscription, all events will be sent in the configured Kafka topic. If multiple topics are provided, delimited by commas, then the events are evenly distributed across the listed topics.
77+
78+
You may want to add additionnal options to the inner Kafka client, such as `bootstrap.servers`. This options will be directly given to `librdkafka` (available options are listed here: https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html).
7779

7880
> [!TIP]
7981
> If multiple outputs use the Kafka driver and connect to the same Kafka cluster, it is recommended to configure the additional options in OpenWEC settings (`outputs.kafka.options`) **and** to omit the `options` parameter in Kafka output configuration. This way, only one Kafka client will be used by all the outputs, which is more resource efficient.

server/src/drivers/kafka.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ impl OutputKafkaContext {
4141
}
4242

4343
pub struct OutputKafka {
44-
config: KafkaConfiguration,
4544
producer: FutureProducer,
45+
topics: Vec<String>,
4646
}
4747

4848
impl OutputKafka {
@@ -72,9 +72,15 @@ impl OutputKafka {
7272
);
7373
client_config.create()?
7474
};
75+
7576
Ok(OutputKafka {
76-
config: config.clone(),
7777
producer,
78+
topics: config
79+
.topic()
80+
.split(',')
81+
.map(|s| s.trim().to_string())
82+
.filter(|s| !s.is_empty())
83+
.collect(),
7884
})
7985
}
8086
}
@@ -86,13 +92,17 @@ impl OutputDriver for OutputKafka {
8692
_metadata: Arc<EventMetadata>,
8793
events: Arc<Vec<Arc<String>>>,
8894
) -> Result<()> {
95+
let mut l_topic_index: usize = 0;
8996
let mut futures = Vec::new();
9097
for event in events.iter() {
98+
// Get current topic
99+
let topic = self.topics[l_topic_index].as_ref();
91100
// We need to explicitly assign the Key type as ()
92101
futures.push(self.producer.send::<(), _, _>(
93-
FutureRecord::to(self.config.topic()).payload(event.as_ref()),
102+
FutureRecord::to(topic).payload(event.as_ref()),
94103
Timeout::After(Duration::from_secs(30)),
95104
));
105+
l_topic_index = (l_topic_index + 1) % self.topics.len();
96106
}
97107

98108
// Wait for all events to be sent and ack

0 commit comments

Comments
 (0)