Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
Expand Down Expand Up @@ -57,17 +58,18 @@
* Compares the currentState, pendingState with IdealState and generate messages
*/
public class MessageGenerationPhase extends AbstractBaseStage {
private final static String NO_DESIRED_STATE = "NoDesiredState";
private static final String NO_DESIRED_STATE = "NoDesiredState";

// If we see there is any invalid pending message leaving on host, i.e. message
// tells participant to change from SLAVE to MASTER, and the participant is already
// at MASTER state, we wait for timeout and if the message is still not cleaned up by
// participant, controller will cleanup them proactively to unblock further state
// transition
public final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
public static final long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60 * 1000);
private final static String PENDING_MESSAGE = "pending message";
private final static String STALE_MESSAGE = "stale message";
private static final String PENDING_MESSAGE = "pending message";
private static final String STALE_MESSAGE = "stale message";
private static final String OFFLINE = "OFFLINE";

private static Logger logger = LoggerFactory.getLogger(MessageGenerationPhase.class);

Expand Down Expand Up @@ -163,6 +165,16 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
// desired-state->list of generated-messages
Map<String, List<Message>> messageMap = new HashMap<>();

/*
* Calculate the current active replica count based on state model type.
* This represents the number of replicas currently serving traffic for this partition
* Active replicas include: top states, secondary top states excluding OFFLINE (if
* they exist) and ERROR states. Active replica count also excluded DROPPED states.
* All qualifying state transitions for this partition will receive this same value,
* allowing clients to understand the current availability level and prioritize accordingly.
*/
int currentActiveReplicaCount = calculateCurrentActiveReplicaCount(currentStateMap, stateModelDef);

for (String instanceName : instanceStateMap.keySet()) {

Set<Message> staleMessages = cache.getStaleMessagesByInstance(instanceName);
Expand Down Expand Up @@ -250,17 +262,40 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
pendingMessage, manager, resource, partition, sessionIdMap, instanceName,
stateModelDef, cancellationMessage, isCancellationEnabled);
} else {
// Set currentActiveReplicaNumber to provide metadata for potential message prioritization by
// participant.
// Assign the current active replica count to all qualifying upward transitions for this
// partition.
// This ensures consistent prioritization metadata across concurrent state transitions.
int currentActiveReplicaNumber = -1; // -1 indicates no prioritization metadata, for eg:
// Downward ST messages get a -1.

/*
Assign currentActiveReplicaNumber for qualifying upward state transitions.
Criteria for assignment:
- Must be an upward state transition according to state model
- Current state must not be considered active (according to state model type)
- Target state must be considered active (according to state model type)
*/
if (stateModelDef.isUpwardStateTransition(currentState, nextState)
&& !isStateActive(currentState, stateModelDef)
&& isStateActive(nextState, stateModelDef)) {

// All qualifying transitions for this partition get the same currentActiveReplicaNumber
currentActiveReplicaNumber = currentActiveReplicaCount;
}

// Create new state transition message
message = MessageUtil
.createStateTransitionMessage(manager.getInstanceName(), manager.getSessionId(),
resource, partition.getPartitionName(), instanceName, currentState, nextState,
sessionIdMap.get(instanceName), stateModelDef.getId());
sessionIdMap.get(instanceName), stateModelDef.getId(), currentActiveReplicaNumber);

if (logger.isDebugEnabled()) {
LogUtil.logDebug(logger, _eventId, String.format(
"Resource %s partition %s for instance %s with currentState %s and nextState %s",
"Resource %s partition %s for instance %s with currentState %s, nextState %s and currentActiveReplicaNumber %d",
resource.getResourceName(), partition.getPartitionName(), instanceName,
currentState, nextState));
currentState, nextState, currentActiveReplicaNumber));
}
}
}
Expand Down Expand Up @@ -290,7 +325,82 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
} // end of for-each-partition
}

