Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ serde = { version = "^1.0.216", features = ["derive"] }
serde_json = "^1.0.133"
tokio = { version = "^1.42.0", features = ["macros", "rt-multi-thread"] }
assert_matches = "1.5.0"
reqwest = { version = "0.12.23", features = ["json"] }

[build-dependencies]
prost-build = "^0.13.4"
Expand Down
21 changes: 21 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ mod tests {
use assert_matches::assert_matches;
use futures::{future::try_join_all, StreamExt};
use log::{LevelFilter, Metadata, Record};
use serde_json::Value;
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
use tokio::time::timeout;

Expand Down Expand Up @@ -649,8 +650,14 @@ mod tests {
// Flush 0 messages should be ok
producer.send_batch().await.unwrap();

assert!(!is_publishers_empty(&topic).await);

let send_futures = send_messages(&mut producer, vec!["3", "4"]).await;
producer.close().await.unwrap();
// After the CloseProducer RPC is done, the producer should be removed from the publishers
// list in topic stats
assert!(is_publishers_empty(&topic).await);

for send_future in send_futures {
let error = send_future.await.err().unwrap();
assert_matches!(error, PulsarError::Producer(ProducerError::Closed));
Expand Down Expand Up @@ -742,4 +749,18 @@ mod tests {
}
actual_values
}

async fn is_publishers_empty(topic: &String) -> bool {
let stats_url =
format!("http://127.0.0.1:8080/admin/v2/persistent/public/default/{topic}/stats");
let response = reqwest::get(stats_url).await.unwrap();
assert!(response.status().is_success());
let json_value: Value = response.json().await.unwrap();
if let Some(publishers) = json_value.get("publishers") {
let publishers = publishers.as_array().unwrap();
publishers.is_empty()
} else {
panic!("No publishers in the stats");
}
}
}
31 changes: 19 additions & 12 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::{
proto::{self, CommandSendReceipt, EncryptionKeys, Schema},
BatchedMessage,
},
proto::CommandSuccess,
retry_op::retry_create_producer,
BrokerAddress, Error, Pulsar,
};
Expand Down Expand Up @@ -603,18 +604,19 @@ impl<Exe: Executor> TopicProducer<Exe> {
let (msg_sender, msg_receiver) = mpsc::channel::<BatchItem>(10);
let executor_clone = executor.clone();
let (batch_sender, batch_receiver) = mpsc::channel::<Vec<BatchItem>>(1);
let (close_sender, close_receiver) = oneshot::channel::<()>();
let (close_sender, close_receiver) =
oneshot::channel::<Result<CommandSuccess, ConnectionError>>();

let _ = executor.spawn(Box::pin(batch_process_loop(
producer_id,
batch_storage,
msg_receiver,
close_sender,
batch_sender,
executor_clone,
)));
let _ = executor.spawn(Box::pin(message_send_loop(
batch_receiver,
close_sender,
client.clone(),
connection.clone(),
topic.clone(),
Expand Down Expand Up @@ -743,7 +745,11 @@ impl<Exe: Executor> TopicProducer<Exe> {
Some(mut batch) if self.options.enabled_batching() => {
batch.msg_sender.close_channel();
let close_receiver = &mut batch.close_receiver;
let _ = close_receiver.await;
return match close_receiver.await {
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => Err(Error::Producer(ProducerError::Connection(e))),
Err(_) => Err(Error::Producer(ProducerError::Closed)),
};
}
_ => {
warn!(
Expand Down Expand Up @@ -1072,15 +1078,14 @@ struct Batch {
// sends a message or trigger a flush
msg_sender: mpsc::Sender<BatchItem>,
// receives the notification when `bath_process_loop` is closed
close_receiver: oneshot::Receiver<()>,
close_receiver: oneshot::Receiver<Result<CommandSuccess, ConnectionError>>,
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn batch_process_loop(
producer_id: ProducerId,
mut batch_storage: BatchStorage,
mut msg_receiver: mpsc::Receiver<BatchItem>,
close_sender: oneshot::Sender<()>,
mut batch_sender: mpsc::Sender<Vec<BatchItem>>,
executor: impl Executor,
) {
Expand Down Expand Up @@ -1123,14 +1128,8 @@ async fn batch_process_loop(
}
}
} else {
debug!("producer {producer_id}'s batch_process_loop: channel closed, exiting");
info!("producer {producer_id}'s batch_process_loop: channel closed, exiting");
}
let _ = close_sender.send(()).inspect_err(|e| {
warn!(
"{producer_id} could not notify the batch_process_loop is closed: {:?}, the producer might be dropped without closing",
e
);
});
break;
}
Either::Right((_, previous_recv_future)) => {
Expand All @@ -1147,6 +1146,7 @@ async fn batch_process_loop(
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn message_send_loop<Exe>(
mut msg_receiver: mpsc::Receiver<Vec<BatchItem>>,
close_sender: oneshot::Sender<Result<CommandSuccess, ConnectionError>>,
client: Pulsar<Exe>,
mut connection: Arc<Connection<Exe>>,
topic: String,
Expand Down Expand Up @@ -1242,6 +1242,13 @@ async fn message_send_loop<Exe>(
}
None => {
debug!("producer {producer_id} message_send_loop: channel closed, exiting");
let close_result = connection.sender().close_producer(producer_id).await;
let _ = close_sender.send(close_result).inspect_err(|e| {
warn!(
"{producer_id} could not notify the batch_process_loop is closed: {:?}, the producer might be dropped without closing",
e
);
});
break;
}
}
Expand Down
Loading