Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
Expand Down Expand Up @@ -57,17 +58,18 @@
* Compares the currentState, pendingState with IdealState and generate messages
*/
public class MessageGenerationPhase extends AbstractBaseStage {
private final static String NO_DESIRED_STATE = "NoDesiredState";
private static final String NO_DESIRED_STATE = "NoDesiredState";

// If we see there is any invalid pending message leaving on host, i.e. message
// tells participant to change from SLAVE to MASTER, and the participant is already
// at MASTER state, we wait for timeout and if the message is still not cleaned up by
// participant, controller will cleanup them proactively to unblock further state
// transition
public final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
public static final long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60 * 1000);
private final static String PENDING_MESSAGE = "pending message";
private final static String STALE_MESSAGE = "stale message";
private static final String PENDING_MESSAGE = "pending message";
private static final String STALE_MESSAGE = "stale message";
private static final String OFFLINE = "OFFLINE";

private static Logger logger = LoggerFactory.getLogger(MessageGenerationPhase.class);

Expand Down Expand Up @@ -163,6 +165,16 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
// desired-state->list of generated-messages
Map<String, List<Message>> messageMap = new HashMap<>();

/*
Calculate the current active replica count based on state model type.
This represents the number of replicas currently serving traffic for this partition
Active replicas include: top states, secondary top states (for single-top state models if
they exist)and ERROR states.
All qualifying state transitions for this partition will receive this same value,
allowing clients to understand the current availability level and prioritize accordingly.
*/
int currentActiveReplicaCount = calculateCurrentActiveReplicaCount(currentStateMap, stateModelDef);

for (String instanceName : instanceStateMap.keySet()) {

Set<Message> staleMessages = cache.getStaleMessagesByInstance(instanceName);
Expand Down Expand Up @@ -250,17 +262,42 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
pendingMessage, manager, resource, partition, sessionIdMap, instanceName,
stateModelDef, cancellationMessage, isCancellationEnabled);
} else {
// Set currentActiveReplicaNumber to provide metadata for potential message prioritization by
// participant.
// Assign the current active replica count to all qualifying upward transitions for this
// partition.
// This ensures consistent prioritization metadata across concurrent state transitions.
int currentActiveReplicaNumber = -1; // -1 indicates no prioritization metadata, for eg:
// Downward ST messages get a -1.

/*
Assign currentActiveReplicaNumber for qualifying upward state transitions.
Criteria for assignment:
- Must be an upward state transition according to state model
- Current state must not be considered active (according to state model type)
- Target state must be considered active (according to state model type)
*/
if (stateModelDef.isUpwardStateTransition(currentState, nextState)
&& !isCurrentlyActive(currentState, stateModelDef,
stateModelDef.isSingleTopStateModel())
&& isTargetActive(nextState, stateModelDef,
stateModelDef.isSingleTopStateModel())) {

// All qualifying transitions for this partition get the same currentActiveReplicaNumber
currentActiveReplicaNumber = currentActiveReplicaCount;
}

// Create new state transition message
message = MessageUtil
.createStateTransitionMessage(manager.getInstanceName(), manager.getSessionId(),
resource, partition.getPartitionName(), instanceName, currentState, nextState,
sessionIdMap.get(instanceName), stateModelDef.getId());
sessionIdMap.get(instanceName), stateModelDef.getId(), currentActiveReplicaNumber);

if (logger.isDebugEnabled()) {
LogUtil.logDebug(logger, _eventId, String.format(
"Resource %s partition %s for instance %s with currentState %s and nextState %s",
"Resource %s partition %s for instance %s with currentState %s, nextState %s and currentActiveReplicaNumber %d",
resource.getResourceName(), partition.getPartitionName(), instanceName,
currentState, nextState));
currentState, nextState, currentActiveReplicaNumber));
}
}
}
Expand Down Expand Up @@ -290,7 +327,110 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
} // end of for-each-partition
}

