Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1755,7 +1755,7 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {

#[inline]
pub fn timer_message_store_unchecked(&self) -> &TimerMessageStore {
unsafe { self.timer_message_store.as_ref().unwrap_unchecked() }
unimplemented!("timer_message_store_unchecked is not implemented yet")
Copy link

Copilot AI Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change will cause a runtime panic when timer messages are processed. The method timer_message_store_unchecked() is actively called in rocketmq-broker/src/util/hook_utils.rs:157 within the handle_schedule_message function, which is used by ScheduleMessageHook for processing messages with timer properties. When a timer message is handled (and timer_wheel_enable is true), this will panic instead of returning the timer message store. The original unsafe unwrap_unchecked() should be replaced with a safe alternative like unwrap() or proper error handling, not unimplemented!().

Suggested change
unimplemented!("timer_message_store_unchecked is not implemented yet")
self.timer_message_store.as_ref().unwrap()

Copilot uses AI. Check for mistakes.
}
Comment on lines 1757 to 1759
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for any callers of timer_message_store_unchecked across all Rust files
rg -n 'timer_message_store_unchecked' --type rust

Repository: mxsm/rocketmq-rust

Length of output: 415


🏁 Script executed:

#!/bin/bash
# Get context around the caller at hook_utils.rs:157
sed -n '145,170p' rocketmq-broker/src/util/hook_utils.rs | cat -n

Repository: mxsm/rocketmq-rust

Length of output: 1568


Do not use unimplemented!() — this method has a caller that will panic at runtime.

timer_message_store_unchecked() is called from rocketmq-broker/src/util/hook_utils.rs:157 in the handle_schedule_message() function, which is the core message scheduling handler. When timer_wheel_enable is true, any incoming timer message will trigger the unimplemented!() panic.

Either implement the method properly or remove the public API. If the feature is not ready, consider returning an error or wrapping the method with a feature flag.

🤖 Prompt for AI Agents
In `@rocketmq-broker/src/broker_runtime.rs` around lines 1757 - 1759,
timer_message_store_unchecked currently calls unimplemented!() which will panic
when handle_schedule_message invokes it (when timer_wheel_enable is true);
replace the panic with a real implementation or a safe API: either return a
reference to the existing TimerMessageStore instance (implement the body of
timer_message_store_unchecked to retrieve and return the stored
TimerMessageStore), or change the public API to return
Result<&TimerMessageStore, Error> and propagate errors up from
handle_schedule_message, or gate the method behind a feature flag so callers
cannot call it when timer wheel is disabled; locate
timer_message_store_unchecked and the TimerMessageStore storage/field in the
broker_runtime impl and implement safe retrieval or alter its signature and
update handle_schedule_message accordingly.


#[inline]
Expand Down
11 changes: 2 additions & 9 deletions rocketmq-broker/src/processor/query_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,8 @@ where
}

impl<MS: MessageStore> QueryMessageProcessor<MS> {
pub fn new(
/* message_store_config: Arc<MessageStoreConfig>, message_store: ArcMut<MS> */
broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
) -> Self {
Self {
/*message_store_config,
message_store,*/
broker_runtime_inner,
}
pub fn new(broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>) -> Self {
Self { broker_runtime_inner }
}
}

Expand Down
17 changes: 0 additions & 17 deletions rocketmq/src/task/service_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,6 @@ impl ServiceContext {
}
}

/*#[trait_variant::make(ServiceTask: Send)]
pub trait ServiceTaskInner: Sync {
/// Get the service name
fn get_service_name(&self) -> String;

/// Main run method - implement the service logic here
async fn run(&self, context: &ServiceTaskContext);

/// Called when wait ends - override for custom behavior
async fn on_wait_end(&self);

/// Get join time for shutdown (default 90 seconds)
fn get_join_time(&self) -> Duration {
Duration::from_millis(90_000)
}
}*/

pub trait ServiceTask: Sync + Send {
/// Get the service name
fn get_service_name(&self) -> String;
Expand Down
Loading