Skip to content

Commit 9055d71

Browse files
committed
New feature: cloudevents-rdkafka
Conditionally compile the rdkafka module when enabled. Signed-off-by: Jim Crossley <[email protected]>
1 parent 51b49f1 commit 9055d71

File tree

9 files changed

+512
-4
lines changed

9 files changed

+512
-4
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ name = "cloudevents"
1919
[features]
2020
cloudevents-actix = ["actix-web", "async-trait", "lazy_static", "bytes", "futures"]
2121
cloudevents-reqwest = ["reqwest", "async-trait", "lazy_static", "bytes"]
22+
cloudevents-rdkafka = ["rdkafka", "lazy_static", "bytes"]
2223

2324
[dependencies]
2425
serde = { version = "^1.0", features = ["derive"] }
@@ -33,6 +34,7 @@ bitflags = "^1.2"
3334
# runtime optional deps
3435
actix-web = { version = "^3", default-features = false, optional = true }
3536
reqwest = { version = "^0.11", default-features = false, features = ["rustls-tls"], optional = true }
37+
rdkafka = { version = "^0.25", features = ["cmake-build"], optional = true }
3638
async-trait = { version = "^0.1.33", optional = true }
3739
lazy_static = { version = "1.4.0", optional = true }
3840
bytes = { version = "^1.0", optional = true }

cloudevents-sdk-rdkafka/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ categories = ["web-programming", "encoding"]
1414

1515
[dependencies]
1616
bytes = "^1.0"
17-
cloudevents-sdk = { path = ".." }
17+
cloudevents-sdk = { version = "0.3.0", path = ".." }
1818
lazy_static = "1.4.0"
1919
rdkafka = { version = "^0.25", features = ["cmake-build"] }
2020

example-projects/rdkafka-example/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ edition = "2018"
88

99
[dependencies]
1010
async-trait = "^0.1.33"
11-
cloudevents-sdk = { path = "../.." }
12-
cloudevents-sdk-rdkafka = { path = "../../cloudevents-sdk-rdkafka" }
11+
cloudevents-sdk = { path = "../..", features = ["cloudevents-rdkafka"] }
1312
lazy_static = "1.4.0"
1413
bytes = "^1.0"
1514
url = { version = "^2.1", features = ["serde"] }

example-projects/rdkafka-example/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use futures::StreamExt;
33
use serde_json::json;
44

55
use cloudevents::{EventBuilder, EventBuilderV10};
6-
use cloudevents_sdk_rdkafka::{FutureRecordExt, MessageExt, MessageRecord};
6+
use cloudevents::rdkafka::{FutureRecordExt, MessageExt, MessageRecord};
77

88
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
99
use rdkafka::consumer::stream_consumer::StreamConsumer;

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242

4343
#[cfg(feature = "cloudevents-actix")]
4444
pub mod actix;
45+
#[cfg(feature = "cloudevents-rdkafka")]
46+
pub mod rdkafka;
4547
#[cfg(feature = "cloudevents-reqwest")]
4648
pub mod reqwest;
4749

