Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rocketmq-store/src/ha/default_ha_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@
/// Create a new DefaultHAClient
pub fn new(
default_message_store: ArcMut<LocalFileMessageStore>,
) -> Result<Arc<Self>, HAClientError> {
) -> Result<ArcMut<Self>, HAClientError> {

Check warning on line 113 in rocketmq-store/src/ha/default_ha_client.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/default_ha_client.rs#L113

Added line #L113 was not covered by tests
let flow_monitor = Arc::new(FlowMonitor::new(
default_message_store.message_store_config(),
));

let now = get_current_millis() as i64;

Ok(Arc::new(Self {
Ok(ArcMut::new(Self {

Check warning on line 120 in rocketmq-store/src/ha/default_ha_client.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/default_ha_client.rs#L120

Added line #L120 was not covered by tests
master_ha_address: Arc::new(RwLock::new(None)),
master_address: Arc::new(RwLock::new(None)),
socket_stream: Arc::new(RwLock::new(None)),
Expand Down
24 changes: 21 additions & 3 deletions rocketmq-store/src/ha/default_ha_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use rocketmq_common::common::broker::broker_role::BrokerRole;
use rocketmq_remoting::protocol::body::ha_runtime_info::HARuntimeInfo;
use rocketmq_rust::ArcMut;
use tracing::error;

use crate::ha::default_ha_client::DefaultHAClient;
use crate::ha::general_ha_client::GeneralHAClient;
use crate::ha::general_ha_connection::GeneralHAConnection;
use crate::ha::general_ha_service::GeneralHAService;
use crate::ha::group_transfer_service::GroupTransferService;
use crate::ha::ha_client::HAClient;
use crate::ha::ha_connection::HAConnection;
Expand All @@ -51,6 +54,7 @@
use crate::ha::wait_notify_object::WaitNotifyObject;
use crate::log_file::flush_manager_impl::group_commit_request::GroupCommitRequest;
use crate::message_store::local_file_message_store::LocalFileMessageStore;
use crate::store_error::HAError;
use crate::store_error::HAResult;

pub struct DefaultHAService {
Expand All @@ -62,7 +66,7 @@
push2_slave_max_offset: Arc<AtomicU64>,
group_transfer_service: Option<GroupTransferService>,
ha_client: GeneralHAClient,
ha_connection_state_notification_service: HAConnectionStateNotificationService,
ha_connection_state_notification_service: Option<HAConnectionStateNotificationService>,
}

impl DefaultHAService {
Expand All @@ -76,7 +80,7 @@
push2_slave_max_offset: Arc::new(AtomicU64::new(0)),
group_transfer_service: None,
ha_client: GeneralHAClient::new(),
ha_connection_state_notification_service: HAConnectionStateNotificationService,
ha_connection_state_notification_service: None,

Check warning on line 83 in rocketmq-store/src/ha/default_ha_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/default_ha_service.rs#L83

Added line #L83 was not covered by tests
}
}

Expand All @@ -91,8 +95,22 @@
unimplemented!(" notify_transfer_some method is not implemented");
}

pub(crate) fn init(&mut self) -> HAResult<()> {
pub(crate) fn init(&mut self, this: ArcMut<Self>) -> HAResult<()> {

Check warning on line 98 in rocketmq-store/src/ha/default_ha_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/default_ha_service.rs#L98

Added line #L98 was not covered by tests
// Initialize the DefaultHAService with the provided message store.
let config = self.default_message_store.get_message_store_config();
let service = GeneralHAService::new_with_default_ha_service(this.clone());
let group_transfer_service = GroupTransferService::new(config.clone(), service.clone());
self.group_transfer_service = Some(group_transfer_service);

Check warning on line 103 in rocketmq-store/src/ha/default_ha_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/default_ha_service.rs#L100-L103

Added lines #L100 - L103 were not covered by tests

if config.broker_role == BrokerRole::Slave {
let default_message_store = self.default_message_store.clone();
let client = DefaultHAClient::new(default_message_store)
.map_err(|e| HAError::Service(format!("Failed to create DefaultHAClient: {e}")))?;
self.ha_client.set_default_ha_service(client)
}
let state_notification_service =
HAConnectionStateNotificationService::new(service, self.default_message_store.clone());
self.ha_connection_state_notification_service = Some(state_notification_service);

Check warning on line 113 in rocketmq-store/src/ha/default_ha_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/default_ha_service.rs#L105-L113

Added lines #L105 - L113 were not covered by tests
Ok(())
}
}
Expand Down
5 changes: 3 additions & 2 deletions rocketmq-store/src/ha/general_ha_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use rocketmq_rust::ArcMut;

use crate::ha::auto_switch::auto_switch_ha_client::AutoSwitchHAClient;
use crate::ha::default_ha_client::DefaultHAClient;

pub struct GeneralHAClient {
default_ha_service: Option<DefaultHAClient>,
default_ha_service: Option<ArcMut<DefaultHAClient>>,
auto_switch_ha_service: Option<AutoSwitchHAClient>,
}

Expand All @@ -37,7 +38,7 @@
}
}

pub fn set_default_ha_service(&mut self, service: DefaultHAClient) {
pub fn set_default_ha_service(&mut self, service: ArcMut<DefaultHAClient>) {

Check warning on line 41 in rocketmq-store/src/ha/general_ha_client.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/general_ha_client.rs#L41

Added line #L41 was not covered by tests
self.default_ha_service = Some(service);
}

Expand Down
27 changes: 21 additions & 6 deletions rocketmq-store/src/ha/general_ha_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
use rocketmq_rust::ArcMut;
use tracing::error;

use crate::base::message_store::MessageStore;
use crate::ha::auto_switch::auto_switch_ha_service::AutoSwitchHAService;
use crate::ha::default_ha_service::DefaultHAService;
use crate::ha::ha_client::HAClient;
Expand All @@ -36,21 +35,37 @@
use crate::store_error::HAError;
use crate::store_error::HAResult;

#[derive(Clone)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Reconsider Clone derive for shared mutable state.

Adding Clone to a struct containing ArcMut fields can lead to unexpected behavior where multiple clones share the same mutable state. This could violate Rust's safety guarantees and make reasoning about state mutations difficult.

Consider removing the Clone derive or implementing it explicitly with proper documentation about the shared state implications:

-#[derive(Clone)]
 pub struct GeneralHAService {
🤖 Prompt for AI Agents
In rocketmq-store/src/ha/general_ha_service.rs at line 38, the struct is derived
with Clone while containing ArcMut fields, which can cause multiple clones to
share the same mutable state unexpectedly. Remove the automatic Clone derive and
instead implement Clone manually if needed, ensuring to document the shared
mutable state behavior clearly to avoid confusion and maintain safety
guarantees.

pub struct GeneralHAService {
default_ha_service: Option<DefaultHAService>,
auto_switch_ha_service: Option<AutoSwitchHAService>,
default_ha_service: Option<ArcMut<DefaultHAService>>,
auto_switch_ha_service: Option<ArcMut<AutoSwitchHAService>>,
}

impl GeneralHAService {
pub fn new() -> Self {
GeneralHAService {
default_ha_service: None,
auto_switch_ha_service: None,
}
}

Check warning on line 50 in rocketmq-store/src/ha/general_ha_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/general_ha_service.rs#L45-L50

Added lines #L45 - L50 were not covered by tests

pub fn new_with_default_ha_service(default_ha_service: ArcMut<DefaultHAService>) -> Self {
GeneralHAService {
default_ha_service: Some(default_ha_service),
auto_switch_ha_service: None,
}
}

Check warning on line 57 in rocketmq-store/src/ha/general_ha_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/general_ha_service.rs#L52-L57

Added lines #L52 - L57 were not covered by tests

pub(crate) fn init(&mut self, message_store: ArcMut<LocalFileMessageStore>) -> HAResult<()> {
if message_store
.get_message_store_config()
.enable_controller_mode
{
self.auto_switch_ha_service = Some(AutoSwitchHAService)
self.auto_switch_ha_service = Some(ArcMut::new(AutoSwitchHAService))

Check warning on line 64 in rocketmq-store/src/ha/general_ha_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/general_ha_service.rs#L64

Added line #L64 was not covered by tests
} else {
let mut default_ha_service = DefaultHAService::new(message_store);
default_ha_service.init()?;
let mut default_ha_service = ArcMut::new(DefaultHAService::new(message_store));
let default_ha_service_clone = default_ha_service.clone();
DefaultHAService::init(&mut default_ha_service, default_ha_service_clone)?;

Check warning on line 68 in rocketmq-store/src/ha/general_ha_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/general_ha_service.rs#L66-L68

Added lines #L66 - L68 were not covered by tests
Comment on lines +66 to +68
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix incorrect static method call.

The call to DefaultHAService::init appears to be treated as a static method, but based on the method signature in default_ha_service.rs (line 98), it's an instance method that should be called on the default_ha_service instance.

-            let mut default_ha_service = ArcMut::new(DefaultHAService::new(message_store));
-            let default_ha_service_clone = default_ha_service.clone();
-            DefaultHAService::init(&mut default_ha_service, default_ha_service_clone)?;
+            let mut default_ha_service = ArcMut::new(DefaultHAService::new(message_store));
+            let default_ha_service_clone = default_ha_service.clone();
+            default_ha_service.init(default_ha_service_clone)?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let mut default_ha_service = ArcMut::new(DefaultHAService::new(message_store));
let default_ha_service_clone = default_ha_service.clone();
DefaultHAService::init(&mut default_ha_service, default_ha_service_clone)?;
let mut default_ha_service = ArcMut::new(DefaultHAService::new(message_store));
let default_ha_service_clone = default_ha_service.clone();
default_ha_service.init(default_ha_service_clone)?;
🤖 Prompt for AI Agents
In rocketmq-store/src/ha/general_ha_service.rs around lines 66 to 68, the method
init is incorrectly called as a static method on DefaultHAService. Instead, call
init as an instance method on the default_ha_service object by using
default_ha_service.init(...) with the appropriate arguments, adjusting the call
to match the instance method signature.

self.default_ha_service = Some(default_ha_service);
}
Ok(())
Expand Down
21 changes: 20 additions & 1 deletion rocketmq-store/src/ha/group_transfer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::sync::Arc;

pub struct GroupTransferService;
use crate::config::message_store_config::MessageStoreConfig;
use crate::ha::general_ha_service::GeneralHAService;

pub struct GroupTransferService {
message_store_config: Arc<MessageStoreConfig>,
ha_service: GeneralHAService,
}

impl GroupTransferService {
pub fn new(
message_store_config: Arc<MessageStoreConfig>,
ha_service: GeneralHAService,
) -> Self {
GroupTransferService {
message_store_config,
ha_service,
}
}

Check warning on line 36 in rocketmq-store/src/ha/group_transfer_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/group_transfer_service.rs#L28-L36

Added lines #L28 - L36 were not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use rocketmq_rust::ArcMut;

pub struct HAConnectionStateNotificationService;
use crate::ha::general_ha_service::GeneralHAService;
use crate::message_store::local_file_message_store::LocalFileMessageStore;

pub struct HAConnectionStateNotificationService {
ha_service: GeneralHAService,
default_message_store: ArcMut<LocalFileMessageStore>,
}

impl HAConnectionStateNotificationService {
pub fn new(
ha_service: GeneralHAService,
default_message_store: ArcMut<LocalFileMessageStore>,
) -> Self {
HAConnectionStateNotificationService {
ha_service,
default_message_store,
}
}

Check warning on line 36 in rocketmq-store/src/ha/ha_connection_state_notification_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/ha_connection_state_notification_service.rs#L28-L36

Added lines #L28 - L36 were not covered by tests
}
4 changes: 4 additions & 0 deletions rocketmq-store/src/message_store/local_file_message_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,10 @@
fn do_recheck_reput_offset_from_cq(&self) {
error!("do_recheck_reput_offset_from_cq called, not implemented yet");
}

pub fn get_message_store_config(&self) -> Arc<MessageStoreConfig> {
self.message_store_config.clone()
}

Check warning on line 537 in rocketmq-store/src/message_store/local_file_message_store.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/message_store/local_file_message_store.rs#L535-L537

Added lines #L535 - L537 were not covered by tests
}

fn estimate_in_mem_by_commit_offset(
Expand Down
Loading