Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 10 additions & 21 deletions rocketmq-broker/src/schedule/schedule_message_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
use rocketmq_store::base::message_status_enum::PutMessageStatus;
use rocketmq_store::base::message_store::MessageStore;
use rocketmq_store::store_path_config_helper::get_delay_offset_store_path;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tracing::error;
use tracing::info;
Expand Down Expand Up @@ -786,7 +785,7 @@
.await?;

// Wait for the result
let result = result_process.get().await;
let result = result_process.get();

Check warning on line 788 in rocketmq-broker/src/schedule/schedule_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/schedule/schedule_message_service.rs#L788

Added line #L788 was not covered by tests
let send_status = result.put_message_status() == PutMessageStatus::PutOk;

if send_status {
Expand Down Expand Up @@ -897,9 +896,7 @@
delay_level: i32,
msg_id: CheetahString,
auto_resend: bool,
future: Option<oneshot::Receiver<PutMessageResult>>,
result_sender: Option<oneshot::Sender<PutMessageResult>>,

put_message_result: Option<PutMessageResult>,
resend_count: AtomicI32,
status: ArcMut<ProcessStatus>,
broker_controller: ArcMut<BrokerRuntimeInner<MS>>,
Expand All @@ -908,7 +905,6 @@
impl<MS: MessageStore> PutResultProcess<MS> {
/// Create a new PutResultProcess instance
pub fn new(broker_controller: ArcMut<BrokerRuntimeInner<MS>>) -> Self {
let (tx, rx) = oneshot::channel();
Self {
topic: CheetahString::empty(),
offset: 0,
Expand All @@ -917,8 +913,7 @@
delay_level: 0,
msg_id: CheetahString::empty(),
auto_resend: false,
future: Some(rx),
result_sender: Some(tx),
put_message_result: None,

Check warning on line 916 in rocketmq-broker/src/schedule/schedule_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/schedule/schedule_message_service.rs#L916

Added line #L916 was not covered by tests
resend_count: AtomicI32::new(0),
status: ArcMut::new(ProcessStatus::Running),
broker_controller,
Expand Down Expand Up @@ -968,10 +963,8 @@
}

/// Set the future for this process
pub fn set_future(mut self, put_result: PutMessageResult) -> Self {
if let Some(sender) = self.result_sender.take() {
let _ = sender.send(put_result);
}
pub fn set_put_message_result(mut self, put_result: PutMessageResult) -> Self {
self.put_message_result = Some(put_result);

Check warning on line 967 in rocketmq-broker/src/schedule/schedule_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/schedule/schedule_message_service.rs#L966-L967

Added lines #L966 - L967 were not covered by tests
self
}

Expand Down Expand Up @@ -1182,15 +1175,11 @@
*self.status.as_ref()
}

/// Get the result from the future
pub async fn get(&mut self) -> PutMessageResult {
if let Some(future) = self.future.take() {
match future.await {
Ok(result) => result,
Err(_) => PutMessageResult::new_default(PutMessageStatus::UnknownError),
}
} else {
PutMessageResult::new_default(PutMessageStatus::UnknownError)
/// Get the result
pub fn get(&mut self) -> PutMessageResult {
match self.put_message_result.take() {
None => PutMessageResult::new_default(PutMessageStatus::UnknownError),
Some(value) => value,

Check warning on line 1182 in rocketmq-broker/src/schedule/schedule_message_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/schedule/schedule_message_service.rs#L1179-L1182

Added lines #L1179 - L1182 were not covered by tests
}
}

Expand Down
Loading