Skip to content

Commit 3a56fcc

Browse files
Implementing Kafka Protocol Binding for CloudEvents using rdkafka-rust (cloudevents#60)
Signed-off-by: [email protected] <[email protected]> Co-authored-by: slinkydeveloper <[email protected]>
1 parent 94a134c commit 3a56fcc

File tree

10 files changed

+713
-4
lines changed

10 files changed

+713
-4
lines changed

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@ claim = "0.3.1"
4141
members = [
4242
".",
4343
"cloudevents-sdk-actix-web",
44-
"cloudevents-sdk-reqwest"
44+
"cloudevents-sdk-reqwest",
45+
"cloudevents-sdk-rdkafka"
4546
]
4647
exclude = [
4748
"example-projects/actix-web-example",
48-
"example-projects/reqwest-wasm-example"
49+
"example-projects/reqwest-wasm-example",
50+
"example-projects/rdkafka-example",
4951
]

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ Note: This project is WIP under active development, hence all APIs are considere
1313
| AVRO Event Format |||
1414
| HTTP Protocol Binding |||
1515
| JSON Event Format |||
16-
| Kafka Protocol Binding | | |
16+
| Kafka Protocol Binding | | |
1717
| MQTT Protocol Binding |||
1818
| NATS Protocol Binding |||
1919
| Web hook |||
@@ -23,6 +23,7 @@ Note: This project is WIP under active development, hence all APIs are considere
2323
* `cloudevents-sdk`: Provides Event data structure, JSON Event format implementation. This module is tested to work with GNU libc, WASM and musl toolchains.
2424
* `cloudevents-sdk-actix-web`: Integration with [Actix Web](https://github.com/actix/actix-web).
2525
* `cloudevents-sdk-reqwest`: Integration with [reqwest](https://github.com/seanmonstar/reqwest).
26+
* `cloudevents-sdk-rdkafka`: Integration with [rust-rdkafka](https://fede1024.github.io/rust-rdkafka).
2627

2728
## Get Started
2829

@@ -46,10 +47,11 @@ let event = EventBuilderV10::new()
4647
.unwrap();
4748
```
4849

49-
Checkout the examples using our integrations with `actix-web` and `reqwest` to learn how to send and receive events:
50+
Check out the examples using our integrations to learn how to send and receive events:
5051

5152
* [Actix Web Example](example-projects/actix-web-example)
5253
* [Reqwest/WASM Example](example-projects/reqwest-wasm-example)
54+
* [Kafka Example](example-projects/rdkafka-example)
5355

5456
## Development & Contributing
5557

cloudevents-sdk-rdkafka/Cargo.toml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
[package]
2+
name = "cloudevents-sdk-rdkafka"
3+
version = "0.1.0"
4+
authors = ["Pranav Bhatt <[email protected]>"]
5+
edition = "2018"
6+
license-file = "../LICENSE"
7+
description = "CloudEvents official Rust SDK - Kafka integration"
8+
documentation = "https://docs.rs/cloudevents-sdk-rdkafka"
9+
repository = "https://github.com/cloudevents/sdk-rust"
10+
readme = "README.md"
11+
12+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
13+
14+
[dependencies]
15+
bytes = "^0.5"
16+
cloudevents-sdk = { version = "0.1.0", path = ".." }
17+
lazy_static = "1.4.0"
18+
rdkafka = "^0.23"
19+
20+
[dev-dependencies]
21+
url = { version = "^2.1" }
22+
serde_json = "^1.0"
23+
chrono = { version = "^0.4", features = ["serde"] }
24+
futures = "0.3.5"

cloudevents-sdk-rdkafka/README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# CloudEvents SDK Rust - rdkafka [![Crates badge]][crates.io] [![Docs badge]][docs.rs]
2+
3+
Integration of [CloudEvents SDK](https://github.com/cloudevents/sdk-rust/) with [rdkafka](https://github.com/fede1024/rust-rdkafka)
4+
5+
Look at [CloudEvents SDK README](https://github.com/cloudevents/sdk-rust/) for more info.
6+
7+
## Development & Contributing
8+
9+
If you're interested in contributing to sdk-rust, look at [Contributing documentation](../CONTRIBUTING.md)
10+
11+
## Community
12+
13+
- There are bi-weekly calls immediately following the
14+
[Serverless/CloudEvents call](https://github.com/cloudevents/spec#meeting-time)
15+
at 9am PT (US Pacific). Which means they will typically start at 10am PT, but
16+
if the other call ends early then the SDK call will start early as well. See
17+
the
18+
[CloudEvents meeting minutes](https://docs.google.com/document/d/1OVF68rpuPK5shIHILK9JOqlZBbfe91RNzQ7u_P7YCDE/edit#)
19+
to determine which week will have the call.
20+
- Slack: #cloudeventssdk (or #cloudevents-sdk-rust) channel under
21+
[CNCF's Slack workspace](https://slack.cncf.io/).
22+
- Email: https://lists.cncf.io/g/cncf-cloudevents-sdk
23+
- Contact for additional information: Francesco Guardiani (`@slinkydeveloper` on slack).
24+
25+
[Crates badge]: https://img.shields.io/crates/v/cloudevents-sdk-rdkafka.svg
26+
[crates.io]: https://crates.io/crates/cloudevents-sdk-rdkafka
27+
[Docs badge]: https://docs.rs/cloudevents-sdk-rdkafka/badge.svg
28+
[docs.rs]: https://docs.rs/cloudevents-sdk-rdkafka
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use cloudevents::event::SpecVersion;
2+
use lazy_static::lazy_static;
3+
use std::collections::HashMap;
4+
5+
/// Builds the Kafka header name for a CloudEvents attribute by applying
/// the `ce_` prefix mandated by the Kafka protocol binding.
macro_rules! attribute_name_to_header {
    ($attribute:expr) => {
        format!("ce_{}", $attribute)
    };
}

/// Maps every CloudEvents attribute name to the Kafka header that carries it.
///
/// `datacontenttype` is special-cased: the Kafka protocol binding transports
/// it in the standard `content-type` header rather than a `ce_`-prefixed one.
fn attributes_to_headers(it: impl Iterator<Item = &'static str>) -> HashMap<&'static str, String> {
    let mut headers = HashMap::new();
    for attribute in it {
        let header_name = match attribute {
            "datacontenttype" => String::from("content-type"),
            other => attribute_name_to_header!(other),
        };
        headers.insert(attribute, header_name);
    }
    headers
}
21+
22+
lazy_static! {
23+
pub(crate) static ref ATTRIBUTES_TO_HEADERS: HashMap<&'static str, String> =
24+
attributes_to_headers(SpecVersion::all_attribute_names());
25+
}
26+
27+
pub(crate) static SPEC_VERSION_HEADER: &'static str = "ce_specversion";
28+
pub(crate) static CLOUDEVENTS_JSON_HEADER: &'static str = "application/cloudevents+json";
29+
pub(crate) static CONTENT_TYPE: &'static str = "content-type";
Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
use crate::headers;
2+
use cloudevents::event::SpecVersion;
3+
use cloudevents::message::{
4+
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
5+
Result, StructuredDeserializer, StructuredSerializer,
6+
};
7+
use cloudevents::{message, Event};
8+
use rdkafka::message::{BorrowedMessage, Headers, Message, OwnedMessage};
9+
use std::collections::HashMap;
10+
use std::convert::TryFrom;
11+
use std::str;
12+
13+
/// Wrapper for [`Message`] that implements [`MessageDeserializer`] trait.
14+
pub struct ConsumerRecordDeserializer {
15+
pub(crate) headers: HashMap<String, Vec<u8>>,
16+
pub(crate) payload: Option<Vec<u8>>,
17+
}
18+
19+
impl ConsumerRecordDeserializer {
20+
fn get_kafka_headers(message: &impl Message) -> Result<HashMap<String, Vec<u8>>> {
21+
let mut hm = HashMap::new();
22+
let headers = message
23+
.headers()
24+
// TODO create an error variant for invalid headers
25+
.ok_or(cloudevents::message::Error::WrongEncoding {})?;
26+
for i in 0..headers.count() {
27+
let header = headers.get(i).unwrap();
28+
hm.insert(header.0.to_string(), Vec::from(header.1));
29+
}
30+
Ok(hm)
31+
}
32+
33+
pub fn new(message: &impl Message) -> Result<ConsumerRecordDeserializer> {
34+
Ok(ConsumerRecordDeserializer {
35+
headers: Self::get_kafka_headers(message)?,
36+
payload: message.payload().map(|s| Vec::from(s)),
37+
})
38+
}
39+
}
40+
41+
impl BinaryDeserializer for ConsumerRecordDeserializer {
42+
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(mut self, mut visitor: V) -> Result<R> {
43+
if self.encoding() != Encoding::BINARY {
44+
return Err(message::Error::WrongEncoding {});
45+
}
46+
47+
let spec_version = SpecVersion::try_from(
48+
str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..])
49+
.map_err(|e| cloudevents::message::Error::Other {
50+
source: Box::new(e),
51+
})?,
52+
)?;
53+
54+
visitor = visitor.set_spec_version(spec_version.clone())?;
55+
56+
let attributes = spec_version.attribute_names();
57+
58+
if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) {
59+
visitor = visitor.set_attribute(
60+
"datacontenttype",
61+
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
62+
cloudevents::message::Error::Other {
63+
source: Box::new(e),
64+
}
65+
})?),
66+
)?
67+
}
68+
69+
for (hn, hv) in self
70+
.headers
71+
.into_iter()
72+
.filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
73+
{
74+
let name = &hn["ce_".len()..];
75+
76+
if attributes.contains(&name) {
77+
visitor = visitor.set_attribute(
78+
name,
79+
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
80+
cloudevents::message::Error::Other {
81+
source: Box::new(e),
82+
}
83+
})?),
84+
)?
85+
} else {
86+
visitor = visitor.set_extension(
87+
name,
88+
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
89+
cloudevents::message::Error::Other {
90+
source: Box::new(e),
91+
}
92+
})?),
93+
)?
94+
}
95+
}
96+
97+
if self.payload != None {
98+
visitor.end_with_data(self.payload.unwrap())
99+
} else {
100+
visitor.end()
101+
}
102+
}
103+
}
104+
105+
impl StructuredDeserializer for ConsumerRecordDeserializer {
106+
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
107+
if self.encoding() != Encoding::STRUCTURED {
108+
return Err(message::Error::WrongEncoding {});
109+
}
110+
visitor.set_structured_event(self.payload.unwrap())
111+
}
112+
}
113+
114+
impl MessageDeserializer for ConsumerRecordDeserializer {
115+
fn encoding(&self) -> Encoding {
116+
match (
117+
self.headers
118+
.get("content-type")
119+
.map(|s| String::from_utf8(s.to_vec()).ok())
120+
.flatten()
121+
.map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER))
122+
.unwrap_or(false),
123+
self.headers.get(headers::SPEC_VERSION_HEADER),
124+
) {
125+
(true, _) => Encoding::STRUCTURED,
126+
(_, Some(_)) => Encoding::BINARY,
127+
_ => Encoding::UNKNOWN,
128+
}
129+
}
130+
}
131+
132+
/// Method to transform a [`Message`] to [`Event`].
133+
pub fn record_to_event(msg: &impl Message) -> Result<Event> {
134+
MessageDeserializer::into_event(ConsumerRecordDeserializer::new(msg)?)
135+
}
136+
137+
/// Extension Trait for [`Message`] which acts as a wrapper for the function [`record_to_event()`]
138+
pub trait MessageExt {
139+
/// Generates [`Event`] from [`BorrowedMessage`].
140+
fn to_event(&self) -> Result<Event>;
141+
}
142+
143+
impl MessageExt for BorrowedMessage<'_> {
144+
fn to_event(&self) -> Result<Event> {
145+
record_to_event(self)
146+
}
147+
}
148+
149+
impl MessageExt for OwnedMessage {
150+
fn to_event(&self) -> Result<Event> {
151+
record_to_event(self)
152+
}
153+
}
154+
155+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kafka_producer_record::MessageRecord;

    use chrono::Utc;
    use cloudevents::{EventBuilder, EventBuilderV10};
    use serde_json::json;

    // Round-trips a binary-mode event: event -> MessageRecord -> OwnedMessage
    // -> Event, and checks the result equals the original.
    #[test]
    fn test_binary_record() {
        let time = Utc::now();

        let expected = EventBuilderV10::new()
            .id("0001")
            .ty("example.test")
            .time(time)
            .source("http://localhost")
            .extension("someint", "10")
            .build()
            .unwrap();

        // Since there is neither a way provided by rust-rdkafka to convert FutureProducer back into
        // OwnedMessage or BorrowedMessage, nor is there a way to create a BorrowedMessage struct,
        // the test uses OwnedMessage instead, which consumes the message instead of borrowing it like
        // in the case of BorrowedMessage.
        //
        // Serialize a clone of `expected` instead of rebuilding the identical
        // event with a second, duplicated builder chain.
        let message_record = MessageRecord::from_event(expected.clone()).unwrap();

        let owned_message = OwnedMessage::new(
            message_record.payload,
            Some(String::from("test key").into_bytes()),
            String::from("test topic"),
            rdkafka::message::Timestamp::NotAvailable,
            10,
            10,
            Some(message_record.headers),
        );

        assert_eq!(owned_message.to_event().unwrap(), expected)
    }

    // Round-trips a structured-mode (JSON) event the same way.
    #[test]
    fn test_structured_record() {
        let j = json!({"hello": "world"});

        let expected = EventBuilderV10::new()
            .id("0001")
            .ty("example.test")
            .source("http://localhost")
            .data("application/json", j.clone())
            .extension("someint", "10")
            .build()
            .unwrap();

        // OwnedMessage is used here for the same reason as in
        // `test_binary_record`; serialize a clone of `expected` rather than
        // rebuilding the identical event.
        let serialized_event =
            StructuredDeserializer::deserialize_structured(expected.clone(), MessageRecord::new())
                .unwrap();

        let owned_message = OwnedMessage::new(
            serialized_event.payload,
            Some(String::from("test key").into_bytes()),
            String::from("test topic"),
            rdkafka::message::Timestamp::NotAvailable,
            10,
            10,
            Some(serialized_event.headers),
        );

        assert_eq!(owned_message.to_event().unwrap(), expected)
    }
}

0 commit comments

Comments
 (0)