Skip to content

Commit 9b80190

Browse files
author
Charanya Sudharsanan
committed
Update based on review/feedback
1 parent 9392e6e commit 9b80190

File tree

4 files changed

+93
-110
lines changed

4 files changed

+93
-110
lines changed

helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java

Lines changed: 8 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -164,14 +164,14 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
164164
Map<String, List<Message>> messageMap = new HashMap<>();
165165

166166
// Calculate metadata for message prioritization
167-
int totalInstanceCount = instanceStateMap.keySet().size();
167+
int partitionInstanceCount = instanceStateMap.keySet().size();
168168

169169
// Get pending upward state transition messages to second top or top state
170-
List<Message> pendingUpwardStateTransitionMessages = getPendingUpwardStateTransitionMessages(
170+
List<Message> pendingUpwardStateTransitionMessagesToTopOrSecondTopStates = getPendingTransitionsToTopOrSecondTopStates(
171171
resourceName, partition, currentStateOutput, stateModelDef);
172172

173173
// Initialize replica counter for prioritization
174-
int initialReplicaNumber = totalInstanceCount - pendingUpwardStateTransitionMessages.size();
174+
int initialReplicaNumber = partitionInstanceCount - pendingUpwardStateTransitionMessagesToTopOrSecondTopStates.size();
175175

176176
for (String instanceName : instanceStateMap.keySet()) {
177177

@@ -260,16 +260,14 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
260260
pendingMessage, manager, resource, partition, sessionIdMap, instanceName,
261261
stateModelDef, cancellationMessage, isCancellationEnabled);
262262
} else {
263-
// Default currentReplicaNumber is -1 (no prioritization)
263+
// Default currentReplicaNumber is -1 (provides metadata for participant-side prioritization)
264264
int currentReplicaNumber = -1;
265265
// Check if this is an upward state transition from non-second top state to second top
266266
// or top state
267-
if (isUpwardStateTransition(currentState, nextState, stateModelDef)
267+
if (stateModelDef.isUpwardStateTransition(currentState, nextState)
268268
&& !stateModelDef.getSecondTopStates().contains(currentState)
269269
&& (isSecondTopState(nextState, stateModelDef)
270-
|| isTopState(nextState, stateModelDef))
271-
&& !isInPendingMessages(resourceName, partition, instanceName, currentState,
272-
nextState, pendingUpwardStateTransitionMessages)) {
270+
|| isTopState(nextState, stateModelDef))) {
273271

274272
// Assign the replica number for prioritization
275273
currentReplicaNumber = initialReplicaNumber--;
@@ -335,57 +333,6 @@ private boolean isSecondTopState(String state, StateModelDefinition stateModelDe
335333
return stateModelDef.getSecondTopStates().contains(state);
336334
}
337335

338-
/**
339-
* Check if a state transition is already in the pending messages
340-
* @param resourceName The resource name
341-
* @param partition The partition
342-
* @param instanceName The instance name
343-
* @param fromState The from state
344-
* @param toState The to state
345-
* @param pendingMessages The list of pending messages
346-
* @return True if the state transition is already in pending messages, false otherwise
347-
*/
348-
private boolean isInPendingMessages(String resourceName, Partition partition, String instanceName,
349-
String fromState, String toState, List<Message> pendingMessages) {
350-
351-
for (Message message : pendingMessages) {
352-
if (message.getResourceName().equals(resourceName)
353-
&& message.getPartitionName().equals(partition.getPartitionName())
354-
&& message.getTgtName().equals(instanceName) && message.getFromState().equals(fromState)
355-
&& message.getToState().equals(toState)) {
356-
return true;
357-
}
358-
}
359-
360-
return false;
361-
}
362-
363-
/**
364-
* Check if a state transition is upward
365-
* @param fromState The from state
366-
* @param toState The to state
367-
* @param stateModelDef The state model definition
368-
* @return True if it's an upward state transition, false otherwise
369-
*/
370-
private boolean isUpwardStateTransition(String fromState, String toState,
371-
StateModelDefinition stateModelDef) {
372-
373-
if (fromState == null || toState == null) {
374-
return false;
375-
}
376-
377-
Map<String, Integer> statePriorityMap = stateModelDef.getStatePriorityMap();
378-
379-
Integer fromStateWeight = statePriorityMap.get(fromState);
380-
Integer toStateWeight = statePriorityMap.get(toState);
381-
382-
if (fromStateWeight == null || toStateWeight == null) {
383-
return false;
384-
}
385-
386-
return toStateWeight < fromStateWeight;
387-
}
388-
389336
/**
390337
* Get pending upward state transition messages from non-second top state to second top or top
391338
* state
@@ -395,7 +342,7 @@ private boolean isUpwardStateTransition(String fromState, String toState,
395342
* @param stateModelDef The state model definition
396343
* @return List of pending messages for upward state transitions
397344
*/
398-
private List<Message> getPendingUpwardStateTransitionMessages(String resourceName,
345+
private List<Message> getPendingTransitionsToTopOrSecondTopStates(String resourceName,
399346
Partition partition, CurrentStateOutput currentStateOutput,
400347
StateModelDefinition stateModelDef) {
401348
List<Message> pendingUpwardSTMessages = new ArrayList<>();
@@ -411,7 +358,7 @@ private List<Message> getPendingUpwardStateTransitionMessages(String resourceNam
411358

412359
// Check if it's an upward state transition from non-second top state to second top or top
413360
// state
414-
if (isUpwardStateTransition(fromState, toState, stateModelDef)
361+
if (stateModelDef.isUpwardStateTransition(fromState, toState)
415362
&& !isSecondTopState(fromState, stateModelDef)
416363
&& (isSecondTopState(toState, stateModelDef) || isTopState(toState, stateModelDef))) {
417364
pendingUpwardSTMessages.add(message);

helix-core/src/main/java/org/apache/helix/model/Message.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,8 @@ public enum STRebalanceType {
138138
/**
139139
* Compares the creation time of two Messages
140140
*/
141-
public static final Comparator<Message> CREATE_TIME_COMPARATOR = new Comparator<Message>() {
142-
@Override
143-
public int compare(Message m1, Message m2) {
144-
return new Long(m1.getCreateTimeStamp()).compareTo(new Long(m2.getCreateTimeStamp()));
145-
}
146-
};
141+
public static final Comparator<Message> CREATE_TIME_COMPARATOR =
142+
(m1, m2) -> Long.compare(m2.getCreateTimeStamp(), m1.getCreateTimeStamp());
147143

148144
/**
149145
* Instantiate a message
@@ -937,8 +933,14 @@ public void setSrcClusterName(String clusterName) {
937933
}
938934

939935
/**
940-
* Set the currentReplicaNumber for transition-related messages
941-
* @param currentReplicaNumber the replica count
936+
* Set replica number for participant-side message prioritization.
937+
* This field indicates the number of top and second-top state replicas at the time a state transition message is generated.
938+
* It is used to prioritize messages, with lower values indicating higher priority.
939+
* Participants can use this in custom thread pools or message handlers to process
940+
* critical transitions first during recovery scenarios.
941+
* Default value is -1 for transitions that don't require prioritization.
942+
* @param currentReplicaNumber the replica priority number (-1 for no prioritization, >=0 for
943+
* prioritized)
942944
*/
943945
public void setCurrentReplicaNumber(int currentReplicaNumber) {
944946
_record.setIntField(Attributes.CURRENT_REPLICA_NUMBER.name(), currentReplicaNumber);

helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,4 +505,28 @@ public static Map<String, Integer> getStateCounts(Map<String, String> stateMap)
505505
}
506506
return stateCounts;
507507
}
508+
509+
/**
510+
* Check if a state transition is upward
511+
* @param fromState source state
512+
* @param toState destination state
513+
* @return True if it's an upward state transition, false otherwise
514+
*/
515+
public boolean isUpwardStateTransition(String fromState, String toState) {
516+
517+
if (fromState == null || toState == null) {
518+
return false;
519+
}
520+
521+
Map<String, Integer> statePriorityMap = getStatePriorityMap();
522+
523+
Integer fromStateWeight = statePriorityMap.get(fromState);
524+
Integer toStateWeight = statePriorityMap.get(toState);
525+
526+
if (fromStateWeight == null || toStateWeight == null) {
527+
return false;
528+
}
529+
530+
return toStateWeight < fromStateWeight;
531+
}
508532
}

helix-core/src/main/java/org/apache/helix/util/MessageUtil.java

Lines changed: 51 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public static Message createStateTransitionCancellationMessage(String srcInstanc
4848
toState);
4949

5050
Message message =
51-
createStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
51+
createBasicStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
5252
srcInstanceName, srcSessionId, resource, partitionName, instanceName, currentState,
5353
nextState, sessionId, stateModelDefName);
5454

@@ -60,28 +60,6 @@ public static Message createStateTransitionCancellationMessage(String srcInstanc
6060
return null;
6161
}
6262

63-
public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
64-
Resource resource, String partitionName, String instanceName, String currentState,
65-
String nextState, String tgtSessionId, String stateModelDefName) {
66-
Message message =
67-
createStateTransitionMessage(Message.MessageType.STATE_TRANSITION, srcInstanceName,
68-
srcSessionId, resource, partitionName, instanceName, currentState, nextState, tgtSessionId,
69-
stateModelDefName);
70-
71-
// Set the retry count for state transition messages.
72-
// TODO: make the retry count configurable in ClusterConfig or IdealState
73-
message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
74-
75-
if (resource.getResourceGroupName() != null) {
76-
message.setResourceGroupName(resource.getResourceGroupName());
77-
}
78-
if (resource.getResourceTag() != null) {
79-
message.setResourceTag(resource.getResourceTag());
80-
}
81-
82-
return message;
83-
}
84-
8563
/**
8664
* Creates a message to change participant status
8765
* {@link org.apache.helix.model.LiveInstance.LiveInstanceStatus}
@@ -121,7 +99,7 @@ private static Message createBasicMessage(Message.MessageType messageType, Strin
12199
}
122100

123101
/* Creates state transition or state transition cancellation message */
124-
private static Message createStateTransitionMessage(Message.MessageType messageType,
102+
private static Message createBasicStateTransitionMessage(Message.MessageType messageType,
125103
String srcInstanceName, String srcSessionId, Resource resource, String partitionName,
126104
String instanceName, String currentState, String nextState, String tgtSessionId,
127105
String stateModelDefName) {
@@ -138,30 +116,62 @@ private static Message createStateTransitionMessage(Message.MessageType messageT
138116
}
139117

140118
/**
141-
* Create a state transition message with currentReplicaNumber for prioritization
142-
* @param msgSender message sender
143-
* @param sessionId session id
119+
* Create a state transition message with replica prioritization metadata
120+
* @param srcInstanceName source instance name
121+
* @param srcSessionId source session id
144122
* @param resource resource
145123
* @param partitionName partition name
146-
* @param instanceName instance name
147-
* @param fromState from state
148-
* @param toState to state
149-
* @param sessionIdForInstance session id for instance
150-
* @param stateModelDefName state model def name
151-
* @param currentReplicaNumber the current replica number (for prioritization)
152-
* @return message
124+
* @param instanceName target instance name
125+
* @param currentState current state
126+
* @param nextState next state
127+
* @param tgtSessionId target session id
128+
* @param stateModelDefName state model definition name
129+
* @param currentReplicaNumber replica priority number (-1 for no prioritization, >=0 for
130+
* prioritized)
131+
* @return state transition message
153132
*/
154-
public static Message createStateTransitionMessage(String msgSender, String sessionId,
155-
Resource resource, String partitionName, String instanceName, String fromState,
156-
String toState, String sessionIdForInstance, String stateModelDefName,
157-
int currentReplicaNumber) {
133+
public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
134+
Resource resource, String partitionName, String instanceName, String currentState,
135+
String nextState, String tgtSessionId, String stateModelDefName, int currentReplicaNumber) {
136+
Message message = createBasicStateTransitionMessage(Message.MessageType.STATE_TRANSITION,
137+
srcInstanceName, srcSessionId, resource, partitionName, instanceName, currentState,
138+
nextState, tgtSessionId, stateModelDefName);
139+
140+
// Set the retry count for state transition messages.
141+
// TODO: make the retry count configurable in ClusterConfig or IdealState
142+
message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
158143

159-
Message message = createStateTransitionMessage(msgSender, sessionId, resource, partitionName,
160-
instanceName, fromState, toState, sessionIdForInstance, stateModelDefName);
144+
if (resource.getResourceGroupName() != null) {
145+
message.setResourceGroupName(resource.getResourceGroupName());
146+
}
147+
if (resource.getResourceTag() != null) {
148+
message.setResourceTag(resource.getResourceTag());
149+
}
161150

162-
// Set the current replica number for prioritization
151+
// Set replica number for participant-side prioritization
163152
message.setCurrentReplicaNumber(currentReplicaNumber);
164153

165154
return message;
166155
}
156+
157+
/**
158+
* Create a state transition message (backward compatibility)
159+
* @param srcInstanceName source instance name
160+
* @param srcSessionId source session id
161+
* @param resource resource
162+
* @param partitionName partition name
163+
* @param instanceName target instance name
164+
* @param currentState current state
165+
* @param nextState next state
166+
* @param tgtSessionId target session id
167+
* @param stateModelDefName state model definition name
168+
* @return state transition message
169+
*/
170+
public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
171+
Resource resource, String partitionName, String instanceName, String currentState,
172+
String nextState, String tgtSessionId, String stateModelDefName) {
173+
// currentReplicaNumber is set to -1 for ST messages needing no prioritization metadata (backward compatibility)
174+
return createStateTransitionMessage(srcInstanceName, srcSessionId, resource, partitionName,
175+
instanceName, currentState, nextState, tgtSessionId, stateModelDefName, -1);
176+
}
167177
}

0 commit comments

Comments
 (0)