Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,32 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
// desired-state->list of generated-messages
Map<String, List<Message>> messageMap = new HashMap<>();

/**
* Calculate the current active replica count based on state model type.
* This represents the number of replicas currently serving traffic for this partition
* Active replicas include: top states, secondary top states (for single-top state models),
* and ERROR states.
* All qualifying state transitions for this partition will receive this same value,
* allowing clients to understand the current availability level and prioritize accordingly.
*/
int currentActiveReplicaCount;
if (stateModelDef.isSingleTopStateModel()) {
// For single-top state models (e.g., OFFLINE→STANDBY→ONLINE)
// Count replicas in top state, secondary top state and ERROR state
currentActiveReplicaCount = (int) currentStateMap.values().stream()
.filter(state -> stateModelDef.getTopState().contains(state)
|| stateModelDef.getSecondTopStates().contains(state)
|| HelixDefinedState.ERROR.name().equals(state))
.count();
} else {
// For multi-top state models (e.g., OFFLINE→ONLINE)
// Count replicas that will be in top state and ERROR state
currentActiveReplicaCount = (int) currentStateMap.values().stream()
.filter(state -> stateModelDef.getTopState().contains(state)
|| HelixDefinedState.ERROR.name().equals(state))
.count();
}

for (String instanceName : instanceStateMap.keySet()) {

Set<Message> staleMessages = cache.getStaleMessagesByInstance(instanceName);
Expand Down Expand Up @@ -250,17 +276,42 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
pendingMessage, manager, resource, partition, sessionIdMap, instanceName,
stateModelDef, cancellationMessage, isCancellationEnabled);
} else {
// Set currentActiveReplicaNumber to provide metadata for potential message prioritization by
// participant.
// Assign the current active replica count to all qualifying upward transitions for this
// partition.
// This ensures consistent prioritization metadata across concurrent state transitions.
int currentActiveReplicaNumber = -1; // -1 indicates no prioritization metadata, for eg:
// Downward ST messages get a -1.

/**
* Assign currentActiveReplicaNumber for qualifying upward state transitions.
* Criteria for assignment:
* - Must be an upward state transition according to state model
* - Current state must not be considered active (according to state model type)
* - Target state must be considered active (according to state model type)
*/
if (stateModelDef.isUpwardStateTransition(currentState, nextState)
&& !isCurrentlyActive(currentState, stateModelDef,
stateModelDef.isSingleTopStateModel())
&& isTargetActive(nextState, stateModelDef,
stateModelDef.isSingleTopStateModel())) {

// All qualifying transitions for this partition get the same currentActiveReplicaNumber
currentActiveReplicaNumber = currentActiveReplicaCount;
}

// Create new state transition message
message = MessageUtil
.createStateTransitionMessage(manager.getInstanceName(), manager.getSessionId(),
resource, partition.getPartitionName(), instanceName, currentState, nextState,
sessionIdMap.get(instanceName), stateModelDef.getId());
sessionIdMap.get(instanceName), stateModelDef.getId(), currentActiveReplicaNumber);

if (logger.isDebugEnabled()) {
LogUtil.logDebug(logger, _eventId, String.format(
"Resource %s partition %s for instance %s with currentState %s and nextState %s",
"Resource %s partition %s for instance %s with currentState %s, nextState %s and currentActiveReplicaNumber %d",
resource.getResourceName(), partition.getPartitionName(), instanceName,
currentState, nextState));
currentState, nextState, currentActiveReplicaNumber));
}
}
}
Expand Down Expand Up @@ -290,7 +341,54 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
} // end of for-each-partition
}

