NPU Driven hamgrd infrastructure #145

Open

BYGX-wcr wants to merge 58 commits into sonic-net:master from BYGX-wcr:npu-driven-hamgrd-infra

Conversation

@BYGX-wcr

What I did

  1. Added the basic infrastructure for NPU-driven hamgrd
  2. Implemented the state machine for NPU-driven HA

Why I did it

To enable NPU-driven HA.

How I verified it

Details if related

Signed-off-by: BYGX-wcr <wcr@live.cn>
@mssonicbld

/azp run

@azure-pipelines

Azure Pipelines will not run the associated pipelines, because the pull request was updated after the run command was issued. Review the pull request again and issue a new run command.

@mssonicbld

/azp run

@mssonicbld

/azp run

@BYGX-wcr BYGX-wcr force-pushed the npu-driven-hamgrd-infra branch from 12b68c9 to 2282938 Compare February 20, 2026 19:30
@BYGX-wcr
Author

/azpw run

@mssonicbld

/AzurePipelines run

@mssonicbld

/azp run

@mssonicbld

/azp run

@mssonicbld

/azp run

@BYGX-wcr BYGX-wcr marked this pull request as ready for review February 24, 2026 21:59
Copilot AI review requested due to automatic review settings February 24, 2026 21:59

Copilot AI left a comment


Pull request overview

This PR implements NPU-driven High Availability (HA) infrastructure for SONiC SmartSwitch DASH deployments, adding a complete state machine and supporting components alongside the existing DPU-driven HA implementation.

Changes:

  • Adds delayed message capability to swbus-actor for scheduled peer communication
  • Implements comprehensive NPU-driven HA state machine with peer voting, heartbeat, and bulk sync protocols
  • Refactors HaScopeActor into an enum supporting both DPU and NPU-driven modes with shared base functionality
  • Adds new database structures for flow sync sessions and HA set state tracking
  • Updates HaSetActor to handle NPU-driven mode and route updates based on HA owner

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 13 comments.

Summary per file:

  • crates/swbus-actor/src/state/outgoing.rs — Adds a send_with_delay method for scheduling messages; contains a critical bug in the delay logic
  • crates/hamgrd/src/ha_actor_messages.rs — Defines new message types for the NPU HA protocol (PeerHeartbeat, VoteRequest/Reply, BulkSyncUpdate, HAStateChanged, SelfNotification); adds vdpu_ids and owner fields to existing messages
  • crates/hamgrd/src/db_structs.rs — Adds DashFlowSyncSessionTable, DashFlowSyncSessionState, DpuDashHaSetState; adds ha_term and peer state tracking to NpuDashHaScopeState
  • crates/hamgrd/src/actors/test.rs — Extends test macros to support excluding fields from comparison for dynamic values like timestamps
  • crates/hamgrd/src/actors/ha_set.rs — Adds dp_channel_is_alive and ha_owner tracking; conditionally updates VNET routes based on owner; handles HaScopeActorState updates for NPU mode
  • crates/hamgrd/src/actors/ha_scope/npu.rs — Implements the complete NPU-driven HA state machine with peer communication, voting protocol, and state transitions
  • crates/hamgrd/src/actors/ha_scope/mod.rs — Refactors HaScopeActor into an enum with Dpu/Npu variants; adds comprehensive tests for both modes
  • crates/hamgrd/src/actors/ha_scope/dpu.rs — Extracts the DPU-driven implementation from the monolithic actor
  • crates/hamgrd/src/actors/ha_scope/base.rs — Shared base functionality for both DPU and NPU variants
  • crates/hamgrd/src/main.rs — Adds a producer bridge for DASH_FLOW_SYNC_SESSION_TABLE

@BYGX-wcr BYGX-wcr changed the title NPU Driven hamgrd NPU Driven hamgrd infrastructure Feb 24, 2026
@mssonicbld

/azp run

Copilot AI review requested due to automatic review settings February 25, 2026 18:09

Copilot AI left a comment


Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 7 comments.

Comments suppressed due to low confidence (1)

