Skip to content

Commit 7c2ff41

Browse files
authored
Bump to latest rdkafka 0.29 (#192)
* Bump to latest rdkafka 0.29 * Updated example Apps will be required to bump their dep to 0.29, too. * Using the new iter() fn for message headers
1 parent c380078 commit 7c2ff41

File tree

5 files changed

+29
-21
lines changed

5 files changed

+29
-21
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ uuid = { version = "1", features = ["v4"] }
4646
actix-web = { version = "4", optional = true }
4747
actix-http = { version = "3", optional = true }
4848
reqwest-lib = { version = "^0.11", default-features = false, features = ["rustls-tls"], optional = true, package = "reqwest" }
49-
rdkafka-lib = { version = "^0.28", features = ["cmake-build"], optional = true, package = "rdkafka" }
49+
rdkafka-lib = { version = "^0.29", features = ["cmake-build"], optional = true, package = "rdkafka" }
5050
warp-lib = { version = "^0.3", optional = true, package = "warp" }
5151
async-trait = { version = "^0.1.33", optional = true }
5252
bytes = { version = "^1.0", optional = true }

example-projects/rdkafka-example/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ serde_json = "^1.0"
1616
futures = "^0.3"
1717
tokio = { version = "^1.0", features = ["full"] }
1818
clap = "2.33.1"
19-
rdkafka = { version = "^0.28", features = ["cmake-build"] }
19+
rdkafka = { version = "^0.29", features = ["cmake-build"] }

src/binding/rdkafka/kafka_consumer_record.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,13 @@ pub struct ConsumerRecordDeserializer {
2020

2121
impl ConsumerRecordDeserializer {
2222
fn get_kafka_headers(message: &impl Message) -> Result<HashMap<String, Vec<u8>>> {
23-
let mut hm = HashMap::new();
24-
let headers = message
25-
.headers()
26-
// TODO create an error variant for invalid headers
27-
.ok_or(crate::message::Error::WrongEncoding {})?;
28-
for i in 0..headers.count() {
29-
let header = headers.get(i).unwrap();
30-
hm.insert(header.0.to_string(), Vec::from(header.1));
23+
match message.headers() {
24+
None => Err(crate::message::Error::WrongEncoding {}),
25+
Some(headers) => Ok(headers
26+
.iter()
27+
.map(|h| (h.key.to_string(), Vec::from(h.value.unwrap())))
28+
.collect()),
3129
}
32-
Ok(hm)
3330
}
3431

3532
pub fn new(message: &impl Message) -> Result<ConsumerRecordDeserializer> {

src/binding/rdkafka/kafka_producer_record.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::message::{
99
BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
1010
};
1111
use crate::Event;
12-
use rdkafka::message::{OwnedHeaders, ToBytes};
12+
use rdkafka::message::{Header, OwnedHeaders, ToBytes};
1313
use rdkafka::producer::{BaseRecord, FutureRecord};
1414

1515
/// This struct contains a serialized CloudEvent message in the Kafka shape.
@@ -46,20 +46,27 @@ impl Default for MessageRecord {
4646

4747
impl BinarySerializer<MessageRecord> for MessageRecord {
4848
fn set_spec_version(mut self, sv: SpecVersion) -> Result<Self> {
49-
self.headers = self.headers.add(SPEC_VERSION_HEADER, &sv.to_string());
49+
let v = sv.to_string();
50+
let header = Header {
51+
key: SPEC_VERSION_HEADER,
52+
value: Some(&v),
53+
};
54+
self.headers = self.headers.insert(header);
5055
Ok(self)
5156
}
5257

5358
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
54-
let key = &header_prefix(name);
55-
self.headers = self.headers.add(key, &value.to_string());
59+
let v = value.to_string();
60+
let header = Header {
61+
key: &header_prefix(name),
62+
value: Some(&v),
63+
};
64+
self.headers = self.headers.insert(header);
5665
Ok(self)
5766
}
5867

59-
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
60-
let key = &header_prefix(name);
61-
self.headers = self.headers.add(key, &value.to_string());
62-
Ok(self)
68+
fn set_extension(self, name: &str, value: MessageAttributeValue) -> Result<Self> {
69+
self.set_attribute(name, value)
6370
}
6471

6572
fn end_with_data(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
@@ -74,7 +81,11 @@ impl BinarySerializer<MessageRecord> for MessageRecord {
7481

7582
impl StructuredSerializer<MessageRecord> for MessageRecord {
7683
fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
77-
self.headers = self.headers.add(CONTENT_TYPE, CLOUDEVENTS_JSON_HEADER);
84+
let header = Header {
85+
key: CONTENT_TYPE,
86+
value: Some(CLOUDEVENTS_JSON_HEADER),
87+
};
88+
self.headers = self.headers.insert(header);
7889

7990
self.payload = Some(bytes);
8091

src/binding/rdkafka/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
//! use futures::StreamExt;
3434
//!
3535
//! # async fn consume(consumer: StreamConsumer<DefaultConsumerContext>) -> Result<(), Box<dyn std::error::Error>> {
36-
//! let mut message_stream = consumer.start();
36+
//! let mut message_stream = consumer.stream();
3737
//!
3838
//! while let Some(message) = message_stream.next().await {
3939
//! match message {

0 commit comments

Comments
 (0)