private boolean shouldCreateSTCancellation(Message pendingMessage, String desiredState,
/**
* Determines if the given current state is considered active based on the state model type.
* For single-top state models, top, secondary top, and ERROR states are active.
* For multi-top state models, top and ERROR states are active.
* ERROR state replicas are considered active in HELIX as they do not affect availability.
* @param currentState The current state to check
* @param stateModelDef State model definition containing state hierarchy information
* @param isSingleTopState Whether this is a single-top state model
* @return true if the current state is considered active, false otherwise
*/
private boolean isCurrentlyActive(String currentState, StateModelDefinition stateModelDef,
boolean isSingleTopState) {
// ERROR state is always considered active regardless of state model type
if (HelixDefinedState.ERROR.name().equals(currentState)) {
return true;
}
if (isSingleTopState) {
return stateModelDef.getTopState().contains(currentState)
|| stateModelDef.getSecondTopStates().contains(currentState);
} else {
return stateModelDef.getTopState().contains(currentState);
}
}

/**
* Determines if the given target state is considered active based on the state model type.
* For single-top state models, both top,secondary top and ERROR states are active.
* For multi-top state models, top and ERROR states are active.
* @param targetState The target state to check
* @param stateModelDef State model definition containing state hierarchy information
* @param isSingleTopState Whether this is a single-top state model
* @return true if the target state is considered active, false otherwise
*/
private boolean isTargetActive(String targetState, StateModelDefinition stateModelDef,
boolean isSingleTopState) {
// ERROR state is always considered active regardless of state model type
if (HelixDefinedState.ERROR.name().equals(targetState)) {
return true;
}
if (isSingleTopState) {
return stateModelDef.getTopState().contains(targetState)
|| stateModelDef.getSecondTopStates().contains(targetState);
} else {
return stateModelDef.getTopState().contains(targetState);
}
}

private boolean shouldCreateSTCancellation(Message pendingMessage, String desiredState,
String initialState) {
if (pendingMessage == null) {
return false;
Expand Down
44 changes: 37 additions & 7 deletions helix-core/src/main/java/org/apache/helix/model/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ public enum Attributes {
RELAY_FROM,
EXPIRY_PERIOD,
SRC_CLUSTER,
ST_REBALANCE_TYPE
ST_REBALANCE_TYPE,
CURRENT_ACTIVE_REPLICA_NUMBER
}

/**
Expand Down Expand Up @@ -137,12 +138,8 @@ public enum STRebalanceType {
/**
* Compares the creation time of two Messages
*/
public static final Comparator<Message> CREATE_TIME_COMPARATOR = new Comparator<Message>() {
@Override
public int compare(Message m1, Message m2) {
return new Long(m1.getCreateTimeStamp()).compareTo(new Long(m2.getCreateTimeStamp()));
}
};
public static final Comparator<Message> CREATE_TIME_COMPARATOR =
(m1, m2) -> Long.compare(m2.getCreateTimeStamp(), m1.getCreateTimeStamp());

/**
* Instantiate a message
Expand Down Expand Up @@ -935,6 +932,39 @@ public void setSrcClusterName(String clusterName) {
_record.setSimpleField(Attributes.SRC_CLUSTER.name(), clusterName);
}

/**
* Set current active replica count for participant-side message prioritization.
* This field indicates the number of replicas currently in active states (including ERROR states)
* for this partition at the time the state transition message is generated.
* Active states include top states, secondary top states (for single-top state models),
* and ERROR states.
*
* This metadata enables participants to prioritize recovery scenarios (low active counts)
* over load balancing scenarios (high active counts) in custom thread pools or message handlers.
* For example, 2→3 transitions get higher priority than 3→4 transitions.
*
* Default value is -1 for transitions that don't require prioritization metadata.(for eg : downward transitions).
*
* @param currentActiveReplicaNumber the number of currently active replicas (-1 when there is no prioritization metadata,
* >=0 for transitions containing current availability level)
*/
public void setCurrentActiveReplicaNumber(int currentActiveReplicaNumber) {
_record.setIntField(Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name(), currentActiveReplicaNumber);
}

/**
* Get the current active replica count for this partition at message generation time.
* This value represents the number of replicas in active states (including ERROR states) before
* any state transitions occur, enabling participant-side message prioritization based on
* current availability levels.
* @return current active replica count, or -1 for cases where we don't provide metadata for
* prioritization like downward state transitions.
*/

public int getCurrentActiveReplicaNumber() {
return _record.getIntField(Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name(), -1);
}

/**
* Check if this message is targetted for a controller
* @return true if this is a controller message, false otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,4 +505,28 @@ public static Map<String, Integer> getStateCounts(Map<String, String> stateMap)
}
return stateCounts;
}

/**
* Check if a state transition is upward
* @param fromState source state
* @param toState destination state
* @return True if it's an upward state transition, false otherwise
*/
public boolean isUpwardStateTransition(String fromState, String toState) {

if (fromState == null || toState == null) {
return false;
}

Map<String, Integer> statePriorityMap = getStatePriorityMap();

Integer fromStateWeight = statePriorityMap.get(fromState);
Integer toStateWeight = statePriorityMap.get(toState);

if (fromStateWeight == null || toStateWeight == null) {
return false;
}

return toStateWeight < fromStateWeight;
}
}
94 changes: 70 additions & 24 deletions helix-core/src/main/java/org/apache/helix/util/MessageUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static Message createStateTransitionCancellationMessage(String srcInstanc
toState);

Message message =
createStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
createBasicStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
srcInstanceName, srcSessionId, resource, partitionName, instanceName, currentState,
nextState, sessionId, stateModelDefName);

Expand All @@ -60,28 +60,6 @@ public static Message createStateTransitionCancellationMessage(String srcInstanc
return null;
}

public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
Resource resource, String partitionName, String instanceName, String currentState,
String nextState, String tgtSessionId, String stateModelDefName) {
Message message =
createStateTransitionMessage(Message.MessageType.STATE_TRANSITION, srcInstanceName,
srcSessionId, resource, partitionName, instanceName, currentState, nextState, tgtSessionId,
stateModelDefName);

// Set the retry count for state transition messages.
// TODO: make the retry count configurable in ClusterConfig or IdealState
message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);

