Skip to content

Commit d963be5

Browse files
fix(producer): prevent send requests with 0 as num_messages (#377)
1 parent 4043d22 commit d963be5

File tree

4 files changed

+104
-61
lines changed

4 files changed

+104
-61
lines changed

src/lib.rs

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ pub mod reader;
225225
mod retry_op;
226226
pub mod routing_policy;
227227
mod service_discovery;
228+
mod test_utils;
228229

229230
#[cfg(all(
230231
any(
@@ -248,7 +249,7 @@ mod tests {
248249

249250
use assert_matches::assert_matches;
250251
use futures::{future::try_join_all, StreamExt};
251-
use log::{LevelFilter, Metadata, Record};
252+
use log::{Metadata, Record};
252253
use serde_json::Value;
253254
#[cfg(any(
254255
feature = "tokio-runtime",
@@ -263,7 +264,6 @@ mod tests {
263264
feature = "tokio-rustls-runtime-aws-lc-rs",
264265
feature = "tokio-rustls-runtime-ring"
265266
))]
266-
use crate::executor::TokioExecutor;
267267
use crate::{
268268
client::SerializeMessage,
269269
consumer::{InitialPosition, Message},
@@ -332,11 +332,7 @@ mod tests {
332332
feature = "tokio-rustls-runtime-ring"
333333
))]
334334
async fn round_trip() {
335-
let _result = log::set_logger(&TEST_LOGGER);
336-
log::set_max_level(LevelFilter::Debug);
337-
338-
let addr = "pulsar://127.0.0.1:6650";
339-
let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
335+
let pulsar = test_utils::new_pulsar().await;
340336

341337
// random topic to better allow multiple test runs while debugging
342338
let topic = format!("test_{}", rand::random::<u16>());
@@ -401,12 +397,8 @@ mod tests {
401397
feature = "tokio-rustls-runtime-ring"
402398
))]
403399
async fn unsized_data() {
404-
let _result = log::set_logger(&TEST_LOGGER);
405-
log::set_max_level(LevelFilter::Debug);
406-
407-
let addr = "pulsar://127.0.0.1:6650";
400+
let pulsar = test_utils::new_pulsar().await;
408401
let test_id: u16 = rand::random();
409-
let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
410402

411403
// test &str
412404
{
@@ -491,13 +483,8 @@ mod tests {
491483
feature = "tokio-rustls-runtime-ring"
492484
))]
493485
async fn redelivery() {
494-
let _result = log::set_logger(&TEST_LOGGER);
495-
log::set_max_level(LevelFilter::Debug);
496-
497-
let addr = "pulsar://127.0.0.1:6650";
486+
let pulsar = test_utils::new_pulsar().await;
498487
let topic = format!("test_redelivery_{}", rand::random::<u16>());
499-
500-
let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
501488
pulsar
502489
.send(&topic, String::from("data"))
503490
.await
@@ -545,13 +532,8 @@ mod tests {
545532
async fn batching() {
546533
use assert_matches::assert_matches;
547534

548-
let _result = log::set_logger(&TEST_LOGGER);
549-
log::set_max_level(LevelFilter::Debug);
550-
551-
let addr = "pulsar://127.0.0.1:6650";
535+
let pulsar = test_utils::new_pulsar().await;
552536
let topic = format!("test_batching_{}", rand::random::<u16>());
553-
554-
let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
555537
let mut consumer: Consumer<String, _> =
556538
pulsar.consumer().with_topic(&topic).build().await.unwrap();
557539

@@ -677,13 +659,8 @@ mod tests {
677659
feature = "tokio-rustls-runtime-ring"
678660
))]
679661
async fn flush() {
680-
let _result = log::set_logger(&TEST_LOGGER);
681-
log::set_max_level(LevelFilter::Debug);
682-
683-
let addr = "pulsar://127.0.0.1:6650";
662+
let pulsar = test_utils::new_pulsar().await;
684663
let topic = format!("test_flush_{}", rand::random::<u16>());
685-
686-
let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
687664
let mut consumer: Consumer<String, _> =
688665
pulsar.consumer().with_topic(&topic).build().await.unwrap();
689666
let mut producer =
@@ -825,4 +802,33 @@ mod tests {
825802
panic!("No publishers in the stats");
826803
}
827804
}
805+
806+
#[tokio::test]
807+
async fn flush_on_partitioned_topic() {
808+
let pulsar = test_utils::new_pulsar().await;
809+
let topic = format!("test_flush_on_part_topic_{}", rand::random::<u16>());
810+
811+
const NUM_PARTITIONS: u32 = 2;
812+
test_utils::create_partitioned_topic("public", "default", &topic, NUM_PARTITIONS).await;
813+
let mut producer =
814+
create_batched_producer(pulsar.clone(), &topic, Some(2), None, None).await;
815+
let send_future = producer.send_non_blocking("msg").await.unwrap();
816+
producer.send_batch().await.unwrap();
817+
818+
let msg_id = send_future.await.unwrap().message_id.unwrap();
819+
for i in 0..NUM_PARTITIONS {
820+
let mut reader = pulsar
821+
.reader()
822+
.with_topic(format!("{}-partition-{}", topic, i))
823+
.build::<String>()
824+
.await
825+
.unwrap();
826+
let last_msg_ids = reader.get_last_message_id().await.unwrap();
827+
let last_msg_id = last_msg_ids.first().unwrap();
828+
if last_msg_id.ledger_id != u64::MAX && last_msg_id.entry_id != u64::MAX {
829+
assert_eq!(last_msg_id.ledger_id, msg_id.ledger_id);
830+
assert_eq!(last_msg_id.entry_id, msg_id.entry_id);
831+
}
832+
}
833+
}
828834
}

