Skip to content

Commit 714ebe6

Browse files
authored
[ISSUE #6502]♻️Refactor transaction listener usage to utilize ArcTransactionListener for improved type safety and clarity (#6503)
1 parent 33efdf4 commit 714ebe6

File tree

6 files changed

+313
-60
lines changed

6 files changed

+313
-60
lines changed

rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs

Lines changed: 54 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ use crate::producer::request_response_future::RequestResponseFuture;
8787
use crate::producer::send_callback::SendMessageCallback;
8888
use crate::producer::send_result::SendResult;
8989
use crate::producer::send_status::SendStatus;
90-
use crate::producer::transaction_listener::TransactionListener;
90+
use crate::producer::transaction_listener::ArcTransactionListener;
9191
use crate::producer::transaction_send_result::TransactionSendResult;
9292
use tokio::task::JoinHandle;
9393

@@ -266,7 +266,7 @@ pub struct DefaultMQProducerImpl {
266266
semaphore_async_send_num: Arc<Semaphore>,
267267
semaphore_async_send_size: Arc<Semaphore>,
268268
default_mqproducer_impl_inner: Option<WeakArcMut<DefaultMQProducerImpl>>,
269-
transaction_listener: Option<Arc<Box<dyn TransactionListener>>>,
269+
transaction_listener: Option<ArcTransactionListener>,
270270
}
271271

272272
#[allow(unused_must_use)]
@@ -2190,7 +2190,7 @@ impl DefaultMQProducerImpl {
21902190
self.default_mqproducer_impl_inner = Some(default_mqproducer_impl_inner);
21912191
}
21922192

2193-
pub fn set_transaction_listener(&mut self, transaction_listener: Arc<Box<dyn TransactionListener>>) {
2193+
pub fn set_transaction_listener(&mut self, transaction_listener: ArcTransactionListener) {
21942194
self.transaction_listener = Some(transaction_listener);
21952195
}
21962196
}
@@ -2210,7 +2210,7 @@ impl MQProducerInner for DefaultMQProducerImpl {
22102210
true
22112211
}
22122212

2213-
fn get_check_listener(&self) -> Option<Arc<Box<dyn TransactionListener>>> {
2213+
fn get_check_listener(&self) -> Option<ArcTransactionListener> {
22142214
self.transaction_listener.clone()
22152215
}
22162216

@@ -2235,16 +2235,17 @@ impl MQProducerInner for DefaultMQProducerImpl {
22352235
let broker_addr = broker_addr.clone();
22362236
let group = self.producer_config.producer_group().clone();
22372237

2238-
// Spawn independent task without storing handle (matches Java's executor.submit behavior)
2239-
tokio::spawn(async move {
2238+
// Use spawn_blocking to avoid blocking Tokio worker threads (matches Java's ExecutorService
2239+
// behavior)
2240+
tokio::task::spawn_blocking(move || {
22402241
let mut unique_key = msg.property(&CheetahString::from_static_str(
22412242
MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
22422243
));
22432244
if unique_key.is_none() {
22442245
unique_key = Some(msg.msg_id.clone());
22452246
}
22462247

2247-
// Check local transaction state with exception handling
2248+
// Check local transaction state with exception handling (synchronous execution)
22482249
let transaction_state = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
22492250
transaction_listener.check_local_transaction(&msg)
22502251
})) {
@@ -2258,48 +2259,53 @@ impl MQProducerInner for DefaultMQProducerImpl {
22582259
LocalTransactionState::Unknown
22592260
}
22602261
};
2261-
let request_header = EndTransactionRequestHeader {
2262-
topic: check_request_header.topic.clone().unwrap_or_default(),
2263-
producer_group: CheetahString::from_string(
2264-
producer_impl_inner.producer_config.producer_group().to_string(),
2265-
),
2266-
tran_state_table_offset: check_request_header.commit_log_offset as u64,
2267-
commit_log_offset: check_request_header.commit_log_offset as u64,
2268-
commit_or_rollback: match transaction_state {
2269-
LocalTransactionState::CommitMessage => MessageSysFlag::TRANSACTION_COMMIT_TYPE,
2270-
LocalTransactionState::RollbackMessage => MessageSysFlag::TRANSACTION_ROLLBACK_TYPE,
2271-
LocalTransactionState::Unknown => MessageSysFlag::TRANSACTION_NOT_TYPE,
2272-
},
2273-
from_transaction_check: true,
2274-
msg_id: unique_key.clone().unwrap_or_default(),
2275-
transaction_id: check_request_header.transaction_id.clone(),
2276-
rpc_request_header: RpcRequestHeader {
2277-
broker_name: check_request_header.rpc_request_header.unwrap_or_default().broker_name,
2278-
..Default::default()
2279-
},
2280-
};
2281-
// Execute end transaction hook
2282-
producer_impl_inner.do_execute_end_transaction_hook(
2283-
&msg.message,
2284-
unique_key.as_ref().unwrap(),
2285-
&broker_addr,
2286-
transaction_state,
2287-
true,
2288-
);
22892262

2290-
// Send end transaction request with error handling
2291-
if let Err(e) = producer_impl_inner
2292-
.client_instance
2293-
.as_mut()
2294-
.unwrap()
2295-
.mq_client_api_impl
2296-
.as_mut()
2297-
.unwrap()
2298-
.end_transaction_oneway(&broker_addr, request_header, CheetahString::from_static_str(""), 3000)
2299-
.await
2300-
{
2301-
tracing::error!("endTransactionOneway exception: {:?}", e);
2302-
}
2263+
// Switch back to async context for network I/O
2264+
let handle = tokio::runtime::Handle::current();
2265+
handle.spawn(async move {
2266+
let request_header = EndTransactionRequestHeader {
2267+
topic: check_request_header.topic.clone().unwrap_or_default(),
2268+
producer_group: CheetahString::from_string(
2269+
producer_impl_inner.producer_config.producer_group().to_string(),
2270+
),
2271+
tran_state_table_offset: check_request_header.commit_log_offset as u64,
2272+
commit_log_offset: check_request_header.commit_log_offset as u64,
2273+
commit_or_rollback: match transaction_state {
2274+
LocalTransactionState::CommitMessage => MessageSysFlag::TRANSACTION_COMMIT_TYPE,
2275+
LocalTransactionState::RollbackMessage => MessageSysFlag::TRANSACTION_ROLLBACK_TYPE,
2276+
LocalTransactionState::Unknown => MessageSysFlag::TRANSACTION_NOT_TYPE,
2277+
},
2278+
from_transaction_check: true,
2279+
msg_id: unique_key.clone().unwrap_or_default(),
2280+
transaction_id: check_request_header.transaction_id.clone(),
2281+
rpc_request_header: RpcRequestHeader {
2282+
broker_name: check_request_header.rpc_request_header.unwrap_or_default().broker_name,
2283+
..Default::default()
2284+
},
2285+
};
2286+
// Execute end transaction hook
2287+
producer_impl_inner.do_execute_end_transaction_hook(
2288+
&msg.message,
2289+
unique_key.as_ref().unwrap(),
2290+
&broker_addr,
2291+
transaction_state,
2292+
true,
2293+
);
2294+
2295+
// Send end transaction request with error handling
2296+
if let Err(e) = producer_impl_inner
2297+
.client_instance
2298+
.as_mut()
2299+
.unwrap()
2300+
.mq_client_api_impl
2301+
.as_mut()
2302+
.unwrap()
2303+
.end_transaction_oneway(&broker_addr, request_header, CheetahString::from_static_str(""), 3000)
2304+
.await
2305+
{
2306+
tracing::error!("endTransactionOneway exception: {:?}", e);
2307+
}
2308+
});
23032309
});
23042310
}
23052311

rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// limitations under the License.
1414

1515
use std::collections::HashSet;
16-
use std::sync::Arc;
1716

1817
use cheetah_string::CheetahString;
1918
use rocketmq_common::common::message::message_ext::MessageExt;
@@ -22,14 +21,14 @@ use rocketmq_rust::ArcMut;
2221

2322
use crate::producer::producer_impl::default_mq_producer_impl::DefaultMQProducerImpl;
2423
use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo;
25-
use crate::producer::transaction_listener::TransactionListener;
24+
use crate::producer::transaction_listener::ArcTransactionListener;
2625

2726
pub trait MQProducerInner: Send + Sync + 'static {
2827
fn get_publish_topic_list(&self) -> HashSet<CheetahString>;
2928

3029
fn is_publish_topic_need_update(&self, topic: &CheetahString) -> bool;
3130

32-
fn get_check_listener(&self) -> Option<Arc<Box<dyn TransactionListener>>>;
31+
fn get_check_listener(&self) -> Option<ArcTransactionListener>;
3332

3433
fn check_transaction_state(
3534
&self,
@@ -63,7 +62,7 @@ impl MQProducerInnerImpl {
6362
false
6463
}
6564

66-
pub fn get_check_listener(&self) -> Option<Arc<Box<dyn TransactionListener>>> {
65+
pub fn get_check_listener(&self) -> Option<ArcTransactionListener> {
6766
if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner {
6867
return default_mqproducer_impl_inner.get_check_listener();
6968
}

rocketmq-client/src/producer/transaction_listener.rs

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,130 @@
1313
// limitations under the License.
1414

1515
use std::any::Any;
16+
use std::sync::Arc;
1617

1718
use rocketmq_common::common::message::message_ext::MessageExt;
1819
use rocketmq_common::common::message::MessageTrait;
1920

2021
use crate::producer::local_transaction_state::LocalTransactionState;
2122

23+
/// Listener for handling transactional message operations.
24+
///
25+
/// This trait defines the callback interface for transactional message processing,
26+
/// allowing applications to integrate local transaction execution with distributed
27+
/// message transactions.
28+
///
29+
/// Implementations must be thread-safe as they may be invoked concurrently from
30+
/// different threads or async tasks.
31+
///
32+
/// # Examples
33+
///
34+
/// ```ignore
35+
/// use rocketmq_client_rust::producer::transaction_listener::TransactionListener;
36+
/// use rocketmq_client_rust::producer::local_transaction_state::LocalTransactionState;
37+
/// use rocketmq_common::common::message::MessageTrait;
38+
/// use rocketmq_common::common::message::message_ext::MessageExt;
39+
/// use std::any::Any;
40+
///
41+
/// struct OrderTransactionListener;
42+
///
43+
/// impl TransactionListener for OrderTransactionListener {
44+
/// fn execute_local_transaction(
45+
/// &self,
46+
/// msg: &dyn MessageTrait,
47+
/// arg: Option<&(dyn Any + Send + Sync)>,
48+
/// ) -> LocalTransactionState {
49+
/// // Execute local database transaction
50+
/// if insert_order_into_db(msg).is_ok() {
51+
/// LocalTransactionState::CommitMessage
52+
/// } else {
53+
/// LocalTransactionState::RollbackMessage
54+
/// }
55+
/// }
56+
///
57+
/// fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState {
58+
/// // Check transaction status from database
59+
/// if order_exists_in_db(msg) {
60+
/// LocalTransactionState::CommitMessage
61+
/// } else {
62+
/// LocalTransactionState::Unknown
63+
/// }
64+
/// }
65+
/// }
66+
/// ```
2267
pub trait TransactionListener: Send + Sync + 'static {
68+
/// Executes the local transaction when sending a transactional message.
69+
///
70+
/// This method is invoked after the half message is successfully sent to the broker.
71+
/// The implementation should execute the local business transaction and return
72+
/// the transaction state to determine whether the message should be committed or rolled back.
73+
///
74+
/// # Parameters
75+
///
76+
/// * `msg` - The message being sent
77+
/// * `arg` - Optional user-defined argument passed from the send operation
78+
///
79+
/// # Returns
80+
///
81+
/// The local transaction state indicating whether to commit, rollback, or defer the decision:
82+
/// - `CommitMessage` - Commit the transaction and make the message visible to consumers
83+
/// - `RollbackMessage` - Roll back the transaction and discard the message
84+
/// - `Unknown` - Transaction state is uncertain, broker will check later
2385
fn execute_local_transaction(
2486
&self,
2587
msg: &dyn MessageTrait,
2688
arg: Option<&(dyn Any + Send + Sync)>,
2789
) -> LocalTransactionState;
2890

91+
/// Checks the status of a previously executed local transaction.
92+
///
93+
/// This method is invoked by the broker when it needs to verify the state of a
94+
/// transaction whose initial state was `Unknown` or when the transaction check
95+
/// timeout is reached.
96+
///
97+
/// The implementation should query the local transaction state (e.g., from a database)
98+
/// and return the current status.
99+
///
100+
/// # Parameters
101+
///
102+
/// * `msg` - The message whose transaction status needs to be checked
103+
///
104+
/// # Returns
105+
///
106+
/// The current state of the local transaction:
107+
/// - `CommitMessage` - The local transaction was committed successfully
108+
/// - `RollbackMessage` - The local transaction failed or was rolled back
109+
/// - `Unknown` - Transaction state cannot be determined at this time
29110
fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState;
30111
}
112+
113+
/// Thread-safe shared reference to a [`TransactionListener`].
114+
///
115+
/// This type alias provides a convenient way to share transaction listeners across
116+
/// threads using atomic reference counting. It uses `Arc<dyn TransactionListener>`
117+
/// instead of `Arc<Box<dyn TransactionListener>>` to avoid double heap allocation
118+
/// and minimize pointer indirection overhead.
119+
///
120+
/// # Performance
121+
///
122+
/// Using this alias instead of `Arc<Box<dyn TransactionListener>>` provides:
123+
/// - One fewer heap allocation per instance
124+
/// - One fewer pointer dereference per method call
125+
/// - Reduced memory overhead (saves approximately 16 bytes per instance)
126+
///
127+
/// # Examples
128+
///
129+
/// ```ignore
130+
/// use rocketmq_client_rust::producer::transaction_listener::{TransactionListener, ArcTransactionListener};
131+
/// use std::sync::Arc;
132+
///
133+
/// struct MyListener;
134+
/// impl TransactionListener for MyListener { /* ... */ }
135+
///
136+
/// // Create a shared reference
137+
/// let listener: ArcTransactionListener = Arc::new(MyListener);
138+
///
139+
/// // Clone for sharing across threads
140+
/// let listener_clone = listener.clone();
141+
/// ```
142+
pub type ArcTransactionListener = Arc<dyn TransactionListener>;

rocketmq-client/src/producer/transaction_mq_produce_builder.rs

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::base::client_config::ClientConfig;
2525
use crate::producer::default_mq_producer::DefaultMQProducer;
2626
use crate::producer::produce_accumulator::ProduceAccumulator;
2727
use crate::producer::producer_impl::default_mq_producer_impl::DefaultMQProducerImpl;
28+
use crate::producer::transaction_listener::ArcTransactionListener;
2829
use crate::producer::transaction_listener::TransactionListener;
2930
use crate::producer::transaction_mq_producer::TransactionMQProducer;
3031
use crate::producer::transaction_mq_producer::TransactionProducerConfig;
@@ -55,7 +56,10 @@ pub struct TransactionMQProducerBuilder {
5556
compress_level: Option<i32>,
5657
compress_type: Option<CompressionType>,
5758
compressor: Option<&'static (dyn Compressor + Send + Sync)>,
58-
transaction_listener: Option<Arc<Box<dyn TransactionListener>>>,
59+
transaction_listener: Option<ArcTransactionListener>,
60+
check_thread_pool_min_size: Option<u32>,
61+
check_thread_pool_max_size: Option<u32>,
62+
check_request_hold_max: Option<u32>,
5963
check_runtime: Option<Arc<RocketMQRuntime>>,
6064
}
6165

@@ -86,6 +90,9 @@ impl TransactionMQProducerBuilder {
8690
compress_type: None,
8791
compressor: None,
8892
transaction_listener: None,
93+
check_thread_pool_min_size: None,
94+
check_thread_pool_max_size: None,
95+
check_request_hold_max: None,
8996
check_runtime: None,
9097
}
9198
}
@@ -216,7 +223,33 @@ impl TransactionMQProducerBuilder {
216223
}
217224

218225
pub fn transaction_listener(mut self, transaction_listener: impl TransactionListener) -> Self {
219-
self.transaction_listener = Some(Arc::new(Box::new(transaction_listener)));
226+
self.transaction_listener = Some(Arc::new(transaction_listener));
227+
self
228+
}
229+
230+
/// Set maximum size of transaction check thread pool
231+
///
232+
/// Note: When using default Tokio Runtime with spawn_blocking, this serves as a reference.
233+
/// To control actual thread count, configure the Runtime:
234+
/// ```ignore
235+
/// tokio::runtime::Builder::new_multi_thread()
236+
/// .max_blocking_threads(100)
237+
/// .build()
238+
/// ```
239+
pub fn check_thread_pool_max_size(mut self, size: u32) -> Self {
240+
self.check_thread_pool_max_size = Some(size);
241+
self
242+
}
243+
244+
/// Set minimum size of transaction check thread pool
245+
pub fn check_thread_pool_min_size(mut self, size: u32) -> Self {
246+
self.check_thread_pool_min_size = Some(size);
247+
self
248+
}
249+
250+
/// Set maximum capacity of transaction check request queue
251+
pub fn check_request_hold_max(mut self, size: u32) -> Self {
252+
self.check_request_hold_max = Some(size);
220253
self
221254
}
222255

@@ -304,9 +337,9 @@ impl TransactionMQProducerBuilder {
304337
}
305338
let transaction_producer_config = TransactionProducerConfig {
306339
transaction_listener: self.transaction_listener,
307-
check_thread_pool_min_size: 0,
308-
check_thread_pool_max_size: 0,
309-
check_request_hold_max: 0,
340+
check_thread_pool_min_size: self.check_thread_pool_min_size.unwrap_or(1),
341+
check_thread_pool_max_size: self.check_thread_pool_max_size.unwrap_or(1),
342+
check_request_hold_max: self.check_request_hold_max.unwrap_or(2000),
310343
};
311344
TransactionMQProducer::new(transaction_producer_config, mq_producer)
312345
}

0 commit comments

Comments
 (0)