Skip to content

Commit faa81f0

Browse files
committed
Basic produce.
1 parent b4a0ac5 commit faa81f0

File tree

4 files changed

+56
-0
lines changed

4 files changed

+56
-0
lines changed

src/producer/base_producer.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
//! should wait and try again.
4343
4444
use std::ffi::{CStr, CString};
45+
use std::fmt;
4546
use std::marker::PhantomData;
4647
use std::mem;
4748
use std::os::raw::c_void;
@@ -340,6 +341,19 @@ where
340341
_partitioner: PhantomData<Part>,
341342
}
342343

344+
impl<C, Part> fmt::Debug for BaseProducer<C, Part>
345+
where
346+
Part: Partitioner,
347+
C: ProducerContext<Part>,
348+
{
349+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
350+
f.debug_struct("BaseProducer")
351+
.field("native_ptr", &self.native_ptr())
352+
.field("queue", &self.queue)
353+
.finish()
354+
}
355+
}
356+
343357
impl<C, Part> BaseProducer<C, Part>
344358
where
345359
Part: Partitioner,

tests/producer.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
use rdkafka::ClientConfig;
12
use crate::utils::containers::KafkaContext;
23
use crate::utils::logging::init_test_logger;
4+
use crate::utils::producer::create_base_producer;
5+
use crate::utils::rand::rand_test_topic;
6+
use rdkafka::producer::BaseRecord;
37

48
mod utils;
59

@@ -14,4 +18,30 @@ pub async fn test_basic_produce() {
1418
kafka_context_result.unwrap_err()
1519
);
1620
};
21+
22+
let bootstrap_servers_result = _kafka_context.bootstrap_servers().await;
23+
let Ok(bootstrap_servers) = bootstrap_servers_result else {
24+
panic!("could not create bootstrap servers: {}", bootstrap_servers_result.unwrap_err());
25+
};
26+
let mut client_config = ClientConfig::default();
27+
client_config.set("bootstrap.servers", bootstrap_servers);
28+
29+
let base_producer_result = create_base_producer(&client_config);
30+
let Ok(base_producer) = base_producer_result else {
31+
panic!(
32+
"could not create based_producer: {}",
33+
base_producer_result.unwrap_err()
34+
);
35+
};
36+
37+
let test_topic = rand_test_topic("testing-topic");
38+
let record = BaseRecord::to(&test_topic) // destination topic
39+
.key(&[1, 2, 3, 4]) // message key
40+
.payload("content") // message payload
41+
.partition(5);
42+
43+
let send_result = base_producer.send(record);
44+
if send_result.is_err() {
45+
panic!("could not produce record: {:?}", send_result.unwrap_err());
46+
}
1747
}

tests/utils/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
pub mod containers;
44
pub mod logging;
5+
pub mod producer;
56
pub mod rand;
67

78
use std::collections::HashMap;

tests/utils/producer.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
use rdkafka::ClientConfig;
2+
use rdkafka::config::FromClientConfig;
3+
use rdkafka::producer::BaseProducer;
4+
5+
pub fn create_base_producer(config: &ClientConfig) -> anyhow::Result<BaseProducer> {
6+
let base_producer_result = BaseProducer::from_config(config);
7+
let Ok(base_producer) = base_producer_result else {
8+
anyhow::bail!("error creating base producer: {}", base_producer_result.unwrap_err())
9+
};
10+
Ok(base_producer)
11+
}

0 commit comments

Comments
 (0)