Skip to content

Commit 51c7663

Browse files
authored
[ISSUE #6496]♻️Refactor message filter usage to utilize ArcMessageFilter for improved type safety and clarity (#6497)
1 parent 2e3661b commit 51c7663

File tree

11 files changed

+40
-36
lines changed

11 files changed

+40
-36
lines changed

rocketmq-broker/src/coldctr/cold_data_pull_request_hold_service.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,12 @@
1313
// limitations under the License.
1414

1515
use std::collections::VecDeque;
16-
use std::sync::Arc;
1716

1817
use parking_lot::Mutex;
1918
use rocketmq_remoting::net::channel::Channel;
2019
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
2120
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
22-
use rocketmq_store::filter::MessageFilter;
21+
use rocketmq_store::filter::ArcMessageFilter;
2322
use tracing::warn;
2423

2524
pub const NO_SUSPEND_KEY: &str = "_noSuspend_";
@@ -33,7 +32,7 @@ pub struct ColdDataPullRequest {
3332
timeout_millis: u64,
3433
queue_offset: i64,
3534
subscription_data: SubscriptionData,
36-
message_filter: Arc<Box<dyn MessageFilter>>,
35+
message_filter: ArcMessageFilter,
3736
}
3837

3938
impl ColdDataPullRequest {
@@ -44,7 +43,7 @@ impl ColdDataPullRequest {
4443
suspend_timestamp: u64,
4544
queue_offset: i64,
4645
subscription_data: SubscriptionData,
47-
message_filter: Arc<Box<dyn MessageFilter>>,
46+
message_filter: ArcMessageFilter,
4847
) -> Self {
4948
Self {
5049
request,

rocketmq-broker/src/long_polling/long_polling_service/pop_long_polling_service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use rocketmq_remoting::runtime::processor::RequestProcessor;
3232
use rocketmq_rust::ArcMut;
3333
use rocketmq_store::base::message_store::MessageStore;
3434
use rocketmq_store::consume_queue::cq_ext_unit::CqExtUnit;
35-
use rocketmq_store::filter::MessageFilter;
35+
use rocketmq_store::filter::ArcMessageFilter;
3636
use tokio::select;
3737
use tokio::sync::Notify;
3838
use tracing::error;
@@ -375,7 +375,7 @@ impl<MS: MessageStore, RP: RequestProcessor + Sync + 'static> PopLongPollingServ
375375
remoting_command: &mut RemotingCommand,
376376
request_header: PollingHeader,
377377
subscription_data: Option<SubscriptionData>,
378-
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
378+
message_filter: Option<ArcMessageFilter>,
379379
) -> PollingResult {
380380
//this method may be need to optimize
381381
if request_header.get_poll_time() <= 0 {

rocketmq-broker/src/long_polling/pop_request.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use rocketmq_remoting::net::channel::Channel;
2323
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
2424
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
2525
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
26-
use rocketmq_store::filter::MessageFilter;
26+
use rocketmq_store::filter::ArcMessageFilter;
2727

2828
pub struct PopRequest {
2929
remoting_command: RemotingCommand,
@@ -32,7 +32,7 @@ pub struct PopRequest {
3232
op: i64,
3333
expired: u64,
3434
subscription_data: Option<SubscriptionData>,
35-
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
35+
message_filter: Option<ArcMessageFilter>,
3636
}
3737

3838
impl PopRequest {
@@ -41,7 +41,7 @@ impl PopRequest {
4141
ctx: ConnectionHandlerContext,
4242
expired: u64,
4343
subscription_data: Option<SubscriptionData>,
44-
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
44+
message_filter: Option<ArcMessageFilter>,
4545
) -> Self {
4646
static COUNTER: AtomicI64 = AtomicI64::new(i64::MIN);
4747
let op = COUNTER.fetch_add(1, Ordering::SeqCst);
@@ -96,7 +96,7 @@ impl PopRequest {
9696
self.subscription_data.as_ref()
9797
}
9898

99-
pub fn get_message_filter(&self) -> Option<&Arc<Box<dyn MessageFilter>>> {
99+
pub fn get_message_filter(&self) -> Option<&ArcMessageFilter> {
100100
self.message_filter.as_ref()
101101
}
102102
}

rocketmq-broker/src/long_polling/pull_request.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,11 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::sync::Arc;
16-
1715
use rocketmq_remoting::net::channel::Channel;
1816
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
1917
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
2018
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
21-
use rocketmq_store::filter::MessageFilter;
19+
use rocketmq_store::filter::ArcMessageFilter;
2220

2321
#[derive(Clone)]
2422
pub struct PullRequest {
@@ -29,7 +27,7 @@ pub struct PullRequest {
2927
suspend_timestamp: u64,
3028
pull_from_this_offset: i64,
3129
subscription_data: SubscriptionData,
32-
message_filter: Arc<Box<dyn MessageFilter>>,
30+
message_filter: ArcMessageFilter,
3331
}
3432

3533
impl PullRequest {
@@ -41,7 +39,7 @@ impl PullRequest {
4139
suspend_timestamp: u64,
4240
pull_from_this_offset: i64,
4341
subscription_data: SubscriptionData,
44-
message_filter: Arc<Box<dyn MessageFilter>>,
42+
message_filter: ArcMessageFilter,
4543
) -> Self {
4644
Self {
4745
request_command,
@@ -75,7 +73,7 @@ impl PullRequest {
7573
&self.subscription_data
7674
}
7775

78-
pub fn message_filter(&self) -> Arc<Box<dyn MessageFilter>> {
76+
pub fn message_filter(&self) -> ArcMessageFilter {
7977
self.message_filter.clone()
8078
}
8179

rocketmq-broker/src/processor/default_pull_message_result_handler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use rocketmq_rust::ArcMut;
4343
use rocketmq_store::base::get_message_result::GetMessageResult;
4444
use rocketmq_store::base::message_status_enum::GetMessageStatus;
4545
use rocketmq_store::base::message_store::MessageStore;
46-
use rocketmq_store::filter::MessageFilter;
46+
use rocketmq_store::filter::ArcMessageFilter;
4747
use rocketmq_store::message_store::local_file_message_store::LocalFileMessageStore;
4848
use rocketmq_store::stats::broker_stats_manager::BrokerStatsManager;
4949
use rocketmq_store::stats::stats_type::StatsType;
@@ -95,7 +95,7 @@ impl<MS: MessageStore> PullMessageResultHandler for DefaultPullMessageResultHand
9595
subscription_data: SubscriptionData,
9696
subscription_group_config: &SubscriptionGroupConfig,
9797
broker_allow_suspend: bool,
98-
message_filter: Arc<Box<dyn MessageFilter>>,
98+
message_filter: ArcMessageFilter,
9999
mut response: RemotingCommand,
100100
mut mapping_context: TopicQueueMappingContext,
101101
_begin_time_mills: u64,

rocketmq-broker/src/processor/pop_message_processor.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ use rocketmq_store::base::get_message_result::GetMessageResult;
5959
use rocketmq_store::base::message_status_enum::GetMessageStatus;
6060
use rocketmq_store::base::message_store::MessageStore;
6161
use rocketmq_store::base::select_result::SelectMappedBufferResult;
62-
use rocketmq_store::filter::MessageFilter;
62+
use rocketmq_store::filter::ArcMessageFilter;
6363
use rocketmq_store::pop::batch_ack_msg::BatchAckMsg;
6464
use rocketmq_store::pop::pop_check_point::PopCheckPoint;
6565
use rocketmq_store::pop::AckMessage;
@@ -365,7 +365,7 @@ where
365365
)));
366366
}
367367
let consumer_filter_data = consumer_filter_data.unwrap();
368-
let message_filter: Box<dyn MessageFilter> = Box::new(ExpressionMessageFilter::new(
368+
let message_filter: ArcMessageFilter = Arc::new(ExpressionMessageFilter::new(
369369
Some(subscription_data.clone()),
370370
Some(consumer_filter_data),
371371
Arc::new(self.broker_runtime_inner.consumer_filter_manager().clone()),
@@ -458,7 +458,6 @@ where
458458
};
459459
let pop_time = get_current_millis();
460460

461-
let message_filter = message_filter.map(Arc::new);
462461
let mut rest_num = 0; // remaining number of messages to be fetched
463462
if need_retry && !request_header.order.unwrap_or(false) {
464463
rest_num = if need_retry_v1 {
@@ -701,7 +700,7 @@ where
701700
revive_qid: i32,
702701
channel: Channel,
703702
pop_time: u64,
704-
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
703+
message_filter: Option<ArcMessageFilter>,
705704
start_offset_info: &mut String,
706705
msg_offset_info: &mut String,
707706
order_count_info: &mut str,
@@ -741,7 +740,7 @@ where
741740
revive_qid: i32,
742741
channel: Channel,
743742
pop_time: u64,
744-
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
743+
message_filter: Option<ArcMessageFilter>,
745744
start_offset_info: &mut String,
746745
msg_offset_info: &mut String,
747746
order_count_info: &mut str,
@@ -785,7 +784,7 @@ where
785784
revive_qid: i32,
786785
channel: Channel,
787786
pop_time: u64,
788-
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
787+
message_filter: Option<ArcMessageFilter>,
789788
start_offset_info: &mut String,
790789
msg_offset_info: &mut String,
791790
order_count_info: &mut str,

rocketmq-broker/src/processor/pull_message_processor.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use rocketmq_rust::ArcMut;
4848
use rocketmq_store::base::get_message_result::GetMessageResult;
4949
use rocketmq_store::base::message_status_enum::GetMessageStatus;
5050
use rocketmq_store::base::message_store::MessageStore;
51-
use rocketmq_store::filter::MessageFilter;
51+
use rocketmq_store::filter::ArcMessageFilter;
5252
use tokio::sync::Mutex;
5353
use tracing::error;
5454
use tracing::info;
@@ -465,16 +465,16 @@ where
465465
&self,
466466
subscription_data: &rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData,
467467
consumer_filter_data: Option<ConsumerFilterData>,
468-
) -> Arc<Box<dyn MessageFilter>> {
468+
) -> ArcMessageFilter {
469469
// TODO: Consider optimizing consumer_filter_manager clone - Arc wrapper might be better
470470
if self.broker_runtime_inner.broker_config().filter_support_retry {
471-
Arc::new(Box::new(ExpressionForRetryMessageFilter))
471+
Arc::new(ExpressionForRetryMessageFilter)
472472
} else {
473-
Arc::new(Box::new(ExpressionMessageFilter::new(
473+
Arc::new(ExpressionMessageFilter::new(
474474
Some(subscription_data.clone()),
475475
consumer_filter_data,
476476
Arc::new(self.broker_runtime_inner.consumer_filter_manager().clone()),
477-
)))
477+
))
478478
}
479479
}
480480
}

rocketmq-broker/src/processor/pull_message_result_handler.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// limitations under the License.
1414

1515
use std::any::Any;
16-
use std::sync::Arc;
1716

1817
use rocketmq_remoting::net::channel::Channel;
1918
use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader;
@@ -23,7 +22,7 @@ use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_context::Topi
2322
use rocketmq_remoting::protocol::subscription::subscription_group_config::SubscriptionGroupConfig;
2423
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
2524
use rocketmq_store::base::get_message_result::GetMessageResult;
26-
use rocketmq_store::filter::MessageFilter;
25+
use rocketmq_store::filter::ArcMessageFilter;
2726

2827
/// Trait defining the behavior for handling the result of a pull message request.
2928
///
@@ -66,7 +65,7 @@ pub trait PullMessageResultHandler: Sync + Send + Any + 'static {
6665
subscription_data: SubscriptionData,
6766
subscription_group_config: &SubscriptionGroupConfig,
6867
broker_allow_suspend: bool,
69-
message_filter: Arc<Box<dyn MessageFilter>>,
68+
message_filter: ArcMessageFilter,
7069
response: RemotingCommand,
7170
mapping_context: TopicQueueMappingContext,
7271
begin_time_mills: u64,

rocketmq-store/src/base/message_store.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use crate::base::store_checkpoint::StoreCheckpoint;
4141
use crate::base::store_stats_service::StoreStatsService;
4242
use crate::base::transient_store_pool::TransientStorePool;
4343
use crate::config::message_store_config::MessageStoreConfig;
44+
use crate::filter::ArcMessageFilter;
4445
use crate::filter::MessageFilter;
4546
use crate::ha::general_ha_service::GeneralHAService;
4647
use crate::hook::put_message_hook::BoxedPutMessageHook;
@@ -137,7 +138,7 @@ pub trait MessageStoreInner: Sync + 'static {
137138
queue_id: i32,
138139
offset: i64,
139140
max_msg_nums: i32,
140-
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
141+
message_filter: Option<ArcMessageFilter>,
141142
) -> Option<GetMessageResult>;
142143

143144
/* /// Asynchronous get message
@@ -160,7 +161,7 @@ pub trait MessageStoreInner: Sync + 'static {
160161
offset: i64,
161162
max_msg_nums: i32,
162163
max_total_msg_size: i32,
163-
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
164+
message_filter: Option<ArcMessageFilter>,
164165
) -> Option<GetMessageResult>;
165166

166167
/* /// Asynchronous get message with size constraint

rocketmq-store/src/filter.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::collections::HashMap;
16+
use std::sync::Arc;
1617

1718
use cheetah_string::CheetahString;
1819

@@ -48,3 +49,9 @@ pub trait MessageFilter: Send + Sync {
4849
properties: Option<&HashMap<CheetahString, CheetahString>>,
4950
) -> bool;
5051
}
52+
53+
/// Type alias for an atomically reference-counted message filter.
54+
///
55+
/// This is the standard way to share a [`MessageFilter`] implementation across multiple
56+
/// threads or components. The `Arc` provides thread-safe reference counting.
57+
pub type ArcMessageFilter = Arc<dyn MessageFilter>;

0 commit comments

Comments
 (0)