private boolean shouldCreateSTCancellation(Message pendingMessage, String desiredState,
/**
* Calculate the current active replica count based on state model type.
* The count includes replicas in top states, secondary top states (where applicable),
* and ERROR states since helix considers them active.
* State model handling:
* - Single-top state models: Differentiates between patterns with and without secondary top
* states
* - ONLINE-OFFLINE: Counts ONLINE + ERROR states only
* - MASTER-SLAVE-OFFLINE: Counts MASTER + SLAVE + ERROR states
* - ONLINE-STANDBY-OFFLINE: Counts ONLINE + STANDBY + ERROR states
* - Multi-top state models: Counts only top states + ERROR states
* @param currentStateMap Map of instance name to current state for this partition, representing
* the actual state of each replica before any pending transitions
* @param stateModelDef State model definition containing the state hierarchy and transition rules
* used to determine which states are considered active
* @return The number of replicas currently in active states, used to determine the
* currentActiveReplicaNumber for the partition.
*/
private int calculateCurrentActiveReplicaCount(Map<String, String> currentStateMap,
StateModelDefinition stateModelDef) {
List<String> trueSecondaryTopStates = getTrueSecondaryTopStates(stateModelDef);
return (int) currentStateMap.values().stream()
.filter(state -> stateModelDef.getTopState().contains(state) // Top states (MASTER, ONLINE,
// LEADER)
|| trueSecondaryTopStates.contains(state) // True secondary states (SLAVE, STANDBY,
// BOOTSTRAP)
|| HelixDefinedState.ERROR.name().equals(state) // ERROR states (still considered
// active)
// DROPPED and OFFLINE are automatically excluded by getTrueSecondaryTopStates()
).count();
}

/**
* Get true secondary top states - states that:
* 1. Are not the top state itself (avoid double-counting)
* 2. Are not non-serving states like OFFLINE and DROPPED.
* Reasons for elimination:
* - getSecondTopStates() can include the top state itself in some state models.
* Example - OnlineOfflineWithBootstrap:
* topState="ONLINE", getSecondTopStates()=["ONLINE", "BOOTSTRAP"]
* After filtering: trueSecondaryTopStates=["BOOTSTRAP"] (removes "ONLINE" as it is top state.)
* - getSecondTopStates() can also include OFFLINE as a secondary top state in some state models.
* Example - OnlineOffline:
* getSecondTopStates() = ["OFFLINE"] as it transitions to ONLINE.
* After filtering: trueSecondaryTopStates=[] (removes "OFFLINE" as it's not a serving state).
* @param stateModelDef State model definition containing state hierarchy information
*/
private List<String> getTrueSecondaryTopStates(StateModelDefinition stateModelDef) {
return stateModelDef.getSecondTopStates().stream()
// Remove top-state duplicates
.filter(state -> !stateModelDef.getTopState().equals(state))
// Remove non-serving states
.filter(state -> !OFFLINE.equals(state) && !HelixDefinedState.DROPPED.name().equals(state))
.collect(Collectors.toList());
}

/**
* Determines if the given current state is considered active based on the state model type.
* For single-top state models, top, true secondary top, and ERROR states are active.
* For multi-top state models, top and ERROR states are active.
* ERROR state replicas are considered active in HELIX as they do not affect availability.
* @param currentState The current state to check
* @param stateModelDef State model definition containing state hierarchy information
* @param isSingleTopState Whether this is a single-top state model
* @return true if the current state is considered active, false otherwise
*/
private boolean isCurrentlyActive(String currentState, StateModelDefinition stateModelDef,
boolean isSingleTopState) {
// ERROR state is always considered active regardless of state model type
if (HelixDefinedState.ERROR.name().equals(currentState)) {
return true;
}
if (isSingleTopState) {
return stateModelDef.getTopState().contains(currentState)
|| getTrueSecondaryTopStates(stateModelDef).contains(currentState);
} else {
return stateModelDef.getTopState().contains(currentState);
}
}

/**
* Determines if the given target state is considered active based on the state model type.
* For single-top state models, both top,true secondary top and ERROR states are active.
* For multi-top state models, top and ERROR states are active.
* @param targetState The target state to check
* @param stateModelDef State model definition containing state hierarchy information
* @param isSingleTopState Whether this is a single-top state model
* @return true if the target state is considered active, false otherwise
*/
private boolean isTargetActive(String targetState, StateModelDefinition stateModelDef,
boolean isSingleTopState) {
// ERROR state is always considered active regardless of state model type
if (HelixDefinedState.ERROR.name().equals(targetState)) {
return true;
}
if (isSingleTopState) {
return stateModelDef.getTopState().contains(targetState)
|| getTrueSecondaryTopStates(stateModelDef).contains(targetState);
} else {
return stateModelDef.getTopState().contains(targetState);
}
}

private boolean shouldCreateSTCancellation(Message pendingMessage, String desiredState,
String initialState) {
if (pendingMessage == null) {
return false;
Expand Down
44 changes: 37 additions & 7 deletions helix-core/src/main/java/org/apache/helix/model/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ public enum Attributes {
RELAY_FROM,
EXPIRY_PERIOD,
SRC_CLUSTER,
ST_REBALANCE_TYPE
ST_REBALANCE_TYPE,
CURRENT_ACTIVE_REPLICA_NUMBER
}

/**
Expand Down Expand Up @@ -137,12 +138,8 @@ public enum STRebalanceType {
/**
* Compares the creation time of two Messages
*/
public static final Comparator<Message> CREATE_TIME_COMPARATOR = new Comparator<Message>() {
@Override
public int compare(Message m1, Message m2) {
return new Long(m1.getCreateTimeStamp()).compareTo(new Long(m2.getCreateTimeStamp()));
}
};
public static final Comparator<Message> CREATE_TIME_COMPARATOR =
(m1, m2) -> Long.compare(m2.getCreateTimeStamp(), m1.getCreateTimeStamp());

/**
* Instantiate a message
Expand Down Expand Up @@ -935,6 +932,39 @@ public void setSrcClusterName(String clusterName) {
_record.setSimpleField(Attributes.SRC_CLUSTER.name(), clusterName);
}

/**
* Set current active replica count for participant-side message prioritization.
* This field indicates the number of replicas currently in active states (including ERROR states)
* for this partition at the time the state transition message is generated.
* Active states include top states, secondary top states (for single-top state models),
* and ERROR states.
*
* This metadata enables participants to prioritize recovery scenarios (low active counts)
* over load balancing scenarios (high active counts) in custom thread pools or message handlers.
* For example, 2→3 transitions get higher priority than 3→4 transitions.
*
* Default value is -1 for transitions that don't require prioritization metadata.(for eg : downward transitions).
*
* @param currentActiveReplicaNumber the number of currently active replicas (-1 when there is no prioritization metadata,
* >=0 for transitions containing current availability level)
*/
public void setCurrentActiveReplicaNumber(int currentActiveReplicaNumber) {
_record.setIntField(Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name(), currentActiveReplicaNumber);
}

/**
* Get the current active replica count for this partition at message generation time.
* This value represents the number of replicas in active states (including ERROR states) before
* any state transitions occur, enabling participant-side message prioritization based on
* current availability levels.
* @return current active replica count, or -1 for cases where we don't provide metadata for
* prioritization like downward state transitions.
*/

public int getCurrentActiveReplicaNumber() {
return _record.getIntField(Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name(), -1);
}

/**
* Check if this message is targetted for a controller
* @return true if this is a controller message, false otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,21 @@ public OnlineOfflineSMD() {
}

/**
* Build OnlineOffline state model definition
* @return
* Build OnlineOffline state model definition with default replica count
* @return StateModelDefinition for OnlineOffline model
*/
public static StateModelDefinition build() {
StateModelDefinition.Builder builder =new StateModelDefinition.Builder(name);
return build(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
}

/**
* Build OnlineOffline state model definition with custom replica count
* @param instanceCount the maximum number of instances that can be in ONLINE state
* @return StateModelDefinition for OnlineOffline model
*/
public static StateModelDefinition build(String instanceCount) {
StateModelDefinition.Builder builder = new StateModelDefinition.Builder(name);

// init state
builder.initialState(States.OFFLINE.name());

Expand All @@ -62,9 +72,8 @@ public static StateModelDefinition build() {
builder.addTransition(States.OFFLINE.name(), States.ONLINE.name(), 1);
builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name());

// bounds
builder.dynamicUpperBound(States.ONLINE.name(),
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
// bounds - uses the instanceCount parameter instead of constant
builder.dynamicUpperBound(States.ONLINE.name(), instanceCount);

return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,4 +505,28 @@ public static Map<String, Integer> getStateCounts(Map<String, String> stateMap)
}
return stateCounts;
}

/**
* Check if a state transition is upward
* @param fromState source state
* @param toState destination state
* @return True if it's an upward state transition, false otherwise
*/
public boolean isUpwardStateTransition(String fromState, String toState) {

if (fromState == null || toState == null) {
return false;
}

Map<String, Integer> statePriorityMap = getStatePriorityMap();

Integer fromStateWeight = statePriorityMap.get(fromState);
Integer toStateWeight = statePriorityMap.get(toState);

if (fromStateWeight == null || toStateWeight == null) {
return false;
}

return toStateWeight < fromStateWeight;
}
}
Loading