Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rocketmq-store/src/ha.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ pub(crate) mod flow_monitor;
pub(crate) mod general_ha_client;
pub(crate) mod general_ha_connection;
pub(crate) mod general_ha_service;
mod group_transfer_service;
pub(crate) mod ha_client;
pub(crate) mod ha_connection;
pub(crate) mod ha_connection_state;
pub(crate) mod ha_connection_state_notification_request;
mod ha_connection_state_notification_service;
pub(crate) mod ha_service;
pub(crate) mod wait_notify_object;
36 changes: 33 additions & 3 deletions rocketmq-store/src/ha/default_ha_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,53 @@

use std::sync::atomic::AtomicI32;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use rocketmq_remoting::protocol::body::ha_runtime_info::HARuntimeInfo;
use rocketmq_rust::ArcMut;
use tracing::error;

use crate::ha::general_ha_client::GeneralHAClient;
use crate::ha::general_ha_connection::GeneralHAConnection;
use crate::ha::group_transfer_service::GroupTransferService;
use crate::ha::ha_client::HAClient;
use crate::ha::ha_connection::HAConnection;
use crate::ha::ha_connection_state_notification_request::HAConnectionStateNotificationRequest;
use crate::ha::ha_connection_state_notification_service::HAConnectionStateNotificationService;
use crate::ha::ha_service::HAService;
use crate::ha::wait_notify_object::WaitNotifyObject;
use crate::log_file::flush_manager_impl::group_commit_request::GroupCommitRequest;
use crate::message_store::local_file_message_store::LocalFileMessageStore;
use crate::store_error::HAResult;

#[derive(Default)]
pub struct DefaultHAService {}
pub struct DefaultHAService {
connection_count: Arc<AtomicU64>,
connection_list: Vec<GeneralHAConnection>,
accept_socket_service: AcceptSocketService,
default_message_store: ArcMut<LocalFileMessageStore>,
wait_notify_object: Arc<WaitNotifyObject>,
push2_slave_max_offset: Arc<AtomicU64>,
group_transfer_service: Option<GroupTransferService>,
ha_client: GeneralHAClient,
ha_connection_state_notification_service: HAConnectionStateNotificationService,
}

impl DefaultHAService {
pub fn new(message_store: ArcMut<LocalFileMessageStore>) -> Self {
DefaultHAService {
connection_count: Arc::new(AtomicU64::new(0)),
connection_list: Vec::new(),
accept_socket_service: AcceptSocketService,
default_message_store: message_store,
wait_notify_object: Arc::new(WaitNotifyObject),
push2_slave_max_offset: Arc::new(AtomicU64::new(0)),
group_transfer_service: None,
ha_client: GeneralHAClient::new(),
ha_connection_state_notification_service: HAConnectionStateNotificationService,
}
}

Check warning on line 81 in rocketmq-store/src/ha/default_ha_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/default_ha_service.rs#L69-L81

Added lines #L69 - L81 were not covered by tests

// Add any necessary fields here
pub fn get_default_message_store(&self) -> &LocalFileMessageStore {
unimplemented!(" get_default_message_store method is not implemented");
Expand All @@ -63,7 +91,7 @@
unimplemented!(" notify_transfer_some method is not implemented");
}

pub(crate) fn init(&mut self, message_store: ArcMut<LocalFileMessageStore>) -> HAResult<()> {
pub(crate) fn init(&mut self) -> HAResult<()> {

Check warning on line 94 in rocketmq-store/src/ha/default_ha_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/default_ha_service.rs#L94

Added line #L94 was not covered by tests
// Initialize the DefaultHAService with the provided message store.
Ok(())
}
Expand Down Expand Up @@ -152,3 +180,5 @@
todo!()
}
}

struct AcceptSocketService;
25 changes: 25 additions & 0 deletions rocketmq-store/src/ha/general_ha_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,28 @@
default_ha_service: Option<DefaultHAClient>,
auto_switch_ha_service: Option<AutoSwitchHAClient>,
}

impl Default for GeneralHAClient {
fn default() -> Self {
GeneralHAClient::new()
}

Check warning on line 29 in rocketmq-store/src/ha/general_ha_client.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/general_ha_client.rs#L27-L29

Added lines #L27 - L29 were not covered by tests
}

impl GeneralHAClient {
pub fn new() -> Self {
GeneralHAClient {
default_ha_service: None,
auto_switch_ha_service: None,
}
}

Check warning on line 38 in rocketmq-store/src/ha/general_ha_client.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/general_ha_client.rs#L33-L38

Added lines #L33 - L38 were not covered by tests

pub fn set_default_ha_service(&mut self, service: DefaultHAClient) {
self.default_ha_service = Some(service);
}

Check warning on line 42 in rocketmq-store/src/ha/general_ha_client.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/general_ha_client.rs#L40-L42

Added lines #L40 - L42 were not covered by tests

pub fn set_auto_switch_ha_service(&mut self, service: AutoSwitchHAClient) {
self.auto_switch_ha_service = Some(service);
}

Check warning on line 46 in rocketmq-store/src/ha/general_ha_client.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/general_ha_client.rs#L44-L46

Added lines #L44 - L46 were not covered by tests

// Additional methods to interact with the HA services can be added here
}
4 changes: 2 additions & 2 deletions rocketmq-store/src/ha/general_ha_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
{
self.auto_switch_ha_service = Some(AutoSwitchHAService)
} else {
let mut default_ha_service = DefaultHAService::default();
default_ha_service.init(message_store)?;
let mut default_ha_service = DefaultHAService::new(message_store);
default_ha_service.init()?;

Check warning on line 53 in rocketmq-store/src/ha/general_ha_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/general_ha_service.rs#L52-L53

Added lines #L52 - L53 were not covered by tests
self.default_ha_service = Some(default_ha_service);
}
Ok(())
Expand Down
18 changes: 18 additions & 0 deletions rocketmq-store/src/ha/group_transfer_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

pub struct GroupTransferService;
18 changes: 18 additions & 0 deletions rocketmq-store/src/ha/ha_connection_state_notification_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

pub struct HAConnectionStateNotificationService;
Loading