Skip to content

Commit 97d0c53

Browse files
authored
[ISSUE #3342]🚀Add some methods for TransactionalMessageBridge💫 (#3343)
1 parent 9159df1 commit 97d0c53

File tree

2 files changed

+29
-5
lines changed

2 files changed

+29
-5
lines changed

rocketmq-broker/src/transaction/queue/transactional_message_bridge.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ where
163163
queue_id: i32,
164164
offset: i64,
165165
nums: i32,
166-
_sub: Option<SubscriptionData>,
166+
_sub: Option<SubscriptionData>, /* in Java version, this is not used, so we keep it as
167+
* Option */
167168
) -> Option<PullResult> {
168169
let get_message_result = self
169170
.broker_runtime_inner
@@ -176,7 +177,7 @@ where
176177
)
177178
.await;
178179

179-
if let Some(get_message_result) = get_message_result {
180+
if let Some(mut get_message_result) = get_message_result {
180181
let (pull_status, msg_found_list) = match get_message_result.status().unwrap() {
181182
GetMessageStatus::Found => {
182183
let msg_list = Self::decode_msg_list(&get_message_result);
@@ -194,6 +195,7 @@ where
194195

195196
GetMessageStatus::OffsetReset => (PullStatus::NoNewMsg, None),
196197
};
198+
get_message_result.release();
197199
Some(PullResult::new(
198200
pull_status,
199201
get_message_result.next_begin_offset() as u64,
@@ -256,9 +258,7 @@ where
256258
) -> PutMessageResult {
257259
Self::parse_half_message_inner(&mut message);
258260
self.broker_runtime_inner
259-
.message_store_mut()
260-
.as_mut()
261-
.unwrap()
261+
.message_store_unchecked_mut()
262262
.put_message(message)
263263
.await
264264
}
@@ -312,6 +312,9 @@ where
312312
message.set_topic(CheetahString::from_static_str(
313313
TransactionalMessageUtil::build_half_topic(),
314314
));
315+
316+
//TopicValidator::RMQ_SYS_TRANS_HALF_TOPIC topic is a special topic for half messages
317+
// write queue number is always 1, read queue number is always 1
315318
message.message_ext_inner.queue_id = 0;
316319
let properties_to_string =
317320
message_decoder::message_properties_to_string(message.get_properties());
@@ -432,6 +435,20 @@ where
432435
}
433436
result
434437
}
438+
439+
pub async fn put_message(&mut self, message_inner: MessageExtBrokerInner) -> bool {
440+
let result = self.put_message_return_result(message_inner).await;
441+
result.put_message_status() == PutMessageStatus::PutOk
442+
}
443+
444+
pub async fn escape_message(&mut self, message_inner: MessageExtBrokerInner) -> bool {
445+
let put_message_result = self
446+
.broker_runtime_inner
447+
.escape_bridge_mut()
448+
.put_message(message_inner)
449+
.await;
450+
put_message_result.is_ok()
451+
}
435452
}
436453

437454
#[inline]

rocketmq-store/src/base/get_message_result.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,13 @@ impl GetMessageResult {
293293
pub fn message_queue_offset(&self) -> &Vec<u64> {
294294
&self.message_queue_offset
295295
}
296+
297+
#[inline]
298+
pub fn release(&mut self) {
299+
for mapped_buffer in &mut self.message_mapped_list {
300+
mapped_buffer.release();
301+
}
302+
}
296303
}
297304

298305
#[cfg(test)]

0 commit comments

Comments
 (0)