Skip to content

Commit 51b93ed

Browse files
feat: support blocking send_non_blocking() and other RPCs (#355)
1 parent f8c31e9 commit 51b93ed

File tree

3 files changed

+176
-62
lines changed

3 files changed

+176
-62
lines changed

src/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,7 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
554554
Ok(self.with_certificate_chain(v))
555555
}
556556

557+
/// The internal pending queue size for each producer on a topic partition. (default: 100)
557558
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
558559
pub fn with_outbound_channel_size(mut self, size: usize) -> Self {
559560
self.outbound_channel_size = Some(size);

src/connection.rs

Lines changed: 84 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -319,12 +319,13 @@ impl<Exe: Executor> ConnectionSender<Exe> {
319319
}
320320

321321
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
322-
pub(crate) fn send(
322+
pub(crate) async fn send(
323323
&self,
324324
producer_id: u64,
325325
producer_name: String,
326326
sequence_id: u64,
327327
message: producer::ProducerMessage,
328+
block_if_queue_full: bool,
328329
) -> Result<
329330
impl Future<Output = Result<proto::CommandSendReceipt, ConnectionError>>,
330331
ConnectionError,
@@ -334,7 +335,13 @@ impl<Exe: Executor> ConnectionSender<Exe> {
334335
sequence_id,
335336
};
336337
let msg = messages::send(producer_id, producer_name, sequence_id, message);
337-
self.send_message_non_blocking(msg, key, |resp| resp.command.send_receipt)
338+
self.send_message_non_blocking(
339+
msg,
340+
key,
341+
|resp| resp.command.send_receipt,
342+
block_if_queue_full,
343+
)
344+
.await
338345
}
339346

340347
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
@@ -633,15 +640,22 @@ impl<Exe: Executor> ConnectionSender<Exe> {
633640
where
634641
F: FnOnce(Message) -> Option<R> + 'static,
635642
{
636-
self.send_message_non_blocking(msg, key, extract)?.await
643+
// This method is called for RPCs other than CommandSend. If the queue is full due to too
644+
// many CommandSend RPCs not processed in time, we should wait rather than fail fast
645+
// because it's a client side issue that can be recovered later. Hence, set
646+
// block_if_queue_full to true here.
647+
self.send_message_non_blocking(msg, key, extract, true)
648+
.await?
649+
.await
637650
}
638651

639652
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
640-
fn send_message_non_blocking<R: Debug, F>(
653+
async fn send_message_non_blocking<R: Debug, F>(
641654
&self,
642655
msg: Message,
643656
key: RequestKey,
644657
extract: F,
658+
block_if_queue_full: bool,
645659
) -> Result<impl Future<Output = Result<R, ConnectionError>>, ConnectionError>
646660
where
647661
F: FnOnce(Message) -> Option<R> + 'static,
@@ -668,62 +682,76 @@ impl<Exe: Executor> ConnectionSender<Exe> {
668682
})?
669683
};
670684

671-
match (
672-
self.registrations
673-
.unbounded_send(Register::Request { key, resolver }),
674-
self.tx.try_send(msg),
675-
) {
676-
(Ok(_), Ok(_)) => {
677-
let connection_id = self.connection_id;
678-
let error = self.error.clone();
679-
let delay_f = self.executor.delay(self.operation_timeout);
680-
trace!(
681-
"Create timeout futures with operation timeout at {:?}",
682-
self.operation_timeout
685+
self.registrations
686+
.unbounded_send(Register::Request { key, resolver })
687+
.map_err(|e| {
688+
warn!(
689+
"connection {} disconnected when sending the Request: {}",
690+
self.connection_id, e
683691
);
684-
let fut = async move {
685-
pin_mut!(response);
686-
pin_mut!(delay_f);
687-
match select(response, delay_f).await {
688-
Either::Left((res, _)) => {
689-
debug!("Received response: {:?}", res);
690-
res
691-
}
692-
Either::Right(_) => {
693-
warn!(
694-
"connection {} timedout sending message to the Pulsar server",
695-
connection_id
696-
);
697-
error.set(ConnectionError::Io(std::io::Error::new(
698-
std::io::ErrorKind::TimedOut,
699-
format!(
700-
" connection {} timedout sending message to the Pulsar server",
701-
connection_id
702-
),
703-
)));
704-
Err(ConnectionError::Io(std::io::Error::new(
705-
std::io::ErrorKind::TimedOut,
706-
format!(
707-
" connection {} timedout sending message to the Pulsar server",
708-
connection_id
709-
),
710-
)))
711-
}
712-
}
713-
};
714-
715-
Ok(fut)
716-
}
717-
(_, Err(e)) if e.is_full() => Err(ConnectionError::SlowDown),
718-
_ => {
692+
ConnectionError::Disconnected
693+
})?;
694+
if block_if_queue_full {
695+
self.tx.send(msg).await.map_err(|e| {
719696
warn!(
720-
"connection {} disconnected sending message to the Pulsar server",
721-
self.connection_id
697+
"connection {} disconnected when sending the message: {}",
698+
self.connection_id, e
722699
);
723-
self.error.set(ConnectionError::Disconnected);
724-
Err(ConnectionError::Disconnected)
700+
ConnectionError::Disconnected
701+
})?;
702+
} else {
703+
self.tx.try_send(msg).map_err(|e| {
704+
if e.is_full() {
705+
ConnectionError::SlowDown
706+
} else {
707+
warn!(
708+
"connection {} disconnected when sending the message: {}",
709+
self.connection_id, e
710+
);
711+
ConnectionError::Disconnected
712+
}
713+
})?;
714+
};
715+
716+
let connection_id = self.connection_id;
717+
let error = self.error.clone();
718+
let delay_f = self.executor.delay(self.operation_timeout);
719+
trace!(
720+
"Create timeout futures with operation timeout at {:?}",
721+
self.operation_timeout
722+
);
723+
let fut = async move {
724+
pin_mut!(response);
725+
pin_mut!(delay_f);
726+
match select(response, delay_f).await {
727+
Either::Left((res, _)) => {
728+
debug!("Received response: {:?}", res);
729+
res
730+
}
731+
Either::Right(_) => {
732+
warn!(
733+
"connection {} timedout sending message to the Pulsar server",
734+
connection_id
735+
);
736+
error.set(ConnectionError::Io(std::io::Error::new(
737+
std::io::ErrorKind::TimedOut,
738+
format!(
739+
" connection {} timedout sending message to the Pulsar server",
740+
connection_id
741+
),
742+
)));
743+
Err(ConnectionError::Io(std::io::Error::new(
744+
std::io::ErrorKind::TimedOut,
745+
format!(
746+
" connection {} timedout sending message to the Pulsar server",
747+
connection_id
748+
),
749+
)))
750+
}
725751
}
726-
}
752+
};
753+
754+
Ok(fut)
727755
}
728756

729757
/// wait for desired message(commandproducersuccess with ready field true)

src/producer.rs

Lines changed: 91 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ pub struct ProducerOptions {
157157
/// producer access mode: shared = 0, exclusive = 1, waitforexclusive =2,
158158
/// exclusivewithoutfencing =3
159159
pub access_mode: Option<i32>,
160+
/// Whether to block if the internal pending queue, whose size is configured by
161+
/// [`crate::client::PulsarBuilder::with_outbound_channel_size`] is full, when awaiting
162+
/// [`Producer::send_non_blocking`]. (default: false)
163+
pub block_queue_if_full: bool,
160164
}
161165

162166
impl ProducerOptions {
@@ -378,6 +382,26 @@ impl<Exe: Executor> Producer<Exe> {
378382
/// - the producer is batching messages, so this function must return immediately, and the
379383
/// receipt will come when the batched messages are actually sent
380384
///
385+
/// If [`ProducerOptions::block_queue_if_full`] is false (by default) and the internal pending
386+
/// queue is full, which means the send rate is too fast,
387+
/// [`crate::error::ConnectionError::SlowDown`] will be returned. You should handle the error
388+
/// like:
389+
///
390+
/// ```rust,no_run
391+
/// use pulsar::error::{ConnectionError, Error, ProducerError};
392+
///
393+
/// # async fn run(mut producer: pulsar::Producer<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> {
394+
/// match producer.send_non_blocking("msg").await {
395+
/// Ok(future) => { /* handle the send future */ }
396+
/// Err(Error::Producer(ProducerError::Connection(ConnectionError::SlowDown))) => {
397+
/// /* wait for a while and resent */
398+
/// }
399+
/// Err(e) => { /* handle other errors */ }
400+
/// }
401+
/// # Ok(())
402+
/// # }
403+
/// ```
404+
///
381405
/// Usage:
382406
///
383407
/// ```rust,no_run
@@ -818,12 +842,17 @@ where
818842
{
819843
loop {
820844
let message = message.clone();
821-
match connection.sender().send(
822-
producer_id,
823-
producer_name.clone(),
824-
sequence_id.get(),
825-
message,
826-
) {
845+
match connection
846+
.sender()
847+
.send(
848+
producer_id,
849+
producer_name.clone(),
850+
sequence_id.get(),
851+
message,
852+
options.block_queue_if_full,
853+
)
854+
.await
855+
{
827856
Ok(fut) => {
828857
let fut = async move {
829858
let res = fut.await;
@@ -1379,8 +1408,10 @@ impl<T: SerializeMessage + Sized, Exe: Executor> MessageBuilder<'_, T, Exe> {
13791408
#[cfg(test)]
13801409
mod tests {
13811410
use futures::executor::block_on;
1411+
use log::LevelFilter;
13821412

13831413
use super::*;
1414+
use crate::{tests::TEST_LOGGER, TokioExecutor};
13841415

13851416
#[test]
13861417
fn send_future_errors_when_sender_dropped() {
@@ -1436,4 +1467,58 @@ mod tests {
14361467
assert!(pm.compression.is_none());
14371468
assert!(pm.uncompressed_size.is_none());
14381469
}
1470+
1471+
#[tokio::test]
1472+
async fn block_if_queue_full() {
1473+
let _result = log::set_logger(&TEST_LOGGER);
1474+
log::set_max_level(LevelFilter::Debug);
1475+
let pulsar: Pulsar<_> = Pulsar::builder("pulsar://127.0.0.1:6650", TokioExecutor)
1476+
.with_outbound_channel_size(3)
1477+
.build()
1478+
.await
1479+
.unwrap();
1480+
let mut producer = pulsar
1481+
.producer()
1482+
.with_topic(format!("block_queue_if_full_{}", rand::random::<u16>()))
1483+
.build()
1484+
.await
1485+
.unwrap();
1486+
let mut send_results = Vec::with_capacity(10);
1487+
for i in 0..10 {
1488+
send_results.push(producer.send_non_blocking(format!("msg-{i}")).await);
1489+
}
1490+
let mut failed_indexes = vec![];
1491+
for (i, result) in send_results.into_iter().enumerate() {
1492+
match result {
1493+
Ok(_) => {}
1494+
Err(Error::Producer(ProducerError::Connection(ConnectionError::SlowDown))) => {
1495+
failed_indexes.push(i);
1496+
}
1497+
Err(e) => panic!("failed to send {}: {}", i, e),
1498+
}
1499+
}
1500+
info!("Messages failed due to SlowDown: {:?}", &failed_indexes);
1501+
assert!(!failed_indexes.is_empty());
1502+
1503+
let mut producer = pulsar
1504+
.producer()
1505+
.with_topic(format!("block_queue_if_full_{}", rand::random::<u16>()))
1506+
.with_options(ProducerOptions {
1507+
block_queue_if_full: true,
1508+
..Default::default()
1509+
})
1510+
.build()
1511+
.await
1512+
.unwrap();
1513+
let mut send_results = Vec::with_capacity(10);
1514+
for i in 0..10 {
1515+
send_results.push(producer.send_non_blocking(format!("msg-{i}")).await);
1516+
}
1517+
for (i, result) in send_results.into_iter().enumerate() {
1518+
match result {
1519+
Ok(_) => {}
1520+
Err(e) => panic!("failed to send {}: {}", i, e),
1521+
}
1522+
}
1523+
}
14391524
}

0 commit comments

Comments
 (0)