diff --git a/rocketmq-broker/src/schedule/schedule_message_service.rs b/rocketmq-broker/src/schedule/schedule_message_service.rs index 7801e0a04..2209ad94e 100644 --- a/rocketmq-broker/src/schedule/schedule_message_service.rs +++ b/rocketmq-broker/src/schedule/schedule_message_service.rs @@ -50,7 +50,6 @@ use rocketmq_store::base::message_result::PutMessageResult; use rocketmq_store::base::message_status_enum::PutMessageStatus; use rocketmq_store::base::message_store::MessageStore; use rocketmq_store::store_path_config_helper::get_delay_offset_store_path; -use tokio::sync::oneshot; use tokio::sync::Mutex; use tracing::error; use tracing::info; @@ -786,7 +785,7 @@ impl DeliverDelayedMessageTimerTask { .await?; // Wait for the result - let result = result_process.get().await; + let result = result_process.get(); let send_status = result.put_message_status() == PutMessageStatus::PutOk; if send_status { @@ -897,9 +896,7 @@ pub struct PutResultProcess { delay_level: i32, msg_id: CheetahString, auto_resend: bool, - future: Option>, - result_sender: Option>, - + put_message_result: Option, resend_count: AtomicI32, status: ArcMut, broker_controller: ArcMut>, @@ -908,7 +905,6 @@ pub struct PutResultProcess { impl PutResultProcess { /// Create a new PutResultProcess instance pub fn new(broker_controller: ArcMut>) -> Self { - let (tx, rx) = oneshot::channel(); Self { topic: CheetahString::empty(), offset: 0, @@ -917,8 +913,7 @@ impl PutResultProcess { delay_level: 0, msg_id: CheetahString::empty(), auto_resend: false, - future: Some(rx), - result_sender: Some(tx), + put_message_result: None, resend_count: AtomicI32::new(0), status: ArcMut::new(ProcessStatus::Running), broker_controller, @@ -968,10 +963,8 @@ impl PutResultProcess { } /// Set the future for this process - pub fn set_future(mut self, put_result: PutMessageResult) -> Self { - if let Some(sender) = self.result_sender.take() { - let _ = sender.send(put_result); - } + pub fn set_put_message_result(mut self, put_result: PutMessageResult) -> Self { + self.put_message_result = Some(put_result); self } @@ -1182,15 +1175,11 @@ impl PutResultProcess { *self.status.as_ref() } - /// Get the result from the future - pub async fn get(&mut self) -> PutMessageResult { - if let Some(future) = self.future.take() { - match future.await { - Ok(result) => result, - Err(_) => PutMessageResult::new_default(PutMessageStatus::UnknownError), - } - } else { - PutMessageResult::new_default(PutMessageStatus::UnknownError) + /// Get the result + pub fn get(&mut self) -> PutMessageResult { + match self.put_message_result.take() { + None => PutMessageResult::new_default(PutMessageStatus::UnknownError), + Some(value) => value, } }