crates/swbus-actor/src/state/outgoing.rs:88

  • In send_queued_messages, delayed messages keep their original time_sent (scheduled time) even when they are actually sent. Since time_sent is later used by the resend/retention logic (get_elapsed_time(&msg.time_sent)), this can cause immediate resends (or premature dropping) if the actor didn't process the queue until long after the scheduled send time. Consider updating msg.time_sent = SystemTime::now() right before the first successful send_raw so resend timing is based on the real send time.
        let mut delayed_messages: Vec<UnackedMessage> = Vec::new();
        for msg in self.queued_messages.drain(..) {
            if SystemTime::now() < msg.time_sent {
                // The message hasn't reached its sending time yet
                delayed_messages.push(msg);
                continue;
            }

            debug!("Sending message: {msg:?}");
            self.swbus_client
                .send_raw(msg.swbus_message.clone())
                .await
                .expect("Sending swbus message failed");

            let id = msg.swbus_message.header.as_ref().unwrap().id;

ActorMessage::new(
    Self::msg_key(my_id),
    &Self {
        up: true,

Copilot AI Feb 25, 2026


HaSetActorState::new_actor_msg ignores its up parameter and always serializes up: true. Callers now pass self.dp_channel_is_alive as up, but the value will never be reflected in messages. Either use the up argument when building Self, or remove the parameter and keep the field constant (and adjust call sites accordingly).

Suggested change
-        up: true,
+        up,

Comment on lines +117 to +126
pub fn decode_hascope_actor_message<T>(&self, incoming: &Incoming, key: &str) -> Option<T>
where
    T: DeserializeOwned,
{
    let msg = incoming.get(key)?;
    match msg.deserialize_data() {
        Ok(data) => Some(data),
        Err(e) => {
            error!("Failed to deserialize VoteReply from message: {}", e);
            None

Copilot AI Feb 25, 2026


decode_hascope_actor_message is generic over message types, but the error log always says "Failed to deserialize VoteReply". This is misleading when decoding other messages (e.g., HaScopeActorState, VoteRequest, etc.). Consider making the log message generic or including the target type/key in the log.

Comment on lines +19 to +27
pub struct NpuHaScopeActor {
    pub(super) base: HaScopeBase,
    /// Target state that HAmgrd should transition to upon HA events
    pub(super) target_ha_scope_state: Option<TargetState>,
    /// Retry count used for voting
    pub(super) retry_count: u32,
    /// Is peer connected?
    pub(super) peer_connected: bool,
}

Copilot AI Feb 25, 2026


retry_count is documented as "used for voting", but it's also used by the peer-connection retry loop (CheckPeerConnection). Sharing a single counter across independent retry workflows can cause incorrect behavior (e.g., vote retries affecting peer connection retries, or vice versa). Consider splitting this into separate counters (e.g., vote_retry_count and peer_connect_retry_count).

Comment on lines +1097 to +1103
let bulk_sync_session = DashFlowSyncSessionTable {
    ha_set_id: ha_set_id.clone(),
    target_server_ip: haset.ha_set.peer_ip.clone(),
    target_server_port: haset
        .ha_set
        .cp_data_channel_port
        .expect("cp_data_channel_port must be configured"),

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add_bulk_sync_session uses expect("cp_data_channel_port must be configured") on an optional config field. If cp_data_channel_port is missing, hamgrd will panic and crash. Prefer returning an error (or skipping bulk sync) so the actor can continue running and surface a clear error/log message.

Suggested change
-    let bulk_sync_session = DashFlowSyncSessionTable {
-        ha_set_id: ha_set_id.clone(),
-        target_server_ip: haset.ha_set.peer_ip.clone(),
-        target_server_port: haset
-            .ha_set
-            .cp_data_channel_port
-            .expect("cp_data_channel_port must be configured"),
+    let Some(cp_data_channel_port) = haset.ha_set.cp_data_channel_port else {
+        warn!(
+            "HA-SET {} missing cp_data_channel_port. Cannot start bulk sync session!",
+            &ha_set_id
+        );
+        return Ok(None);
+    };
+    let bulk_sync_session = DashFlowSyncSessionTable {
+        ha_set_id: ha_set_id.clone(),
+        target_server_ip: haset.ha_set.peer_ip.clone(),
+        target_server_port: cp_data_channel_port,

Comment on lines +43 to +60
if key == HaScopeActor::table_name() {
    match self.handle_dash_ha_scope_config_table_message_npu_driven(state, key, context) {
        Ok(incoming_event) => {
            event = Some(incoming_event);
        }
        Err(_e) => {
            error!("Error when processing HA Scope Config Table Update!")
        }
    }
} else if key.starts_with(DpuDashHaScopeState::table_name()) {
    // Update NPU ha scope state based on dpu ha scope state update
    match self.handle_dpu_ha_scope_state_update_npu_driven(state) {
        Ok(incoming_event) => {
            event = Some(incoming_event);
        }
        Err(_e) => {
            error!("Error when processing DPU HA Scope State Update!")
        }

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Several Err(_e) branches in handle_message_inner discard the underlying error value, which makes debugging production failures much harder. Consider logging the actual error (and ideally the key) in these branches, or propagating the error upward when appropriate.

@BYGX-wcr
Author

/azpw run

@mssonicbld

/AzurePipelines run


4 participants