Skip to content

Commit 77fa24b

Browse files
fix: producer is not closed at server side when batching is enabled (#363)
1 parent b4a67b8 commit 77fa24b

File tree

3 files changed

+41
-12
lines changed

3 files changed

+41
-12
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ serde = { version = "^1.0.216", features = ["derive"] }
5959
serde_json = "^1.0.133"
6060
tokio = { version = "^1.42.0", features = ["macros", "rt-multi-thread"] }
6161
assert_matches = "1.5.0"
62+
reqwest = { version = "0.12.23", features = ["json"] }
6263

6364
[build-dependencies]
6465
prost-build = "^0.13.4"

src/lib.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ mod tests {
215215
use assert_matches::assert_matches;
216216
use futures::{future::try_join_all, StreamExt};
217217
use log::{LevelFilter, Metadata, Record};
218+
use serde_json::Value;
218219
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
219220
use tokio::time::timeout;
220221

@@ -649,8 +650,14 @@ mod tests {
649650
// Flush 0 messages should be ok
650651
producer.send_batch().await.unwrap();
651652

653+
assert!(!is_publishers_empty(&topic).await);
654+
652655
let send_futures = send_messages(&mut producer, vec!["3", "4"]).await;
653656
producer.close().await.unwrap();
657+
// After the CloseProducer RPC is done, the producer should be removed from the publishers
658+
// list in topic stats
659+
assert!(is_publishers_empty(&topic).await);
660+
654661
for send_future in send_futures {
655662
let error = send_future.await.err().unwrap();
656663
assert_matches!(error, PulsarError::Producer(ProducerError::Closed));
@@ -742,4 +749,18 @@ mod tests {
742749
}
743750
actual_values
744751
}
752+
753+
async fn is_publishers_empty(topic: &str) -> bool {
754+
let stats_url =
755+
format!("http://127.0.0.1:8080/admin/v2/persistent/public/default/{topic}/stats");
756+
let response = reqwest::get(stats_url).await.unwrap();
757+
assert!(response.status().is_success());
758+
let json_value: Value = response.json().await.unwrap();
759+
if let Some(publishers) = json_value.get("publishers") {
760+
let publishers = publishers.as_array().unwrap();
761+
publishers.is_empty()
762+
} else {
763+
panic!("No publishers in the stats");
764+
}
765+
}
745766
}

src/producer.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::{
2727
proto::{self, CommandSendReceipt, EncryptionKeys, Schema},
2828
BatchedMessage,
2929
},
30+
proto::CommandSuccess,
3031
retry_op::retry_create_producer,
3132
BrokerAddress, Error, Pulsar,
3233
};
@@ -603,18 +604,19 @@ impl<Exe: Executor> TopicProducer<Exe> {
603604
let (msg_sender, msg_receiver) = mpsc::channel::<BatchItem>(10);
604605
let executor_clone = executor.clone();
605606
let (batch_sender, batch_receiver) = mpsc::channel::<Vec<BatchItem>>(1);
606-
let (close_sender, close_receiver) = oneshot::channel::<()>();
607+
let (close_sender, close_receiver) =
608+
oneshot::channel::<Result<CommandSuccess, ConnectionError>>();
607609

608610
let _ = executor.spawn(Box::pin(batch_process_loop(
609611
producer_id,
610612
batch_storage,
611613
msg_receiver,
612-
close_sender,
613614
batch_sender,
614615
executor_clone,
615616
)));
616617
let _ = executor.spawn(Box::pin(message_send_loop(
617618
batch_receiver,
619+
close_sender,
618620
client.clone(),
619621
connection.clone(),
620622
topic.clone(),
@@ -743,7 +745,11 @@ impl<Exe: Executor> TopicProducer<Exe> {
743745
Some(mut batch) if self.options.enabled_batching() => {
744746
batch.msg_sender.close_channel();
745747
let close_receiver = &mut batch.close_receiver;
746-
let _ = close_receiver.await;
748+
return match close_receiver.await {
749+
Ok(Ok(_)) => Ok(()),
750+
Ok(Err(e)) => Err(Error::Producer(ProducerError::Connection(e))),
751+
Err(_) => Err(Error::Producer(ProducerError::Closed)),
752+
};
747753
}
748754
_ => {
749755
warn!(
@@ -1072,15 +1078,14 @@ struct Batch {
10721078
// sends a message or trigger a flush
10731079
msg_sender: mpsc::Sender<BatchItem>,
10741080
// receives the notification when `bath_process_loop` is closed
1075-
close_receiver: oneshot::Receiver<()>,
1081+
close_receiver: oneshot::Receiver<Result<CommandSuccess, ConnectionError>>,
10761082
}
10771083

10781084
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
10791085
async fn batch_process_loop(
10801086
producer_id: ProducerId,
10811087
mut batch_storage: BatchStorage,
10821088
mut msg_receiver: mpsc::Receiver<BatchItem>,
1083-
close_sender: oneshot::Sender<()>,
10841089
mut batch_sender: mpsc::Sender<Vec<BatchItem>>,
10851090
executor: impl Executor,
10861091
) {
@@ -1123,14 +1128,8 @@ async fn batch_process_loop(
11231128
}
11241129
}
11251130
} else {
1126-
debug!("producer {producer_id}'s batch_process_loop: channel closed, exiting");
1131+
info!("producer {producer_id}'s batch_process_loop: channel closed, exiting");
11271132
}
1128-
let _ = close_sender.send(()).inspect_err(|e| {
1129-
warn!(
1130-
"{producer_id} could not notify the batch_process_loop is closed: {:?}, the producer might be dropped without closing",
1131-
e
1132-
);
1133-
});
11341133
break;
11351134
}
11361135
Either::Right((_, previous_recv_future)) => {
@@ -1147,6 +1146,7 @@ async fn batch_process_loop(
11471146
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
11481147
async fn message_send_loop<Exe>(
11491148
mut msg_receiver: mpsc::Receiver<Vec<BatchItem>>,
1149+
close_sender: oneshot::Sender<Result<CommandSuccess, ConnectionError>>,
11501150
client: Pulsar<Exe>,
11511151
mut connection: Arc<Connection<Exe>>,
11521152
topic: String,
@@ -1242,6 +1242,13 @@ async fn message_send_loop<Exe>(
12421242
}
12431243
None => {
12441244
debug!("producer {producer_id} message_send_loop: channel closed, exiting");
1245+
let close_result = connection.sender().close_producer(producer_id).await;
1246+
let _ = close_sender.send(close_result).inspect_err(|e| {
1247+
warn!(
1248+
"{producer_id} could not notify the message_send_loop is closed: {:?}, the producer might be dropped without closing",
1249+
e
1250+
);
1251+
});
12451252
break;
12461253
}
12471254
}

0 commit comments

Comments
 (0)