Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
2a0fbb0
add basic divergence between DPU-driven mode and NPU-driven mode
BYGX-wcr Jan 15, 2026
055e2bb
enhance ha_set actor to handle ha_set dplane channel up/down signal
BYGX-wcr Jan 21, 2026
1093c4c
add DpuDashHaSetState table
BYGX-wcr Jan 21, 2026
2bf826e
Define three new actor message types
BYGX-wcr Jan 21, 2026
cfdec81
add vdpu_ids in HaSetActorState messages
BYGX-wcr Jan 22, 2026
3bd2582
initial NPU-driven HA message processing
BYGX-wcr Jan 24, 2026
f9269a3
define HAStateChanged
BYGX-wcr Feb 3, 2026
0b01bd1
add peer_ha_state_last_updated_time_in_ms in db_struct.rs
BYGX-wcr Feb 4, 2026
bd24554
add term and timestamp to HAStateChanged
BYGX-wcr Feb 4, 2026
e8aaa6e
remove unnecessary registration types
BYGX-wcr Feb 5, 2026
826d521
revision based on latest update on HLD
BYGX-wcr Feb 5, 2026
b4ea39a
add self notification in ha actor messages
BYGX-wcr Feb 5, 2026
f970a7d
revise state machines
BYGX-wcr Feb 5, 2026
7c5bb10
adding more HA events
BYGX-wcr Feb 7, 2026
9bfdaf1
add DASH_FLOW_SYNC_SESSION_TABLE in db_struct.rs
BYGX-wcr Feb 9, 2026
50985c4
add DASH_FLOW_SYNC_SESSION_STATE in db_struct.rs
BYGX-wcr Feb 9, 2026
95c491d
add bulk sync handling
BYGX-wcr Feb 9, 2026
b08dd0b
add admin launch handling and vote request initiation
BYGX-wcr Feb 10, 2026
ce68f83
add handling for DPU state changes
BYGX-wcr Feb 10, 2026
ca12c33
add handling for HA term changes
BYGX-wcr Feb 11, 2026
2a7e7d3
add notification for ha owner from ha scope actor to ha set actor
BYGX-wcr Feb 13, 2026
2787353
fix syntax error
BYGX-wcr Feb 13, 2026
22bc758
move generic functions to common code section
BYGX-wcr Feb 13, 2026
699d5d2
add VNET route update based on HA scope state update in HA set actor
BYGX-wcr Feb 14, 2026
0fef08a
remove forbidden as_str and rom_str impl for HaState
BYGX-wcr Feb 14, 2026
65b6563
remove forbidden as_str and rom_str impl for HaRole
BYGX-wcr Feb 14, 2026
4cfdf8d
remove forbidden as_str and rom_str impl for DesiredHaStae
BYGX-wcr Feb 14, 2026
2a89547
add emissions of HaScopeActorState
BYGX-wcr Feb 14, 2026
69593f5
Merge branch 'master' into npu-driven-hamgrd-infra
BYGX-wcr Feb 17, 2026
76beee4
fix for format checks
BYGX-wcr Feb 17, 2026
b056b08
fix for compilation errors
BYGX-wcr Feb 17, 2026
067fd7b
add missing Duration definitions
BYGX-wcr Feb 17, 2026
63903ac
redesign peer connection check
BYGX-wcr Feb 18, 2026
ffe0597
fix for compilation errors
BYGX-wcr Feb 18, 2026
4c35f26
fix format errors
BYGX-wcr Feb 18, 2026
13a8936
fix for compilation errors
BYGX-wcr Feb 18, 2026
daf2ba0
fix for compilation errors
BYGX-wcr Feb 19, 2026
dbbefd9
fix for compilation errors
BYGX-wcr Feb 19, 2026
c3ebb72
Merge branch 'master' into npu-driven-hamgrd-infra
BYGX-wcr Feb 19, 2026
da85c87
fix for formatting
BYGX-wcr Feb 19, 2026
4a4a38c
fix for compilation errors
BYGX-wcr Feb 19, 2026
bc451c4
fix for compilation errors
BYGX-wcr Feb 19, 2026
6252b90
fix for compilation errors
BYGX-wcr Feb 20, 2026
743746c
fix for compilation errors
BYGX-wcr Feb 20, 2026
987256f
fix for document build errors
BYGX-wcr Feb 20, 2026
640a8ee
make linter happy
BYGX-wcr Feb 20, 2026
a79ffeb
fix for compilation errors
BYGX-wcr Feb 20, 2026
7145ccd
make linter happy
BYGX-wcr Feb 20, 2026
23332c9
make linter happy
BYGX-wcr Feb 20, 2026
aab7b9e
major code refactoring to separate npu-driven ha scope and dpu-driven…
BYGX-wcr Feb 20, 2026
2282938
fix for compilation errors
BYGX-wcr Feb 20, 2026
7a4cd86
fix UTs
BYGX-wcr Feb 21, 2026
f0e7ee5
fix formatting errors
BYGX-wcr Feb 22, 2026
bf30454
implement the original missing mechanism to send a swbus message with…
BYGX-wcr Feb 24, 2026
9eb7750
implement the 'exclude' keyword for recv! macro
BYGX-wcr Feb 24, 2026
c4719ba
add unit tests for NPU-driven HA scope actor / fix bugs / refactoring
BYGX-wcr Feb 24, 2026
331d1f1
fix typos
BYGX-wcr Feb 24, 2026
7fcfd3c
refactor the code to merge the usage of HaStateChanged and HaScopeAct…
BYGX-wcr Feb 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
963 changes: 0 additions & 963 deletions crates/hamgrd/src/actors/ha_scope.rs

