Skip to content

Commit e330c6c

Browse files
Docs in cloudevents-sdk-rdkafka
Signed-off-by: Francesco Guardiani <[email protected]>
1 parent 6658a80 commit e330c6c

File tree

4 files changed

+36
-13
lines changed

4 files changed

+36
-13
lines changed

cloudevents-sdk-rdkafka/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@ description = "CloudEvents official Rust SDK - Kafka integration"
88
documentation = "https://docs.rs/cloudevents-sdk-rdkafka"
99
repository = "https://github.com/cloudevents/sdk-rust"
1010
readme = "README.md"
11+
categories = ["web-programming", "encoding"]
1112

1213
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1314

1415
[dependencies]
1516
bytes = "^0.5"
1617
cloudevents-sdk = { version = "0.2.0", path = ".." }
1718
lazy_static = "1.4.0"
18-
rdkafka = { version = "^0.24", features = ["cmake-build"] }
19+
rdkafka = { version = "^0.24", default-features = false }
1920

2021

2122
[dev-dependencies]

cloudevents-sdk-rdkafka/src/kafka_consumer_record.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,10 @@ pub fn record_to_event(msg: &impl Message) -> Result<Event> {
134134
MessageDeserializer::into_event(ConsumerRecordDeserializer::new(msg)?)
135135
}
136136

137-
/// Extension Trait for [`Message`] which acts as a wrapper for the function [`record_to_event()`]
138-
pub trait MessageExt {
137+
/// Extension Trait for [`Message`] which acts as a wrapper for the function [`record_to_event()`].
138+
///
139+
/// This trait is sealed and cannot be implemented for types outside of this crate.
140+
pub trait MessageExt: private::Sealed {
139141
/// Generates [`Event`] from [`BorrowedMessage`].
140142
fn to_event(&self) -> Result<Event>;
141143
}
@@ -152,6 +154,13 @@ impl MessageExt for OwnedMessage {
152154
}
153155
}
154156

157+
mod private {
158+
// Sealing the MessageExt
159+
pub trait Sealed {}
160+
impl Sealed for rdkafka::message::OwnedMessage {}
161+
impl Sealed for rdkafka::message::BorrowedMessage<'_> {}
162+
}
163+
155164
#[cfg(test)]
156165
mod tests {
157166
use super::*;

cloudevents-sdk-rdkafka/src/kafka_producer_record.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ impl StructuredSerializer<MessageRecord> for MessageRecord {
8888
}
8989

9090
/// Extension Trait for [`BaseRecord`] that fills the record with a [`MessageRecord`].
91-
pub trait BaseRecordExt<'a, K: ToBytes + ?Sized> {
91+
///
92+
/// This trait is sealed and cannot be implemented for types outside of this crate.
93+
pub trait BaseRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed {
9294
/// Fill this [`BaseRecord`] with a [`MessageRecord`].
9395
fn message_record(
9496
self,
@@ -112,7 +114,9 @@ impl<'a, K: ToBytes + ?Sized> BaseRecordExt<'a, K> for BaseRecord<'a, K, Vec<u8>
112114
}
113115

114116
/// Extension Trait for [`FutureRecord`] that fills the record with a [`MessageRecord`].
115-
pub trait FutureRecordExt<'a, K: ToBytes + ?Sized> {
117+
///
118+
/// This trait is sealed and cannot be implemented for types outside of this crate.
119+
pub trait FutureRecordExt<'a, K: ToBytes + ?Sized>: private::Sealed {
116120
/// Fill this [`FutureRecord`] with a [`MessageRecord`].
117121
fn message_record(self, message_record: &'a MessageRecord) -> FutureRecord<'a, K, Vec<u8>>;
118122
}
@@ -128,3 +132,10 @@ impl<'a, K: ToBytes + ?Sized> FutureRecordExt<'a, K> for FutureRecord<'a, K, Vec
128132
self
129133
}
130134
}
135+
136+
mod private {
137+
// Sealing the FutureRecordExt and BaseRecordExt
138+
pub trait Sealed {}
139+
impl <K: rdkafka::message::ToBytes + ?Sized, V: rdkafka::message::ToBytes> Sealed for rdkafka::producer::FutureRecord<'_, K, V> {}
140+
impl <K: rdkafka::message::ToBytes + ?Sized, V: rdkafka::message::ToBytes> Sealed for rdkafka::producer::BaseRecord<'_, K, V> {}
141+
}

cloudevents-sdk-rdkafka/src/lib.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,16 @@
1010
//! use rdkafka::util::Timeout;
1111
//! use cloudevents_sdk_rdkafka::{MessageRecord, FutureRecordExt};
1212
//!
13-
//! # async fn produce(producer: &FutureProducer, event: Event) {
14-
//! let message_record = MessageRecord::from_event(event)
15-
//! .expect("error while serializing the event");
13+
//! # async fn produce(producer: &FutureProducer, event: Event) -> Result<(), Box<dyn std::error::Error>> {
14+
//! let message_record = MessageRecord::from_event(event)?;
1615
//!
1716
//! producer.send(
1817
//! FutureRecord::to("topic")
1918
//! .key("some_event")
2019
//! .message_record(&message_record),
2120
//! Timeout::Never
2221
//! ).await;
23-
//!
22+
//! # Ok(())
2423
//! # }
2524
//!
2625
//! ```
@@ -32,22 +31,25 @@
3231
//! use cloudevents_sdk_rdkafka::MessageExt;
3332
//! use futures::StreamExt;
3433
//!
35-
//! # async fn consume(consumer: StreamConsumer<DefaultConsumerContext>) {
34+
//! # async fn consume(consumer: StreamConsumer<DefaultConsumerContext>) -> Result<(), Box<dyn std::error::Error>> {
3635
//! let mut message_stream = consumer.start();
3736
//!
3837
//! while let Some(message) = message_stream.next().await {
3938
//! match message {
4039
//! Err(e) => println!("Kafka error: {}", e),
4140
//! Ok(m) => {
42-
//! let event = m.to_event().expect("error while deserializing record to CloudEvent");
43-
//! println!("Received Event: {:#?}", event);
44-
//! consumer.commit_message(&m, CommitMode::Async).unwrap();
41+
//! let event = m.to_event()?;
42+
//! println!("Received Event: {}", event);
43+
//! consumer.commit_message(&m, CommitMode::Async)?;
4544
//! }
4645
//! };
4746
//! }
47+
//! # Ok(())
4848
//! # }
4949
//! ```
5050
51+
#![deny(broken_intra_doc_links)]
52+
5153
#[macro_use]
5254
mod headers;
5355
mod kafka_consumer_record;

0 commit comments

Comments
 (0)