Skip to content

Commit c9714ab

Browse files
committed
Switch back to crate librdkafka
Signed-off-by: Heinz N. Gies <[email protected]>
1 parent 3bafc55 commit c9714ab

File tree

4 files changed

+30
-22
lines changed

4 files changed

+30
-22
lines changed

Cargo.lock

Lines changed: 9 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -131,18 +131,16 @@ tokio-tungstenite = "0.18"
131131
# dns
132132
trust-dns-resolver = { version = "0.22" }
133133

134-
# kafka. cmake is the encouraged way to build this and also the one that works on windows/with musl.
135-
# we stick with git until we have a release with https://github.com/fede1024/rust-rdkafka/pull/417
136-
#rdkafka = { version = "0.28", features = [
137-
rdkafka = { git = "https://github.com/fede1024/rust-rdkafka", rev = "7a2355f03c9aee8d4544e89bc4b0b0b6b96826e0", features = [
134+
rdkafka = { version = "0.29", features = [
138135
"cmake-build",
139136
"libz-static",
137+
"tokio",
140138
], default-features = false }
141-
# rdkafka-sys = { version = "4.2.0", features = [
142-
rdkafka-sys = { git = "https://github.com/fede1024/rust-rdkafka", rev = "7a2355f03c9aee8d4544e89bc4b0b0b6b96826e0", features = [
139+
# tracking the version rdkafka depends on
140+
rdkafka-sys = { version = "4.3.0", features = [
143141
"cmake-build",
144142
"libz-static",
145-
], default-features = false } # tracking the version rdkafka depends on
143+
], default-features = false }
146144

147145
# crononome
148146
cron = "0.12.0"

src/connectors/impls/kafka/consumer.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -450,12 +450,12 @@ impl Connector for KafkaConsumerConnector {
450450
fn kafka_meta(msg: &BorrowedMessage) -> Value<'static> {
451451
let headers = msg.headers().map(|headers| {
452452
let mut headers_meta = Value::object_with_capacity(headers.count());
453-
for i in 0..headers.count() {
454-
if let Some(header) = headers.get(i) {
455-
let key = String::from(header.0);
456-
let val = Value::Bytes(header.1.to_vec().into());
457-
headers_meta.try_insert(key, val);
458-
}
453+
for header in headers.iter() {
454+
let key = header.key.to_string();
455+
let val = header
456+
.value
457+
.map_or(Value::const_null(), |v| Value::Bytes(v.to_vec().into()));
458+
headers_meta.try_insert(key, val);
459459
}
460460
headers_meta
461461
});

src/connectors/impls/kafka/producer.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@ use crate::{
2424
utils::task_id,
2525
};
2626
use halfbrown::HashMap;
27-
use rdkafka::config::{ClientConfig, FromClientConfigAndContext};
2827
use rdkafka::producer::{DeliveryFuture, Producer};
28+
use rdkafka::{
29+
config::{ClientConfig, FromClientConfigAndContext},
30+
message::Header,
31+
};
2932
use rdkafka::{
3033
message::OwnedHeaders,
3134
producer::{FutureProducer, FutureRecord},
@@ -68,7 +71,7 @@ impl ConfigImpl for Config {}
6871
#[derive(Default, Debug)]
6972
pub(crate) struct Builder {}
7073

71-
#[async_trait::async_trait()]
74+
#[async_trait::async_trait]
7275
impl ConnectorBuilder for Builder {
7376
fn connector_type(&self) -> ConnectorType {
7477
"kafka_producer".into()
@@ -207,7 +210,10 @@ impl Sink for KafkaProducerSink {
207210
for (k, v) in headers_obj.iter() {
208211
// supporting string or bytes as headers value
209212
if let Some(v_bytes) = v.as_bytes() {
210-
headers = headers.add(k, v_bytes);
213+
headers = headers.insert(Header {
214+
key: k,
215+
value: Some(v_bytes),
216+
});
211217
}
212218
}
213219
record = record.headers(headers);
@@ -296,7 +302,7 @@ impl Sink for KafkaProducerSink {
296302
let wait_secs = Duration::from_secs(1);
297303
if producer.in_flight_count() > 0 {
298304
info!("{ctx} Flushing messages. Waiting for {wait_secs:?} seconds.");
299-
producer.flush(wait_secs);
305+
producer.flush(wait_secs)?;
300306
}
301307
}
302308
Ok(())

0 commit comments

Comments
 (0)