Skip to content

Commit 4e550a2

Browse files
authored
feat: add receiver queue size configuration to consumer (#371)
1 parent 8780cb1 commit 4e550a2

File tree

3 files changed

+54
-5
lines changed

3 files changed

+54
-5
lines changed

src/consumer/mod.rs

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,8 @@ mod tests {
563563
.unwrap();
564564

565565
assert!(&consumer_1.check_connection().await.is_ok());
566+
let receive_queue_size = consumer_1.options().receiver_queue_size;
567+
assert_eq!(receive_queue_size, None);
566568

567569
let mut consumer_2: Consumer<TestData, _> = builder
568570
.clone()
@@ -677,6 +679,40 @@ mod tests {
677679
assert!(consumer_1_exclusive.is_ok());
678680
}
679681

682+
#[tokio::test]
683+
#[cfg(any(
684+
feature = "tokio-runtime",
685+
feature = "tokio-rustls-runtime-aws-lc-rs",
686+
feature = "tokio-rustls-runtime-ring"
687+
))]
688+
async fn consumer_zero_receiver_queue_size() {
689+
let _result = log::set_logger(&TEST_LOGGER);
690+
log::set_max_level(LevelFilter::Debug);
691+
let addr = "pulsar://127.0.0.1:6650";
692+
let topic = format!(
693+
"consumer_zero_receiver_queue_size_{}",
694+
rand::random::<u16>()
695+
);
696+
697+
let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();
698+
let consumer: Consumer<TestData, _> = client
699+
.consumer()
700+
.with_topic(&topic)
701+
.with_subscription("dropped_ack")
702+
.with_subscription_type(SubType::Shared)
703+
// get earliest messages
704+
.with_options(
705+
ConsumerOptions::default()
706+
.with_receiver_queue_size(0)
707+
.with_initial_position(InitialPosition::Earliest),
708+
)
709+
.build()
710+
.await
711+
.unwrap();
712+
let size = consumer.options().receiver_queue_size.unwrap();
713+
assert_eq!(size, 1000);
714+
}
715+
680716
#[tokio::test]
681717
#[cfg(any(
682718
feature = "tokio-runtime",
@@ -715,15 +751,18 @@ mod tests {
715751
.with_subscription("dropped_ack")
716752
.with_subscription_type(SubType::Shared)
717753
// get earliest messages
718-
.with_options(ConsumerOptions {
719-
initial_position: InitialPosition::Earliest,
720-
..Default::default()
721-
})
754+
.with_options(
755+
ConsumerOptions::default()
756+
.with_receiver_queue_size(2000)
757+
.with_initial_position(InitialPosition::Earliest),
758+
)
722759
.build()
723760
.await
724761
.unwrap();
725762

726763
println!("created consumer");
764+
let receive_queue_size = consumer.options().receiver_queue_size;
765+
assert_eq!(receive_queue_size, Some(2000));
727766

728767
// consumer.next().await
729768
let msg: Message<TestData> = timeout(Duration::from_secs(1), consumer.next())

src/consumer/options.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ pub struct ConsumerOptions {
2020
pub metadata: BTreeMap<String, String>,
2121
pub read_compacted: Option<bool>,
2222
pub schema: Option<Schema>,
23+
/// size of the receiver queue
24+
pub receiver_queue_size: Option<u32>,
2325
/// Signal whether the subscription will initialize on latest
2426
/// or earliest message (default on latest)
2527
///
@@ -76,4 +78,11 @@ impl ConsumerOptions {
7678
self.initial_position = initial_position;
7779
self
7880
}
81+
82+
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
83+
pub fn with_receiver_queue_size(mut self, size: u32) -> Self {
84+
// todo: support zero_queue_size consumer
85+
self.receiver_queue_size = Some(if size == 0 { 1000 } else { size });
86+
self
87+
}
7988
}

src/consumer/topic.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ impl<T: DeserializeMessage, Exe: Executor> TopicConsumer<T, Exe> {
103103
return Err(Error::Executor);
104104
}
105105
}
106-
let (tx, rx) = mpsc::channel(1000);
106+
let receiver_queue_size = options.receiver_queue_size.unwrap_or(1000);
107+
let (tx, rx) = mpsc::channel(receiver_queue_size as usize);
107108
let mut c = ConsumerEngine::new(
108109
client.clone(),
109110
connection.clone(),

0 commit comments

Comments
 (0)