if (resource.getResourceGroupName() != null) {
message.setResourceGroupName(resource.getResourceGroupName());
}
if (resource.getResourceTag() != null) {
message.setResourceTag(resource.getResourceTag());
}

return message;
}

/**
* Creates a message to change participant status
* {@link org.apache.helix.model.LiveInstance.LiveInstanceStatus}
Expand Down Expand Up @@ -121,7 +99,7 @@ private static Message createBasicMessage(Message.MessageType messageType, Strin
}

/* Creates state transition or state transition cancellation message */
private static Message createStateTransitionMessage(Message.MessageType messageType,
private static Message createBasicStateTransitionMessage(Message.MessageType messageType,
String srcInstanceName, String srcSessionId, Resource resource, String partitionName,
String instanceName, String currentState, String nextState, String tgtSessionId,
String stateModelDefName) {
Expand All @@ -136,4 +114,72 @@ private static Message createStateTransitionMessage(Message.MessageType messageT

return message;
}

/**
* Create a state transition message with replica prioritization metadata
* @param srcInstanceName source instance name
* @param srcSessionId source session id
* @param resource resource
* @param partitionName partition name
* @param instanceName target instance name
* @param currentState current state
* @param nextState next state
* @param tgtSessionId target session id
* @param stateModelDefName state model definition name
* @param currentActiveReplicaNumber The number of replicas currently in active states
* for this partition before any state transitions occur. This metadata
* enables participant-side message prioritization by indicating the
* current availability level (e.g., 0→1 recovery scenarios get higher
* priority than 2→3 load balancing scenarios). Set to -1 for transitions
* that should not be prioritized (downward transitions).
* Active states include top states, secondary top states (for single-top
* state models), and ERROR states since they are still considered active by Helix.
* @return state transition message
*/
public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
Resource resource, String partitionName, String instanceName, String currentState,
String nextState, String tgtSessionId, String stateModelDefName,
int currentActiveReplicaNumber) {
Message message = createBasicStateTransitionMessage(Message.MessageType.STATE_TRANSITION,
srcInstanceName, srcSessionId, resource, partitionName, instanceName, currentState,
nextState, tgtSessionId, stateModelDefName);

// Set the retry count for state transition messages.
// TODO: make the retry count configurable in ClusterConfig or IdealState
message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);

if (resource.getResourceGroupName() != null) {
message.setResourceGroupName(resource.getResourceGroupName());
}
if (resource.getResourceTag() != null) {
message.setResourceTag(resource.getResourceTag());
}

// Set current active replica number for participant-side prioritization
message.setCurrentActiveReplicaNumber(currentActiveReplicaNumber);

return message;
}

/**
* Create a state transition message (backward compatibility)
* @param srcInstanceName source instance name
* @param srcSessionId source session id
* @param resource resource
* @param partitionName partition name
* @param instanceName target instance name
* @param currentState current state
* @param nextState next state
* @param tgtSessionId target session id
* @param stateModelDefName state model definition name
* @return state transition message
*/
public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
Resource resource, String partitionName, String instanceName, String currentState,
String nextState, String tgtSessionId, String stateModelDefName) {
// currentActiveReplicaNumber is set to -1 for ST messages needing no prioritization metadata
// (backward compatibility)
return createStateTransitionMessage(srcInstanceName, srcSessionId, resource, partitionName,
instanceName, currentState, nextState, tgtSessionId, stateModelDefName, -1);
}
}
Loading