Skip to content

[ISSUE #3510]🚀Implement Comprehensive High Availability (HA) Subsystem Architecture with Enhanced Service Infrastructure✨#3511

Merged
rocketmq-rust-bot merged 1 commit intomainfrom
feature-3510
Jun 22, 2025
Merged

[ISSUE #3510]🚀Implement Comprehensive High Availability (HA) Subsystem Architecture with Enhanced Service Infrastructure✨#3511
rocketmq-rust-bot merged 1 commit intomainfrom
feature-3510

Conversation

@mxsm
Copy link
Owner

@mxsm mxsm commented Jun 22, 2025

Which Issue(s) This PR Fixes(Closes)

Fixes #3510

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Added methods and constructors to several components, enabling new ways to initialize and access message store configurations and services.
  • Refactor

    • Updated various service components to use shared, mutable references for improved flexibility and consistency.
    • Enhanced several services by introducing internal state and dependency management, replacing previously stateless implementations.
  • Bug Fixes

    • Improved initialization logic for high availability services to ensure correct setup and configuration sharing.

…m Architecture with Enhanced Service Infrastructure✨
Copilot AI review requested due to automatic review settings June 22, 2025 09:17
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jun 22, 2025

Walkthrough

This change refactors the High Availability (HA) subsystem in RocketMQ-Rust, introducing enhanced service infrastructure, dependency injection, and unified service management. It updates constructors, field types, and initialization patterns for core HA components, implements proper ownership semantics with ArcMut, and adds missing constructors and methods for service coordination and configuration-driven behavior.

Changes

File(s) Change Summary
rocketmq-store/src/ha/default_ha_client.rs Changed DefaultHAClient::new to return ArcMut<Self> instead of Arc<Self>.
rocketmq-store/src/ha/default_ha_service.rs Made ha_connection_state_notification_service optional; refactored init for dependency injection, role logic.
rocketmq-store/src/ha/general_ha_client.rs Changed default_ha_service field and setter to use ArcMut<DefaultHAClient>.
rocketmq-store/src/ha/general_ha_service.rs Changed service fields to use ArcMut; added Clone derive and new constructors; refactored init logic.
rocketmq-store/src/ha/group_transfer_service.rs Made struct concrete with config and service fields; added constructor.
rocketmq-store/src/ha/ha_connection_state_notification_service.rs Made struct concrete with service and store fields; added constructor.
rocketmq-store/src/message_store/local_file_message_store.rs Added get_message_store_config() method to expose config.

Sequence Diagram(s)

sequenceDiagram
    participant Config as MessageStoreConfig
    participant Store as LocalFileMessageStore
    participant GeneralHA as GeneralHAService
    participant DefaultHA as DefaultHAService
    participant GroupTransfer as GroupTransferService
    participant Notification as HAConnectionStateNotificationService
    participant Client as DefaultHAClient

    Store->>Config: get_message_store_config()
    GeneralHA->>Config: Reads enable_controller_mode
    alt enable_controller_mode == true
        GeneralHA->>GeneralHA: Set auto_switch_ha_service
    else enable_controller_mode == false
        GeneralHA->>DefaultHA: Create DefaultHAService (ArcMut)
        DefaultHA->>GeneralHA: GeneralHAService::new_with_default_ha_service
        DefaultHA->>GroupTransfer: GroupTransferService::new(config, general_ha)
        DefaultHA->>Notification: HAConnectionStateNotificationService::new(general_ha, store)
        DefaultHA->>Config: Check broker_role
        alt broker_role == Slave
            DefaultHA->>Client: DefaultHAClient::new(store)
            DefaultHA->>Client: Set as default HA client
        end
    end
Loading

Assessment against linked issues

Objective Addressed Explanation
DefaultHAService Enhancement: Infrastructure, dependency injection, lifecycle management (#3510)
GeneralHAService Implementation: Unified architecture, mode switching, cloneable, lifecycle (#3510)
Service Component Integration: GroupTransferService and HAConnectionStateNotificationService constructors (#3510)
Configuration-Driven Architecture, Dependency Injection, Role-Based Logic (#3510)
GeneralHAClient: ArcMut usage, setter method update (#3510)

Poem

In the warren of code where the HA bunnies dwell,
New services hop in, dependencies as well.
With ArcMut carrots and configs so bright,
They wire up the burrow, both left and right.
Now master and slave, with roles well-defined,
Hop into high-availability, robust and aligned!
🐇✨

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@mxsm 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Implements a comprehensive HA subsystem by introducing new service abstractions and refactoring existing HA components to use ArcMut wrappers and dynamic initialization.

  • Exposed message store configuration via a public getter.
  • Added two new services (HAConnectionStateNotificationService, GroupTransferService) with constructors.
  • Refactored GeneralHAService, GeneralHAClient, DefaultHAService, and DefaultHAClient to use ArcMut and support dynamic initialization based on broker role.

Reviewed Changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.

Show a summary per file
File Description
rocketmq-store/src/message_store/local_file_message_store.rs Added get_message_store_config public accessor.
rocketmq-store/src/ha/ha_connection_state_notification_service.rs Defined HAConnectionStateNotificationService struct with new method.
rocketmq-store/src/ha/group_transfer_service.rs Defined GroupTransferService struct with new method.
rocketmq-store/src/ha/general_ha_service.rs Refactored services to use ArcMut, added new/new_with_default and updated init.
rocketmq-store/src/ha/general_ha_client.rs Wrapped default client in ArcMut and updated setter signature.
rocketmq-store/src/ha/default_ha_service.rs Changed notification service to Option, updated init for slave/master roles.
rocketmq-store/src/ha/default_ha_client.rs Changed constructor return type from Arc to ArcMut.
Comments suppressed due to low confidence (3)

rocketmq-store/src/ha/general_ha_client.rs:24

  • [nitpick] Field is named auto_switch_ha_service but holds a client type. Consider renaming to auto_switch_ha_client (and/or wrapping it in ArcMut) for consistency with default_ha_service.
    auto_switch_ha_service: Option<AutoSwitchHAClient>,

rocketmq-store/src/ha/general_ha_service.rs:45

  • Consider adding unit tests for GeneralHAService::new and new_with_default_ha_service to verify that the internal state fields are initialized as expected.
    pub fn new() -> Self {

rocketmq-store/src/message_store/local_file_message_store.rs:535

  • The indentation of this method (4 spaces) does not match the surrounding methods (6 spaces). Align it to improve readability.
    pub fn get_message_store_config(&self) -> Arc<MessageStoreConfig> {

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🔭 Outside diff range comments (1)
rocketmq-store/src/ha/default_ha_service.rs (1)

1-32: Remove duplicate license header.

The file contains duplicate Apache license headers (lines 1-16 and 17-32), which should be cleaned up to maintain code quality.

-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 145a8fc and b8a61a1.

📒 Files selected for processing (7)
  • rocketmq-store/src/ha/default_ha_client.rs (1 hunks)
  • rocketmq-store/src/ha/default_ha_service.rs (5 hunks)
  • rocketmq-store/src/ha/general_ha_client.rs (2 hunks)
  • rocketmq-store/src/ha/general_ha_service.rs (1 hunks)
  • rocketmq-store/src/ha/group_transfer_service.rs (1 hunks)
  • rocketmq-store/src/ha/ha_connection_state_notification_service.rs (1 hunks)
  • rocketmq-store/src/message_store/local_file_message_store.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: test
  • GitHub Check: build
  • GitHub Check: build (windows-latest, stable)
  • GitHub Check: build (windows-latest, nightly)
  • GitHub Check: build (macos-latest, stable)
  • GitHub Check: build (macos-latest, nightly)
  • GitHub Check: build (ubuntu-latest, nightly)
  • GitHub Check: build (ubuntu-latest, stable)
  • GitHub Check: auto-approve
🔇 Additional comments (12)
rocketmq-store/src/message_store/local_file_message_store.rs (1)

534-537: LGTM! Clean configuration accessor method.

The implementation correctly clones the Arc for thread-safe sharing of the configuration. This method enables HA service components to access the message store configuration safely.

rocketmq-store/src/ha/ha_connection_state_notification_service.rs (2)

17-21: Good dependency imports for HA integration.

The imports correctly bring in the necessary types for HA service coordination and shared mutable ownership patterns.


22-37: Well-structured service with proper dependency injection.

The service now holds references to both the HA service and message store, enabling it to coordinate HA connection state notifications effectively. The constructor follows standard patterns and properly initializes the dependencies.

rocketmq-store/src/ha/general_ha_client.rs (3)

17-17: Appropriate import for shared mutable ownership.

The ArcMut import aligns with the refactoring to use mutable atomic reference counting for HA components.


23-23: Consistent ownership model change.

The field type change from DefaultHAClient to ArcMut<DefaultHAClient> enables shared mutable access, which is consistent with the broader HA subsystem refactoring.


41-43: Parameter type updated to match field type.

The setter method parameter correctly matches the new field type, maintaining consistency in the ownership model.

rocketmq-store/src/ha/default_ha_client.rs (2)

113-113: Return type updated for mutable shared ownership.

The constructor now returns ArcMut<Self> instead of Arc<Self>, enabling mutable shared access to the DefaultHAClient instance. This aligns with the broader HA subsystem refactoring.


120-139: Constructor implementation updated consistently.

The implementation correctly uses ArcMut::new(Self { ... }) instead of Arc::new(Self { ... }), maintaining the same initialization logic while supporting the new ownership model.

rocketmq-store/src/ha/group_transfer_service.rs (1)

22-37: Well-designed transformation from stateless to stateful service.

The refactoring from an empty struct to a concrete implementation with proper dependency injection is excellent. The use of Arc<MessageStoreConfig> for shared configuration and direct ownership of GeneralHAService follows good Rust patterns for service composition.

rocketmq-store/src/ha/default_ha_service.rs (2)

69-69: Good architectural improvement with optional service initialization.

The change to Option<HAConnectionStateNotificationService> allows for more flexible initialization patterns and aligns well with the lazy initialization approach in the init method.


98-115: Comprehensive HA service initialization logic.

The init method properly orchestrates the initialization of various HA components with appropriate dependency injection. The error handling for DefaultHAClient creation and conditional initialization based on broker role is well-implemented.

rocketmq-store/src/ha/general_ha_service.rs (1)

45-57: Well-designed constructors for flexible initialization.

The new constructors provide good flexibility for different initialization scenarios. The default constructor and the specific constructor with DefaultHAService follow clear patterns for dependency injection.

Comment on lines +66 to +68
let mut default_ha_service = ArcMut::new(DefaultHAService::new(message_store));
let default_ha_service_clone = default_ha_service.clone();
DefaultHAService::init(&mut default_ha_service, default_ha_service_clone)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix incorrect static method call.

The call to DefaultHAService::init appears to be treated as a static method, but based on the method signature in default_ha_service.rs (line 98), it's an instance method that should be called on the default_ha_service instance.

-            let mut default_ha_service = ArcMut::new(DefaultHAService::new(message_store));
-            let default_ha_service_clone = default_ha_service.clone();
-            DefaultHAService::init(&mut default_ha_service, default_ha_service_clone)?;
+            let mut default_ha_service = ArcMut::new(DefaultHAService::new(message_store));
+            let default_ha_service_clone = default_ha_service.clone();
+            default_ha_service.init(default_ha_service_clone)?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let mut default_ha_service = ArcMut::new(DefaultHAService::new(message_store));
let default_ha_service_clone = default_ha_service.clone();
DefaultHAService::init(&mut default_ha_service, default_ha_service_clone)?;
let mut default_ha_service = ArcMut::new(DefaultHAService::new(message_store));
let default_ha_service_clone = default_ha_service.clone();
default_ha_service.init(default_ha_service_clone)?;
🤖 Prompt for AI Agents
In rocketmq-store/src/ha/general_ha_service.rs around lines 66 to 68, the method
init is incorrectly called as a static method on DefaultHAService. Instead, call
init as an instance method on the default_ha_service object by using
default_ha_service.init(...) with the appropriate arguments, adjusting the call
to match the instance method signature.

use crate::store_error::HAError;
use crate::store_error::HAResult;

#[derive(Clone)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Reconsider Clone derive for shared mutable state.

Adding Clone to a struct containing ArcMut fields can lead to unexpected behavior where multiple clones share the same mutable state. This could violate Rust's safety guarantees and make reasoning about state mutations difficult.

Consider removing the Clone derive or implementing it explicitly with proper documentation about the shared state implications:

-#[derive(Clone)]
 pub struct GeneralHAService {
🤖 Prompt for AI Agents
In rocketmq-store/src/ha/general_ha_service.rs at line 38, the struct is derived
with Clone while containing ArcMut fields, which can cause multiple clones to
share the same mutable state unexpectedly. Remove the automatic Clone derive and
instead implement Clone manually if needed, ensuring to document the shared
mutable state behavior clearly to avoid confusion and maintain safety
guarantees.

@codecov
Copy link

codecov bot commented Jun 22, 2025

Codecov Report

Attention: Patch coverage is 0% with 55 lines in your changes missing coverage. Please review.

Project coverage is 26.25%. Comparing base (145a8fc) to head (b8a61a1).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
rocketmq-store/src/ha/general_ha_service.rs 0.00% 16 Missing ⚠️
rocketmq-store/src/ha/default_ha_service.rs 0.00% 15 Missing ⚠️
rocketmq-store/src/ha/group_transfer_service.rs 0.00% 9 Missing ⚠️
...src/ha/ha_connection_state_notification_service.rs 0.00% 9 Missing ⚠️
...tore/src/message_store/local_file_message_store.rs 0.00% 3 Missing ⚠️
rocketmq-store/src/ha/default_ha_client.rs 0.00% 2 Missing ⚠️
rocketmq-store/src/ha/general_ha_client.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3511      +/-   ##
==========================================
- Coverage   26.26%   26.25%   -0.02%     
==========================================
  Files         552      554       +2     
  Lines       78400    78447      +47     
==========================================
  Hits        20593    20593              
- Misses      57807    57854      +47     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Collaborator

@rocketmq-rust-bot rocketmq-rust-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@rocketmq-rust-bot rocketmq-rust-bot merged commit 94f20fd into main Jun 22, 2025
21 of 23 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added approved PR has approved and removed ready to review waiting-review waiting review this PR labels Jun 22, 2025
@mxsm mxsm deleted the feature-3510 branch June 26, 2025 03:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AI review first Ai review pr first approved PR has approved auto merge

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement Comprehensive High Availability (HA) Subsystem Architecture with Enhanced Service Infrastructure

3 participants