Skip to content

Commit d686d3c

Browse files
authored
[ISSUE #2859]🚀Implement PutResultProcess#then_process method (#2860)
1 parent d76bfad commit d686d3c

File tree

1 file changed

+13
-8
lines changed

1 file changed

+13
-8
lines changed

rocketmq-broker/src/schedule/schedule_message_service.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -862,7 +862,8 @@ impl<MS: MessageStore> DeliverDelayedMessageTimerTask<MS> {
862862

863863
let topic = msg_inner.get_topic().clone();
864864
// Send the message asynchronously
865-
self.schedule_service
865+
let result = self
866+
.schedule_service
866867
.broker_controller
867868
.mut_from_ref()
868869
.escape_bridge_mut()
@@ -879,7 +880,7 @@ impl<MS: MessageStore> DeliverDelayedMessageTimerTask<MS> {
879880
.set_physic_size(size_py)
880881
.set_msg_id(msg_id.to_string())
881882
.set_auto_resend(auto_resend)
882-
// .set_future(rx)
883+
.set_put_message_result(result)
883884
.then_process()
884885
.await;
885886

@@ -1015,8 +1016,13 @@ impl<MS: MessageStore> PutResultProcess<MS> {
10151016

10161017
/// Handle the processing after completing the future
10171018
pub async fn then_process(self) -> Self {
1018-
/* // Create a clone of self that will be captured in the async closure
1019-
let this = Arc::new(self);
1019+
// Create a clone of self that will be captured in the async closure
1020+
if let Some(put_message_result) = &self.put_message_result {
1021+
self.handle_result(put_message_result);
1022+
}
1023+
1024+
self
1025+
/*let this = Arc::new(self);
10201026
let this_clone = Arc::clone(&this);
10211027
10221028
// Handle the future completion
@@ -1043,11 +1049,10 @@ impl<MS: MessageStore> PutResultProcess<MS> {
10431049
Ok(this) => this,
10441050
Err(_) => panic!("Failed to unwrap Arc in then_process"),
10451051
}*/
1046-
unimplemented!("then_process not implemented")
10471052
}
10481053

10491054
/// Handle the result of a put operation
1050-
fn handle_result(&self, result: PutMessageResult) {
1055+
fn handle_result(&self, result: &PutMessageResult) {
10511056
if result.put_message_status() == PutMessageStatus::PutOk {
10521057
self.on_success(result);
10531058
} else {
@@ -1056,7 +1061,7 @@ impl<MS: MessageStore> PutResultProcess<MS> {
10561061
}
10571062

10581063
/// Handle a successful put operation
1059-
pub fn on_success(&self, result: PutMessageResult) {
1064+
pub fn on_success(&self, result: &PutMessageResult) {
10601065
*self.status.mut_from_ref() = ProcessStatus::Success;
10611066

10621067
if self
@@ -1213,7 +1218,7 @@ impl<MS: MessageStore> PutResultProcess<MS> {
12131218
.put_message(msg_inner)
12141219
.await;
12151220

1216-
self.handle_result(result);
1221+
self.handle_result(&result);
12171222
}
12181223
None => {
12191224
warn!(

0 commit comments

Comments
 (0)