Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@
// limitations under the License.

use std::collections::VecDeque;
use std::sync::Arc;

use parking_lot::Mutex;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_store::filter::MessageFilter;
use rocketmq_store::filter::ArcMessageFilter;
use tracing::warn;

pub const NO_SUSPEND_KEY: &str = "_noSuspend_";
Expand All @@ -33,7 +32,7 @@ pub struct ColdDataPullRequest {
timeout_millis: u64,
queue_offset: i64,
subscription_data: SubscriptionData,
message_filter: Arc<Box<dyn MessageFilter>>,
message_filter: ArcMessageFilter,
}

impl ColdDataPullRequest {
Expand All @@ -44,7 +43,7 @@ impl ColdDataPullRequest {
suspend_timestamp: u64,
queue_offset: i64,
subscription_data: SubscriptionData,
message_filter: Arc<Box<dyn MessageFilter>>,
message_filter: ArcMessageFilter,
) -> Self {
Self {
request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use rocketmq_remoting::runtime::processor::RequestProcessor;
use rocketmq_rust::ArcMut;
use rocketmq_store::base::message_store::MessageStore;
use rocketmq_store::consume_queue::cq_ext_unit::CqExtUnit;
use rocketmq_store::filter::MessageFilter;
use rocketmq_store::filter::ArcMessageFilter;
use tokio::select;
use tokio::sync::Notify;
use tracing::error;
Expand Down Expand Up @@ -375,7 +375,7 @@ impl<MS: MessageStore, RP: RequestProcessor + Sync + 'static> PopLongPollingServ
remoting_command: &mut RemotingCommand,
request_header: PollingHeader,
subscription_data: Option<SubscriptionData>,
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
message_filter: Option<ArcMessageFilter>,
) -> PollingResult {
//this method may be need to optimize
if request_header.get_poll_time() <= 0 {
Expand Down
8 changes: 4 additions & 4 deletions rocketmq-broker/src/long_polling/pop_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_store::filter::MessageFilter;
use rocketmq_store::filter::ArcMessageFilter;

pub struct PopRequest {
remoting_command: RemotingCommand,
Expand All @@ -32,7 +32,7 @@ pub struct PopRequest {
op: i64,
expired: u64,
subscription_data: Option<SubscriptionData>,
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
message_filter: Option<ArcMessageFilter>,
}

impl PopRequest {
Expand All @@ -41,7 +41,7 @@ impl PopRequest {
ctx: ConnectionHandlerContext,
expired: u64,
subscription_data: Option<SubscriptionData>,
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
message_filter: Option<ArcMessageFilter>,
) -> Self {
static COUNTER: AtomicI64 = AtomicI64::new(i64::MIN);
let op = COUNTER.fetch_add(1, Ordering::SeqCst);
Expand Down Expand Up @@ -96,7 +96,7 @@ impl PopRequest {
self.subscription_data.as_ref()
}

pub fn get_message_filter(&self) -> Option<&Arc<Box<dyn MessageFilter>>> {
pub fn get_message_filter(&self) -> Option<&ArcMessageFilter> {
self.message_filter.as_ref()
}
}
Expand Down
10 changes: 4 additions & 6 deletions rocketmq-broker/src/long_polling/pull_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_store::filter::MessageFilter;
use rocketmq_store::filter::ArcMessageFilter;

#[derive(Clone)]
pub struct PullRequest {
Expand All @@ -29,7 +27,7 @@ pub struct PullRequest {
suspend_timestamp: u64,
pull_from_this_offset: i64,
subscription_data: SubscriptionData,
message_filter: Arc<Box<dyn MessageFilter>>,
message_filter: ArcMessageFilter,
}

impl PullRequest {
Expand All @@ -41,7 +39,7 @@ impl PullRequest {
suspend_timestamp: u64,
pull_from_this_offset: i64,
subscription_data: SubscriptionData,
message_filter: Arc<Box<dyn MessageFilter>>,
message_filter: ArcMessageFilter,
) -> Self {
Self {
request_command,
Expand Down Expand Up @@ -75,7 +73,7 @@ impl PullRequest {
&self.subscription_data
}

pub fn message_filter(&self) -> Arc<Box<dyn MessageFilter>> {
pub fn message_filter(&self) -> ArcMessageFilter {
self.message_filter.clone()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use rocketmq_rust::ArcMut;
use rocketmq_store::base::get_message_result::GetMessageResult;
use rocketmq_store::base::message_status_enum::GetMessageStatus;
use rocketmq_store::base::message_store::MessageStore;
use rocketmq_store::filter::MessageFilter;
use rocketmq_store::filter::ArcMessageFilter;
use rocketmq_store::message_store::local_file_message_store::LocalFileMessageStore;
use rocketmq_store::stats::broker_stats_manager::BrokerStatsManager;
use rocketmq_store::stats::stats_type::StatsType;
Expand Down Expand Up @@ -95,7 +95,7 @@ impl<MS: MessageStore> PullMessageResultHandler for DefaultPullMessageResultHand
subscription_data: SubscriptionData,
subscription_group_config: &SubscriptionGroupConfig,
broker_allow_suspend: bool,
message_filter: Arc<Box<dyn MessageFilter>>,
message_filter: ArcMessageFilter,
mut response: RemotingCommand,
mut mapping_context: TopicQueueMappingContext,
_begin_time_mills: u64,
Expand Down
11 changes: 5 additions & 6 deletions rocketmq-broker/src/processor/pop_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use rocketmq_store::base::get_message_result::GetMessageResult;
use rocketmq_store::base::message_status_enum::GetMessageStatus;
use rocketmq_store::base::message_store::MessageStore;
use rocketmq_store::base::select_result::SelectMappedBufferResult;
use rocketmq_store::filter::MessageFilter;
use rocketmq_store::filter::ArcMessageFilter;
use rocketmq_store::pop::batch_ack_msg::BatchAckMsg;
use rocketmq_store::pop::pop_check_point::PopCheckPoint;
use rocketmq_store::pop::AckMessage;
Expand Down Expand Up @@ -365,7 +365,7 @@ where
)));
}
let consumer_filter_data = consumer_filter_data.unwrap();
let message_filter: Box<dyn MessageFilter> = Box::new(ExpressionMessageFilter::new(
let message_filter: ArcMessageFilter = Arc::new(ExpressionMessageFilter::new(
Some(subscription_data.clone()),
Some(consumer_filter_data),
Arc::new(self.broker_runtime_inner.consumer_filter_manager().clone()),
Expand Down Expand Up @@ -458,7 +458,6 @@ where
};
let pop_time = get_current_millis();

let message_filter = message_filter.map(Arc::new);
let mut rest_num = 0; // remaining number of messages to be fetched
if need_retry && !request_header.order.unwrap_or(false) {
rest_num = if need_retry_v1 {
Expand Down Expand Up @@ -701,7 +700,7 @@ where
revive_qid: i32,
channel: Channel,
pop_time: u64,
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
message_filter: Option<ArcMessageFilter>,
start_offset_info: &mut String,
msg_offset_info: &mut String,
order_count_info: &mut str,
Expand Down Expand Up @@ -741,7 +740,7 @@ where
revive_qid: i32,
channel: Channel,
pop_time: u64,
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
message_filter: Option<ArcMessageFilter>,
start_offset_info: &mut String,
msg_offset_info: &mut String,
order_count_info: &mut str,
Expand Down Expand Up @@ -785,7 +784,7 @@ where
revive_qid: i32,
channel: Channel,
pop_time: u64,
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
message_filter: Option<ArcMessageFilter>,
start_offset_info: &mut String,
msg_offset_info: &mut String,
order_count_info: &mut str,
Expand Down
10 changes: 5 additions & 5 deletions rocketmq-broker/src/processor/pull_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use rocketmq_rust::ArcMut;
use rocketmq_store::base::get_message_result::GetMessageResult;
use rocketmq_store::base::message_status_enum::GetMessageStatus;
use rocketmq_store::base::message_store::MessageStore;
use rocketmq_store::filter::MessageFilter;
use rocketmq_store::filter::ArcMessageFilter;
use tokio::sync::Mutex;
use tracing::error;
use tracing::info;
Expand Down Expand Up @@ -465,16 +465,16 @@ where
&self,
subscription_data: &rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData,
consumer_filter_data: Option<ConsumerFilterData>,
) -> Arc<Box<dyn MessageFilter>> {
) -> ArcMessageFilter {
// TODO: Consider optimizing consumer_filter_manager clone - Arc wrapper might be better
if self.broker_runtime_inner.broker_config().filter_support_retry {
Arc::new(Box::new(ExpressionForRetryMessageFilter))
Arc::new(ExpressionForRetryMessageFilter)
} else {
Arc::new(Box::new(ExpressionMessageFilter::new(
Arc::new(ExpressionMessageFilter::new(
Some(subscription_data.clone()),
consumer_filter_data,
Arc::new(self.broker_runtime_inner.consumer_filter_manager().clone()),
)))
))
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions rocketmq-broker/src/processor/pull_message_result_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::any::Any;
use std::sync::Arc;

use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader;
Expand All @@ -23,7 +22,7 @@ use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_context::Topi
use rocketmq_remoting::protocol::subscription::subscription_group_config::SubscriptionGroupConfig;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_store::base::get_message_result::GetMessageResult;
use rocketmq_store::filter::MessageFilter;
use rocketmq_store::filter::ArcMessageFilter;

/// Trait defining the behavior for handling the result of a pull message request.
///
Expand Down Expand Up @@ -66,7 +65,7 @@ pub trait PullMessageResultHandler: Sync + Send + Any + 'static {
subscription_data: SubscriptionData,
subscription_group_config: &SubscriptionGroupConfig,
broker_allow_suspend: bool,
message_filter: Arc<Box<dyn MessageFilter>>,
message_filter: ArcMessageFilter,
response: RemotingCommand,
mapping_context: TopicQueueMappingContext,
begin_time_mills: u64,
Expand Down
5 changes: 3 additions & 2 deletions rocketmq-store/src/base/message_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::base::store_checkpoint::StoreCheckpoint;
use crate::base::store_stats_service::StoreStatsService;
use crate::base::transient_store_pool::TransientStorePool;
use crate::config::message_store_config::MessageStoreConfig;
use crate::filter::ArcMessageFilter;
use crate::filter::MessageFilter;
use crate::ha::general_ha_service::GeneralHAService;
use crate::hook::put_message_hook::BoxedPutMessageHook;
Expand Down Expand Up @@ -137,7 +138,7 @@ pub trait MessageStoreInner: Sync + 'static {
queue_id: i32,
offset: i64,
max_msg_nums: i32,
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
message_filter: Option<ArcMessageFilter>,
) -> Option<GetMessageResult>;

/* /// Asynchronous get message
Expand All @@ -160,7 +161,7 @@ pub trait MessageStoreInner: Sync + 'static {
offset: i64,
max_msg_nums: i32,
max_total_msg_size: i32,
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
message_filter: Option<ArcMessageFilter>,
) -> Option<GetMessageResult>;

/* /// Asynchronous get message with size constraint
Expand Down
7 changes: 7 additions & 0 deletions rocketmq-store/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use cheetah_string::CheetahString;

Expand Down Expand Up @@ -48,3 +49,9 @@ pub trait MessageFilter: Send + Sync {
properties: Option<&HashMap<CheetahString, CheetahString>>,
) -> bool;
}

/// Type alias for an atomically reference-counted message filter.
///
/// This is the standard way to share a [`MessageFilter`] implementation across multiple
/// threads or components. The `Arc` provides thread-safe reference counting.
pub type ArcMessageFilter = Arc<dyn MessageFilter>;
5 changes: 3 additions & 2 deletions rocketmq-store/src/message_store/local_file_message_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ use crate::config::flush_disk_type::FlushDiskType;
use crate::config::message_store_config::MessageStoreConfig;
use crate::config::store_path_config_helper::get_store_path_batch_consume_queue;
use crate::config::store_path_config_helper::get_store_path_consume_queue_ext;
use crate::filter::ArcMessageFilter;
use crate::filter::MessageFilter;
use crate::ha::general_ha_service::GeneralHAService;
use crate::ha::ha_service::HAService;
Expand Down Expand Up @@ -841,7 +842,7 @@ impl MessageStore for LocalFileMessageStore {
queue_id: i32,
offset: i64,
max_msg_nums: i32,
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
message_filter: Option<ArcMessageFilter>,
) -> Option<GetMessageResult> {
self.get_message_with_size_limit(
group,
Expand All @@ -863,7 +864,7 @@ impl MessageStore for LocalFileMessageStore {
offset: i64,
max_msg_nums: i32,
max_total_msg_size: i32,
message_filter: Option<Arc<Box<dyn MessageFilter>>>,
message_filter: Option<ArcMessageFilter>,
) -> Option<GetMessageResult> {
if self.shutdown.load(Ordering::Relaxed) {
warn!("message store has shutdown, so getMessage is forbidden");
Expand Down
Loading