Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ serde = { version = "^1.0.216", features = ["derive"] }
serde_json = "^1.0.133"
tokio = { version = "^1.42.0", features = ["macros", "rt-multi-thread"] }
assert_matches = "1.5.0"
reqwest = { version = "0.12.23", features = ["json"] }

[build-dependencies]
prost-build = "^0.13.4"
Expand Down
21 changes: 21 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ mod tests {
use assert_matches::assert_matches;
use futures::{future::try_join_all, StreamExt};
use log::{LevelFilter, Metadata, Record};
use serde_json::Value;
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
use tokio::time::timeout;

Expand Down Expand Up @@ -649,8 +650,14 @@ mod tests {
// Flush 0 messages should be ok
producer.send_batch().await.unwrap();

assert!(!is_publishers_empty(&topic).await);

let send_futures = send_messages(&mut producer, vec!["3", "4"]).await;
producer.close().await.unwrap();
// After the CloseProducer RPC is done, the producer should be removed from the publishers
// list in topic stats
assert!(is_publishers_empty(&topic).await);

for send_future in send_futures {
let error = send_future.await.err().unwrap();
assert_matches!(error, PulsarError::Producer(ProducerError::Closed));
Expand Down Expand Up @@ -742,4 +749,18 @@ mod tests {
}
actual_values
}

async fn is_publishers_empty(topic: &str) -> bool {
let stats_url =
format!("http://127.0.0.1:8080/admin/v2/persistent/public/default/{topic}/stats");
let response = reqwest::get(stats_url).await.unwrap();
assert!(response.status().is_success());
let json_value: Value = response.json().await.unwrap();
if let Some(publishers) = json_value.get("publishers") {
let publishers = publishers.as_array().unwrap();
publishers.is_empty()
} else {
panic!("No publishers in the stats");
}
}
}
31 changes: 19 additions & 12 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::{
proto::{self, CommandSendReceipt, EncryptionKeys, Schema},
BatchedMessage,
},
proto::CommandSuccess,
retry_op::retry_create_producer,
BrokerAddress, Error, Pulsar,
};
Expand Down Expand Up @@ -603,18 +604,19 @@ impl<Exe: Executor> TopicProducer<Exe> {
let (msg_sender, msg_receiver) = mpsc::channel::<BatchItem>(10);
let executor_clone = executor.clone();
let (batch_sender, batch_receiver) = mpsc::channel::<Vec<BatchItem>>(1);
let (close_sender, close_receiver) = oneshot::channel::<()>();
let (close_sender, close_receiver) =
oneshot::channel::<Result<CommandSuccess, ConnectionError>>();

let _ = executor.spawn(Box::pin(batch_process_loop(
producer_id,
batch_storage,
msg_receiver,
close_sender,
batch_sender,
executor_clone,
)));
let _ = executor.spawn(Box::pin(message_send_loop(
batch_receiver,
close_sender,
client.clone(),
connection.clone(),
topic.clone(),
Expand Down Expand Up @@ -743,7 +745,11 @@ impl<Exe: Executor> TopicProducer<Exe> {
Some(mut batch) if self.options.enabled_batching() => {
batch.msg_sender.close_channel();
let close_receiver = &mut batch.close_receiver;
let _ = close_receiver.await;
return match close_receiver.await {
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => Err(Error::Producer(ProducerError::Connection(e))),
Err(_) => Err(Error::Producer(ProducerError::Closed)),
};
}
_ => {
warn!(
Expand Down Expand Up @@ -1072,15 +1078,14 @@ struct Batch {
// sends a message or trigger a flush
msg_sender: mpsc::Sender<BatchItem>,
// receives the notification when `bath_process_loop` is closed
close_receiver: oneshot::Receiver<()>,
close_receiver: oneshot::Receiver<Result<CommandSuccess, ConnectionError>>,
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn batch_process_loop(
producer_id: ProducerId,
mut batch_storage: BatchStorage,
mut msg_receiver: mpsc::Receiver<BatchItem>,
close_sender: oneshot::Sender<()>,
mut batch_sender: mpsc::Sender<Vec<BatchItem>>,
executor: impl Executor,
) {
Expand Down Expand Up @@ -1123,14 +1128,8 @@ async fn batch_process_loop(
}
}
} else {
debug!("producer {producer_id}'s batch_process_loop: channel closed, exiting");
info!("producer {producer_id}'s batch_process_loop: channel closed, exiting");
}
let _ = close_sender.send(()).inspect_err(|e| {
warn!(
"{producer_id} could not notify the batch_process_loop is closed: {:?}, the producer might be dropped without closing",
e
);
});
break;
}
Either::Right((_, previous_recv_future)) => {
Expand All @@ -1147,6 +1146,7 @@ async fn batch_process_loop(
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn message_send_loop<Exe>(
mut msg_receiver: mpsc::Receiver<Vec<BatchItem>>,
close_sender: oneshot::Sender<Result<CommandSuccess, ConnectionError>>,
client: Pulsar<Exe>,
mut connection: Arc<Connection<Exe>>,
topic: String,
Expand Down Expand Up @@ -1242,6 +1242,13 @@ async fn message_send_loop<Exe>(
}
None => {
debug!("producer {producer_id} message_send_loop: channel closed, exiting");
let close_result = connection.sender().close_producer(producer_id).await;
let _ = close_sender.send(close_result).inspect_err(|e| {
warn!(
"{producer_id} could not notify the message_send_loop is closed: {:?}, the producer might be dropped without closing",
e
);
});
break;
}
}
Expand Down
Loading