src/producer.rs

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1227,6 +1227,12 @@ async fn message_send_loop<Exe>(
12271227
);
12281228
}
12291229
}
1230+
if counter == 0 {
1231+
if let Some(flush_tx) = flush_tx {
1232+
let _ = flush_tx.send(());
1233+
}
1234+
continue;
1235+
}
12301236

12311237
let message = ProducerMessage {
12321238
payload,
@@ -1435,7 +1441,9 @@ mod tests {
14351441
use log::LevelFilter;
14361442

14371443
use super::*;
1438-
use crate::{routing_policy::CustomRoutingPolicy, tests::TEST_LOGGER, TokioExecutor};
1444+
use crate::{
1445+
routing_policy::CustomRoutingPolicy, test_utils, tests::TEST_LOGGER, TokioExecutor,
1446+
};
14391447

14401448
#[test]
14411449
fn send_future_errors_when_sender_dropped() {
@@ -1582,7 +1590,7 @@ mod tests {
15821590
..Default::default()
15831591
};
15841592
let partition_count = 3;
1585-
create_partitioned_topic("public", "default", &topic, partition_count).await;
1593+
test_utils::create_partitioned_topic("public", "default", &topic, partition_count).await;
15861594

15871595
let mut producer = pulsar
15881596
.producer()
@@ -1652,7 +1660,7 @@ mod tests {
16521660
..Default::default()
16531661
};
16541662
let partition_count = 3;
1655-
create_partitioned_topic("public", "default", &topic, partition_count).await;
1663+
test_utils::create_partitioned_topic("public", "default", &topic, partition_count).await;
16561664

16571665
let mut producer = pulsar
16581666
.producer()
@@ -1715,7 +1723,7 @@ mod tests {
17151723
..Default::default()
17161724
};
17171725
let partition_count = 3;
1718-
create_partitioned_topic("public", "default", &topic, partition_count).await;
1726+
test_utils::create_partitioned_topic("public", "default", &topic, partition_count).await;
17191727

17201728
let mut producer = pulsar
17211729
.producer()
@@ -1755,23 +1763,4 @@ mod tests {
17551763
assert!(send_receipt.producer_id == producer_id);
17561764
}
17571765
}
1758-
1759-
async fn create_partitioned_topic(
1760-
tenant: &str,
1761-
namespace: &str,
1762-
topic_name: &str,
1763-
num_partitions: u32,
1764-
) {
1765-
let create_partitioned_topic_url = format!(
1766-
"http://127.0.0.1:8080/admin/v2/persistent/{tenant}/{namespace}/{topic_name}/partitions"
1767-
);
1768-
let client = reqwest::Client::new();
1769-
let response = client
1770-
.put(create_partitioned_topic_url)
1771-
.json(&num_partitions.to_string())
1772-
.send()
1773-
.await
1774-
.unwrap();
1775-
assert!(response.status().is_success());
1776-
}
17771766
}

src/reader.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -184,18 +184,21 @@ impl<T: DeserializeMessage, Exe: Executor> Reader<T, Exe> {
184184

185185
#[cfg(test)]
186186
mod tests {
187-
use crate::consumer::{DeadLetterPolicy, InitialPosition, Message};
188-
use crate::proto::MessageIdData;
189-
use crate::reader::Reader;
190-
use crate::{
191-
producer, ConsumerOptions, DeserializeMessage, Error, Executor, Payload, Pulsar,
192-
SerializeMessage, SubType, TokioExecutor,
193-
};
187+
use std::time::Duration;
188+
194189
use futures::StreamExt;
195190
use serde::{Deserialize, Serialize};
196-
use std::time::Duration;
197191
use tokio::time::timeout;
198192

193+
use crate::{
194+
consumer::{DeadLetterPolicy, InitialPosition, Message},
195+
producer,
196+
proto::MessageIdData,
197+
reader::Reader,
198+
ConsumerOptions, DeserializeMessage, Error, Executor, Payload, Pulsar, SerializeMessage,
199+
SubType, TokioExecutor,
200+
};
201+
199202
#[derive(Serialize, Deserialize)]
200203
struct TestData {
201204
data: String,

src/test_utils.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#[cfg(test)]
2+
use crate::{client::Pulsar, executor::TokioExecutor};
3+
4+
/// Wrapper for the Tokio executor
5+
#[cfg(any(
6+
feature = "tokio-runtime",
7+
feature = "tokio-rustls-runtime-aws-lc-rs",
8+
feature = "tokio-rustls-runtime-ring"
9+
))]
10+
#[cfg(test)]
11+
pub async fn new_pulsar() -> Pulsar<TokioExecutor> {
12+
use log::LevelFilter;
13+
14+
use crate::tests::TEST_LOGGER;
15+
16+
let _result = log::set_logger(&TEST_LOGGER);
17+
log::set_max_level(LevelFilter::Debug);
18+
19+
Pulsar::builder("pulsar://127.0.0.1:6650", TokioExecutor)
20+
.build()
21+
.await
22+
.unwrap()
23+
}
24+
25+
#[cfg(test)]
26+
pub(crate) async fn create_partitioned_topic(
27+
tenant: &str,
28+
namespace: &str,
29+
topic_name: &str,
30+
num_partitions: u32,
31+
) {
32+
use reqwest::Client;
33+
34+
let create_partitioned_topic_url = format!(
35+
"http://127.0.0.1:8080/admin/v2/persistent/{tenant}/{namespace}/{topic_name}/partitions"
36+
);
37+
let client = Client::new();
38+
let response = client
39+
.put(create_partitioned_topic_url)
40+
.json(&num_partitions.to_string())
41+
.send()
42+
.await
43+
.unwrap();
44+
assert!(response.status().is_success());
45+
}

0 commit comments

Comments
 (0)