This file was deleted.

422 changes: 422 additions & 0 deletions crates/hamgrd/src/actors/ha_scope/base.rs

Large diffs are not rendered by default.

313 changes: 313 additions & 0 deletions crates/hamgrd/src/actors/ha_scope/dpu.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
use crate::actors::{spawn_consumer_bridge_for_actor, DbBasedActor};
use crate::db_structs::*;
use crate::ha_actor_messages::*;
use anyhow::Result;
use sonic_common::SonicDbTable;
use sonic_dash_api_proto::decode_from_field_values;
use sonic_dash_api_proto::ha_scope_config::{DesiredHaState, HaScopeConfig};
use swbus_actor::{ActorMessage, Context, State};
use swss_common::{KeyOpFieldValues, KeyOperation};
use tracing::{debug, error};
use uuid::Uuid;

use super::base::HaScopeBase;
use super::HaScopeActor;

pub struct DpuHaScopeActor {
pub(super) base: HaScopeBase,
}

impl DpuHaScopeActor {
/// Main message dispatch for DPU-driven mode
pub async fn handle_message_inner(&mut self, state: &mut State, key: &str, context: &mut Context) -> Result<()> {
if key == HaScopeActor::table_name() {
return self.handle_dash_ha_scope_config_table_message_dpu_driven(state, key);
}
if VDpuActorState::is_my_msg(key) {
return self.handle_vdpu_state_update(state, context).await;
}
if HaSetActorState::is_my_msg(key) {
return self.handle_haset_state_update(state);
}
if key.starts_with(DpuDashHaScopeState::table_name()) {
// dpu ha scope state update
return self.handle_dpu_ha_scope_state_update(state);
}
Ok(())
}

/// Handles updates to the DASH_HA_SCOPE_CONFIG_TABLE in the case of DPU-driven HA.
/// Updates the actor's internal config and performs any necessary initialization or subscriptions.
/// Update DPU DASH_HA_SCOPE_TABLE
/// Update NPU DASH_HA_SCOPE_STATE if approved_pending_operation_ids is not empty
fn handle_dash_ha_scope_config_table_message_dpu_driven(&mut self, state: &mut State, key: &str) -> Result<()> {
let (_internal, incoming, _outgoing) = state.get_all();

// Retrieve the config update from the incoming message
let kfv: KeyOpFieldValues = incoming.get_or_fail(key)?.deserialize_data()?;
let dash_ha_scope_config: HaScopeConfig = decode_from_field_values(&kfv.field_values)?;

// Update internal config
self.base.dash_ha_scope_config = Some(dash_ha_scope_config);

// this is not a ha_scope for the target vDPU. Skip
if !self.base.vdpu_is_managed(incoming) {
return Ok(());
}

// update the DASH_HA_SCOPE_TABLE in DPU
self.update_dpu_ha_scope_table(state)?;

// update the NPU DASH_HA_SCOPE_STATE because some fields are derived from dash_ha_scope_config
self.update_npu_ha_scope_state_ha_state(state)?;

// need to update operation list if approved_pending_operation_ids is not empty
let approved_pending_operation_ids = self
.base
.dash_ha_scope_config
.as_ref()
.unwrap()
.approved_pending_operation_ids
.clone();

if !approved_pending_operation_ids.is_empty() {
self.base.update_npu_ha_scope_state_pending_operations(
state,
Vec::new(),
approved_pending_operation_ids,
)?;
}

Ok(())
}

/// Handles VDPU state update messages for this HA scope.
/// If the vdpu is unmanaged, the actor is put in dormant state. Otherwise, the actor subscribes to the
/// DASH_HA_SCOPE_STATE table and updates the NPU HA scope state.
async fn handle_vdpu_state_update(&mut self, state: &mut State, context: &mut Context) -> Result<()> {
let (internal, incoming, _outgoing) = state.get_all();
let Some(vdpu) = self.base.get_vdpu(incoming) else {
error!("Failed to retrieve vDPU {} from incoming state", &self.base.vdpu_id);
return Ok(());
};

if !vdpu.dpu.is_managed {
debug!("vDPU {} is unmanaged. Put actor in dormant state", &self.base.vdpu_id);
return Ok(());
}

// create an internal entry for npu STATE_DB/DASH_HA_SCOPE_STATE, which will be the
// notification channel to SDN controller
let swss_key = format!(
"{}{}{}",
self.base.vdpu_id,
NpuDashHaScopeState::key_separator(),
self.base.ha_scope_id
);
if !internal.has_entry(NpuDashHaScopeState::table_name(), &swss_key) {
let db = crate::db_for_table::<NpuDashHaScopeState>().await?;
let table = swss_common::Table::new_async(db, NpuDashHaScopeState::table_name()).await?;
internal.add(NpuDashHaScopeState::table_name(), table, swss_key).await;
}

if self.base.bridges.is_empty() {
// subscribe to dpu DASH_HA_SCOPE_STATE
self.base.bridges.push(
spawn_consumer_bridge_for_actor::<DpuDashHaScopeState>(
context.get_edge_runtime().clone(),
HaScopeActor::name(),
Some(&self.base.id),
true,
)
.await?,
);
}
// ha_scope_table in dpu has no info derived from vDPU but it won't be programed until we receive vDPU which confirms the vDPU is managed
self.update_dpu_ha_scope_table(state)?;
self.base.update_npu_ha_scope_state_base(state)?;
Ok(())
}

/// Handles HaSet state update messages for this HA scope.
/// Update NPU DASH_HA_SCOPE_STATE
fn handle_haset_state_update(&mut self, state: &mut State) -> Result<()> {
// the ha_scope is not managing the target vDPU. Skip
let incoming = state.incoming();
if !self.base.vdpu_is_managed(incoming) {
return Ok(());
}

// ha_scope vip_v4 and vip_v6 are derived from ha_set
self.update_dpu_ha_scope_table(state)?;
self.base.update_npu_ha_scope_state_base(state)?;
Ok(())
}

/// Handles DPU DASH_HA_SCOPE_STATE update messages for this HA scope.
/// Update NPU DASH_HA_SCOPE_STATE ha_state related fields
/// Update NPU DASH_HA_SCOPE_STATE pending operation list if there are new operations requested by DPU
fn handle_dpu_ha_scope_state_update(&mut self, state: &mut State) -> Result<()> {
let (_internal, incoming, _) = state.get_all();
// calculate operation requested by dpu
let Some(new_dpu_ha_scope_state) = self.base.get_dpu_ha_scope_state(incoming) else {
// no valid state received from dpu, skip
return Ok(());
};
let mut operations: Vec<(String, String)> = Vec::new();

// if hamgrd is restarted, we will lose the cached old state. In this case, we will treat
// all pending operations as new and request the sdn controller via npu dash_ha_scope_state
// to take action. If these have been notified to sdn controller prior to hamgrd restart,
// they will be no change to dash_ha_scope_state and no action will be taken by sdn controller.
let old_dpu_ha_scope_state = self.base.dpu_ha_scope_state.as_ref().cloned().unwrap_or_default();
if new_dpu_ha_scope_state.activate_role_pending && !old_dpu_ha_scope_state.activate_role_pending {
operations.push((Uuid::new_v4().to_string(), "activate_role".to_string()));
}

if new_dpu_ha_scope_state.brainsplit_recover_pending && !old_dpu_ha_scope_state.brainsplit_recover_pending {
operations.push((Uuid::new_v4().to_string(), "brainsplit_recover".to_string()));
}

if new_dpu_ha_scope_state.flow_reconcile_pending && !old_dpu_ha_scope_state.flow_reconcile_pending {
operations.push((Uuid::new_v4().to_string(), "flow_reconcile".to_string()));
}

self.base.dpu_ha_scope_state = Some(new_dpu_ha_scope_state);

self.update_npu_ha_scope_state_ha_state(state)?;

if !operations.is_empty() {
self.base
.update_npu_ha_scope_state_pending_operations(state, operations, Vec::new())?;
}

Ok(())
}

/// Update DPU HA Scope Table purely based on HA Scope & HA Set Config, no flex parameters
/// Used in DPU-driven mode
fn update_dpu_ha_scope_table(&self, state: &mut State) -> Result<()> {
let Some(dash_ha_scope_config) = self.base.dash_ha_scope_config.as_ref() else {
return Ok(());
};

let (internal, incoming, outgoing) = state.get_all();

let ha_set_id = self.base.get_haset_id().unwrap();
let Some(haset) = self.base.get_haset(incoming) else {
debug!(
"HA-SET {} has not been received. Skip DASH_HA_SCOPE_TABLE update",
&ha_set_id
);
return Ok(());
};

let mut activate_role_requested = false;
let mut flow_reconcile_requested = false;
let approved_ops = dash_ha_scope_config.approved_pending_operation_ids.clone();
if !approved_ops.is_empty() {
let pending_operations = self.base.get_pending_operations(internal, None)?;
for op_id in approved_ops {
let Some(op) = pending_operations.get(&op_id) else {
// has been removed from pending list
continue;
};
match op.as_str() {
"switchover" => {
// todo: this is for switch driven ha
}
"activate_role" => {
activate_role_requested = true;
}
"flow_reconcile" => {
flow_reconcile_requested = true;
}
"brainsplit_recover" => {
// todo: what's the action here?
}
_ => {
error!("Unknown operation type {}", op);
}
}
}
}

let dash_ha_scope = DashHaScopeTable {
version: dash_ha_scope_config.version.parse().unwrap(),
disabled: dash_ha_scope_config.disabled,
ha_set_id: dash_ha_scope_config.ha_set_id.clone(),
vip_v4: haset.ha_set.vip_v4.clone(),
vip_v6: haset.ha_set.vip_v6.clone(),
ha_role: if dash_ha_scope_config.desired_ha_state == DesiredHaState::Unspecified as i32 {
"standby".to_string()
} else {
format!(
"{}",
DesiredHaState::try_from(dash_ha_scope_config.desired_ha_state).unwrap()
)
.to_lowercase()
}, /*todo, how switching_to_active is derived. Is it relevant to dpu driven mode */
ha_term: "0".to_string(), // TODO: not clear what need to be done for DPU-driven mode
flow_reconcile_requested,
activate_role_requested,
};

let fv = swss_serde::to_field_values(&dash_ha_scope)?;
let kfv = KeyOpFieldValues {
key: self.base.ha_scope_id.clone(),
operation: KeyOperation::Set,
field_values: fv,
};

let msg = ActorMessage::new(self.base.ha_scope_id.clone(), &kfv)?;
outgoing.send(outgoing.common_bridge_sp::<DashHaScopeTable>(), msg);

Ok(())
}

fn update_npu_ha_scope_state_ha_state(&self, state: &mut State) -> Result<()> {
let Some(ref dash_ha_scope_config) = self.base.dash_ha_scope_config else {
return Ok(());
};
let (internal, incoming, _outgoing) = state.get_all();

let Some(mut npu_ha_scope_state) = self.base.get_npu_ha_scope_state(internal) else {
tracing::info!("Cannot update STATE_DB/DASH_HA_SCOPE_STATE until it is populated with basic information",);
return Ok(());
};

let Some(dpu_ha_scope_state) = self.base.get_dpu_ha_scope_state(incoming) else {
debug!(
"DPU HA-SCOPE STATE {} is corrupted or has not been received. Skip DASH_HA_SCOPE_STATE update",
&self.base.id
);
return Ok(());
};

// in dpu driven mode, local_ha_state is same as dpu acked ha state
npu_ha_scope_state.local_ha_state = Some(dpu_ha_scope_state.ha_state.clone());
npu_ha_scope_state.local_ha_state_last_updated_time_in_ms = Some(dpu_ha_scope_state.ha_role_start_time);
// The reason of the last HA state change.
npu_ha_scope_state.local_ha_state_last_updated_reason = Some("dpu initiated".to_string());

// The target HA state in ASIC. This is the state that hamgrd generates and asking DPU to move to.
npu_ha_scope_state.local_target_asic_ha_state = Some(
format!(
"{}",
DesiredHaState::try_from(dash_ha_scope_config.desired_ha_state).unwrap()
)
.to_lowercase(),
);
// The HA state that ASIC acked.
npu_ha_scope_state.local_acked_asic_ha_state = Some(dpu_ha_scope_state.ha_state.clone());

// The current target term of the HA state machine. in dpu-driven mode, use the term acked by asic
npu_ha_scope_state.local_target_term = Some(dpu_ha_scope_state.ha_term.clone());
npu_ha_scope_state.local_acked_term = Some(dpu_ha_scope_state.ha_term);

let fvs = swss_serde::to_field_values(&npu_ha_scope_state)?;
internal.get_mut(NpuDashHaScopeState::table_name()).clone_from(&fvs);

Ok(())
}
}
Loading
Loading