src/rdkafka/headers.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use crate::event::SpecVersion;
2+
use lazy_static::lazy_static;
3+
use std::collections::HashMap;
4+
5+
macro_rules! attribute_name_to_header {
6+
($attribute:expr) => {
7+
format!("ce_{}", $attribute)
8+
};
9+
}
10+
11+
fn attributes_to_headers(it: impl Iterator<Item = &'static str>) -> HashMap<&'static str, String> {
12+
it.map(|s| {
13+
if s == "datacontenttype" {
14+
(s, String::from("content-type"))
15+
} else {
16+
(s, attribute_name_to_header!(s))
17+
}
18+
})
19+
.collect()
20+
}
21+
22+
lazy_static! {
23+
pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, String> =
24+
attributes_to_headers(SpecVersion::all_attribute_names());
25+
}
26+
27+
pub(crate) static SPEC_VERSION_HEADER: &str = "ce_specversion";
28+
pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json";
29+
pub(crate) static CONTENT_TYPE: &str = "content-type";

src/rdkafka/kafka_consumer_record.rs

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
use super::headers;
2+
use crate::event::SpecVersion;
3+
use crate::message::{
4+
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
5+
Result, StructuredDeserializer, StructuredSerializer,
6+
};
7+
use crate::{message, Event};
8+
use rdkafka::message::{BorrowedMessage, Headers, Message, OwnedMessage};
9+
use std::collections::HashMap;
10+
use std::convert::TryFrom;
11+
use std::str;
12+
13+
/// Wrapper for [`Message`] that implements [`MessageDeserializer`] trait.
14+
pub struct ConsumerRecordDeserializer {
15+
pub(crate) headers: HashMap<String, Vec<u8>>,
16+
pub(crate) payload: Option<Vec<u8>>,
17+
}
18+
19+
impl ConsumerRecordDeserializer {
20+
fn get_kafka_headers(message: &impl Message) -> Result<HashMap<String, Vec<u8>>> {
21+
let mut hm = HashMap::new();
22+
let headers = message
23+
.headers()
24+
// TODO create an error variant for invalid headers
25+
.ok_or(crate::message::Error::WrongEncoding {})?;
26+
for i in 0..headers.count() {
27+
let header = headers.get(i).unwrap();
28+
hm.insert(header.0.to_string(), Vec::from(header.1));
29+
}
30+
Ok(hm)
31+
}
32+
33+
pub fn new(message: &impl Message) -> Result<ConsumerRecordDeserializer> {
34+
Ok(ConsumerRecordDeserializer {
35+
headers: Self::get_kafka_headers(message)?,
36+
payload: message.payload().map(Vec::from),
37+
})
38+
}
39+
}
40+
41+
impl BinaryDeserializer for ConsumerRecordDeserializer {
42+
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(mut self, mut visitor: V) -> Result<R> {
43+
if self.encoding() != Encoding::BINARY {
44+
return Err(message::Error::WrongEncoding {});
45+
}
46+
47+
let spec_version = SpecVersion::try_from(
48+
str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..])
49+
.map_err(|e| crate::message::Error::Other {
50+
source: Box::new(e),
51+
})?,
52+
)?;
53+
54+
visitor = visitor.set_spec_version(spec_version.clone())?;
55+
56+
let attributes = spec_version.attribute_names();
57+
58+
if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) {
59+
visitor = visitor.set_attribute(
60+
"datacontenttype",
61+
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
62+
crate::message::Error::Other {
63+
source: Box::new(e),
64+
}
65+
})?),
66+
)?
67+
}
68+
69+
for (hn, hv) in self
70+
.headers
71+
.into_iter()
72+
.filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
73+
{
74+
let name = &hn["ce_".len()..];
75+
76+
if attributes.contains(&name) {
77+
visitor = visitor.set_attribute(
78+
name,
79+
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
80+
crate::message::Error::Other {
81+
source: Box::new(e),
82+
}
83+
})?),
84+
)?
85+
} else {
86+
visitor = visitor.set_extension(
87+
name,
88+
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
89+
crate::message::Error::Other {
90+
source: Box::new(e),
91+
}
92+
})?),
93+
)?
94+
}
95+
}
96+
97+
if self.payload != None {
98+
visitor.end_with_data(self.payload.unwrap())
99+
} else {
100+
visitor.end()
101+
}
102+
}
103+
}
104+
105+
impl StructuredDeserializer for ConsumerRecordDeserializer {
106+
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
107+
if self.encoding() != Encoding::STRUCTURED {
108+
return Err(message::Error::WrongEncoding {});
109+
}
110+
visitor.set_structured_event(self.payload.unwrap())
111+
}
112+
}
113+
114+
impl MessageDeserializer for ConsumerRecordDeserializer {
115+
fn encoding(&self) -> Encoding {
116+
match (
117+
self.headers
118+
.get("content-type")
119+
.map(|s| String::from_utf8(s.to_vec()).ok())
120+
.flatten()
121+
.map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER))
122+
.unwrap_or(false),
123+
self.headers.get(headers::SPEC_VERSION_HEADER),
124+
) {
125+
(true, _) => Encoding::STRUCTURED,
126+
(_, Some(_)) => Encoding::BINARY,
127+
_ => Encoding::UNKNOWN,
128+
}
129+
}
130+
}
131+
132+
/// Method to transform a [`Message`] to [`Event`].
133+
pub fn record_to_event(msg: &impl Message) -> Result<Event> {
134+
MessageDeserializer::into_event(ConsumerRecordDeserializer::new(msg)?)
135+
}
136+
137+
/// Extension Trait for [`Message`] which acts as a wrapper for the function [`record_to_event()`].
138+
///
139+
/// This trait is sealed and cannot be implemented for types outside of this crate.
140+
pub trait MessageExt: private::Sealed {
141+
/// Generates [`Event`] from [`BorrowedMessage`].
142+
fn to_event(&self) -> Result<Event>;
143+
}
144+
145+
impl MessageExt for BorrowedMessage<'_> {
146+
fn to_event(&self) -> Result<Event> {
147+
record_to_event(self)
148+
}
149+
}
150+
151+
impl MessageExt for OwnedMessage {
152+
fn to_event(&self) -> Result<Event> {
153+
record_to_event(self)
154+
}
155+
}
156+
157+
mod private {
158+
// Sealing the MessageExt
159+
pub trait Sealed {}
160+
impl Sealed for rdkafka::message::OwnedMessage {}
161+
impl Sealed for rdkafka::message::BorrowedMessage<'_> {}
162+
}
163+
164+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rdkafka::kafka_producer_record::MessageRecord;

    use crate::{EventBuilder, EventBuilderV10};
    use chrono::Utc;
    use serde_json::json;

    #[test]
    fn test_binary_record() {
        let time = Utc::now();

        let expected = EventBuilderV10::new()
            .id("0001")
            .ty("example.test")
            .time(time)
            .source("http://localhost")
            .extension("someint", "10")
            .build()
            .unwrap();

        // Since there is neither a way provided by rust-rdkafka to convert FutureProducer back into
        // OwnedMessage or BorrowedMessage, nor is there a way to create a BorrowedMessage struct,
        // the test uses OwnedMessage instead, which consumes the message instead of borrowing it like
        // in the case of BorrowedMessage

        // Serialize a clone of `expected` instead of duplicating the whole
        // builder chain — keeps input and expectation trivially in sync.
        let message_record = MessageRecord::from_event(expected.clone()).unwrap();

        let owned_message = OwnedMessage::new(
            message_record.payload,
            Some(String::from("test key").into_bytes()),
            String::from("test topic"),
            rdkafka::message::Timestamp::NotAvailable,
            10,
            10,
            Some(message_record.headers),
        );

        assert_eq!(owned_message.to_event().unwrap(), expected)
    }

    #[test]
    fn test_structured_record() {
        let j = json!({"hello": "world"});

        let expected = EventBuilderV10::new()
            .id("0001")
            .ty("example.test")
            .source("http://localhost")
            .data("application/json", j.clone())
            .extension("someint", "10")
            .build()
            .unwrap();

        // OwnedMessage is used for the same reason as in test_binary_record:
        // rust-rdkafka offers no way to construct a BorrowedMessage.

        // Round-trip a clone of `expected` rather than rebuilding the event.
        let input = expected.clone();

        let serialized_event =
            StructuredDeserializer::deserialize_structured(input, MessageRecord::new()).unwrap();

        let owned_message = OwnedMessage::new(
            serialized_event.payload,
            Some(String::from("test key").into_bytes()),
            String::from("test topic"),
            rdkafka::message::Timestamp::NotAvailable,
            10,
            10,
            Some(serialized_event.headers),
        );

        assert_eq!(owned_message.to_event().unwrap(), expected)
    }
}

0 commit comments

Comments
 (0)