private boolean shouldCreateSTCancellation(Message pendingMessage, String desiredState,
/**
* Calculate the current active replica count based on state model type.
* The count includes replicas in top states, secondary top states (where applicable),
* and ERROR states since helix considers them active.
* State model handling:
* - Single-top state models: Differentiates between patterns with and without secondary top
* states
* - ONLINE-OFFLINE: Counts ONLINE + ERROR states only
* - MASTER-SLAVE-OFFLINE: Counts MASTER + SLAVE + ERROR states
* - ONLINE-STANDBY-OFFLINE: Counts ONLINE + STANDBY + ERROR states
* - Multi-top state models: Counts only top states + ERROR states
* @param currentStateMap Map of instance name to current state for this partition, representing
* the actual state of each replica before any pending transitions
* @param stateModelDef State model definition containing the state hierarchy and transition rules
* used to determine which states are considered active
* @return The number of replicas currently in active states, used to determine the
* currentActiveReplicaNumber for the partition.
*/
private int calculateCurrentActiveReplicaCount(Map<String, String> currentStateMap,
StateModelDefinition stateModelDef) {
List<String> activeSecondaryTopStates = getActiveSecondaryTopStates(stateModelDef);
return (int) currentStateMap.values().stream()
.filter(state -> stateModelDef.getTopState().contains(state) // Top states (MASTER, ONLINE,
// LEADER)
|| activeSecondaryTopStates.contains(state) // Active secondary states (SLAVE, STANDBY,
// BOOTSTRAP)
|| HelixDefinedState.ERROR.name().equals(state) // ERROR states (still considered
// active)
// DROPPED and OFFLINE are automatically excluded by getActiveSecondaryTopStates()
).count();
}

/**
* Get active secondary top states - states that are not non-serving states like OFFLINE and DROPPED.
* Reasons for elimination:
* - getSecondTopStates() can include OFFLINE as a secondary top state in some state models.
* Example - OnlineOffline:
* getSecondTopStates() = ["OFFLINE"] as it transitions to ONLINE.
* After filtering: activeSecondaryTopStates=[] (removes "OFFLINE" as it's not a serving state).
* @param stateModelDef State model definition containing state hierarchy information
*/
private List<String> getActiveSecondaryTopStates(StateModelDefinition stateModelDef) {
return stateModelDef.getSecondTopStates().stream()
// Remove non-serving states
.filter(state -> !OFFLINE.equals(state) && !HelixDefinedState.DROPPED.name().equals(state))
.collect(Collectors.toList());
}

/**
* Determines if the given state is considered active based on the state model type.
* Active states are defined as:
* - For single-top state models: top states, active secondary top states, and ERROR states
* - For multi-top state models: top states and ERROR states
* ERROR state replicas are always considered active in HELIX as they do not affect
* availability.
* @param state The state to check (can be current state or target state)
* @param stateModelDef State model definition containing state hierarchy information
* @return true if the state is considered active, false otherwise
*/
private boolean isStateActive(String state, StateModelDefinition stateModelDef) {
// ERROR state is always considered active regardless of state model type
if (HelixDefinedState.ERROR.name().equals(state)) {
return true;
}

if (stateModelDef.isSingleTopStateModel()) {
// For single-top models, both primary top states and active secondary states are considered active
return stateModelDef.getTopState().contains(state)
|| getActiveSecondaryTopStates(stateModelDef).contains(state);
} else {
// For multi-top models, only top states are considered active
return stateModelDef.getTopState().contains(state);
}
}

private boolean shouldCreateSTCancellation(Message pendingMessage, String desiredState,
String initialState) {
if (pendingMessage == null) {
return false;
Expand Down
44 changes: 37 additions & 7 deletions helix-core/src/main/java/org/apache/helix/model/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ public enum Attributes {
RELAY_FROM,
EXPIRY_PERIOD,
SRC_CLUSTER,
ST_REBALANCE_TYPE
ST_REBALANCE_TYPE,
CURRENT_ACTIVE_REPLICA_NUMBER
}

/**
Expand Down Expand Up @@ -137,12 +138,8 @@ public enum STRebalanceType {
/**
* Compares the creation time of two Messages
*/
public static final Comparator<Message> CREATE_TIME_COMPARATOR = new Comparator<Message>() {
@Override
public int compare(Message m1, Message m2) {
return new Long(m1.getCreateTimeStamp()).compareTo(new Long(m2.getCreateTimeStamp()));
}
};
public static final Comparator<Message> CREATE_TIME_COMPARATOR =
(m1, m2) -> Long.compare(m2.getCreateTimeStamp(), m1.getCreateTimeStamp());

/**
* Instantiate a message
Expand Down Expand Up @@ -935,6 +932,39 @@ public void setSrcClusterName(String clusterName) {
_record.setSimpleField(Attributes.SRC_CLUSTER.name(), clusterName);
}

/**
* Set current active replica count for participant-side message prioritization.
* This field indicates the number of replicas currently in active states (including ERROR states)
* for this partition at the time the state transition message is generated.
* Active states include top states, secondary top states (for single-top state models),
* and ERROR states.
*
* This metadata enables participants to prioritize recovery scenarios (low active counts)
* over load balancing scenarios (high active counts) in custom thread pools or message handlers.
* For example, 2→3 transitions get higher priority than 3→4 transitions.
*
* Default value is -1 for transitions that don't require prioritization metadata.(for eg : downward transitions).
*
* @param currentActiveReplicaNumber the number of currently active replicas (-1 when there is no prioritization metadata,
* >=0 for transitions containing current availability level)
*/
public void setCurrentActiveReplicaNumber(int currentActiveReplicaNumber) {
_record.setIntField(Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name(), currentActiveReplicaNumber);
}

/**
* Get the current active replica count for this partition at message generation time.
* This value represents the number of replicas in active states (including ERROR states) before
* any state transitions occur, enabling participant-side message prioritization based on
* current availability levels.
* @return current active replica count, or -1 for cases where we don't provide metadata for
* prioritization like downward state transitions.
*/

public int getCurrentActiveReplicaNumber() {
return _record.getIntField(Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name(), -1);
}

/**
* Check if this message is targetted for a controller
* @return true if this is a controller message, false otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,21 @@ public OnlineOfflineSMD() {
}

/**
* Build OnlineOffline state model definition
* @return
* Build OnlineOffline state model definition with default replica count
* @return StateModelDefinition for OnlineOffline model
*/
public static StateModelDefinition build() {
StateModelDefinition.Builder builder =new StateModelDefinition.Builder(name);
return build(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
}

/**
* Build OnlineOffline state model definition with custom replica count
* @param instanceCount the maximum number of instances that can be in ONLINE state
* @return StateModelDefinition for OnlineOffline model
*/
public static StateModelDefinition build(String instanceCount) {
StateModelDefinition.Builder builder = new StateModelDefinition.Builder(name);

// init state
builder.initialState(States.OFFLINE.name());

Expand All @@ -62,9 +72,8 @@ public static StateModelDefinition build() {
builder.addTransition(States.OFFLINE.name(), States.ONLINE.name(), 1);
builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name());

// bounds
builder.dynamicUpperBound(States.ONLINE.name(),
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
// bounds - uses the instanceCount parameter instead of constant
builder.dynamicUpperBound(States.ONLINE.name(), instanceCount);

return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,4 +505,28 @@ public static Map<String, Integer> getStateCounts(Map<String, String> stateMap)
}
return stateCounts;
}

/**
* Check if a state transition is upward
* @param fromState source state
* @param toState destination state
* @return True if it's an upward state transition, false otherwise
*/
public boolean isUpwardStateTransition(String fromState, String toState) {

if (fromState == null || toState == null) {
return false;
}

Map<String, Integer> statePriorityMap = getStatePriorityMap();

Integer fromStateWeight = statePriorityMap.get(fromState);
Integer toStateWeight = statePriorityMap.get(toState);

if (fromStateWeight == null || toStateWeight == null) {
return false;
}

return toStateWeight < fromStateWeight;
}
}
Loading