Commit f630e1b

pravinbhat and msmygit authored
Randomized the pending token-range list (#354)
* Randomized the pending token-range list returned by the `trackRun` feature

* Update src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java

  Agree, this looks cleaner

  Co-authored-by: Madhavan <[email protected]>

* Apply suggestions from code review

---------

Co-authored-by: Madhavan <[email protected]>
1 parent a477bf4 commit f630e1b

3 files changed, with 17 additions and 7 deletions

RELEASE.md

Lines changed: 3 additions & 0 deletions
@@ -1,5 +1,8 @@
 # Release Notes

+## [5.2.3] - 2025-04-15
+- Randomized the pending token-range list returned by the `trackRun` feature (when rerunning a previously incomplete job) for better load distribution across the cluster.
+
 ## [5.2.2] - 2025-04-02
 - Replaced deprecated (not supported in Cassandra 5.0.3+) function `dateof()` with `totimestamp()`

src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java

Lines changed: 14 additions & 5 deletions
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,11 +112,19 @@ public Collection<PartitionRange> getPendingPartitions(long prevRunId, JobType j
             }
         }

-        final Collection<PartitionRange> pendingParts = new ArrayList<PartitionRange>();
-        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.NOT_STARTED.toString(), jobType));
-        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.STARTED.toString(), jobType));
-        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.FAIL.toString(), jobType));
-        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.DIFF.toString(), jobType));
+        final List<PartitionRange> pendingParts = new ArrayList<PartitionRange>();
+        // Use an array of statuses for iteration
+        String[] statuses = {
+            TrackRun.RUN_STATUS.NOT_STARTED.toString(),
+            TrackRun.RUN_STATUS.STARTED.toString(),
+            TrackRun.RUN_STATUS.FAIL.toString(),
+            TrackRun.RUN_STATUS.DIFF.toString()
+        };
+        for (String status : statuses) {
+            pendingParts.addAll(getPartitionsByStatus(prevRunId, status, jobType));
+        }
+        Collections.shuffle(pendingParts);
+        Collections.shuffle(pendingParts);

         return pendingParts;
     }
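
For readers who want to try the collect-then-shuffle pattern from this hunk outside the project, here is a minimal, self-contained sketch. It is not the project's code: the class `PendingRangeShuffleSketch`, its `Status` enum, and the use of plain strings in place of `PartitionRange` are all hypothetical stand-ins, and the explicit `Random` parameter is added only so the behaviour can be reproduced in a test.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class PendingRangeShuffleSketch {

    // Hypothetical stand-in for the four statuses treated as "pending" in the hunk above.
    public enum Status { NOT_STARTED, STARTED, FAIL, DIFF }

    // Gathers the ranges recorded under every pending status into one list,
    // then randomizes the order so a rerun does not walk the ranges status by status.
    public static List<String> collectPending(Map<Status, List<String>> rangesByStatus, Random rnd) {
        List<String> pending = new ArrayList<>();
        for (Status status : Status.values()) {
            pending.addAll(rangesByStatus.getOrDefault(status, Collections.emptyList()));
        }
        Collections.shuffle(pending, rnd);
        return pending;
    }

    public static void main(String[] args) {
        // Placeholder range labels; real token ranges would be numeric bounds.
        Map<Status, List<String>> byStatus = new EnumMap<>(Status.class);
        byStatus.put(Status.NOT_STARTED, Arrays.asList("range-1", "range-2"));
        byStatus.put(Status.FAIL, Arrays.asList("range-3"));
        byStatus.put(Status.DIFF, Arrays.asList("range-4"));

        System.out.println(collectPending(byStatus, new Random()));
    }
}
```

Note that `Collections.shuffle` needs a `List`, which is presumably why the hunk changes the local variable from `Collection<PartitionRange>` to `List<PartitionRange>` and adds the `java.util.List` import.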

src/main/java/com/datastax/cdm/job/SplitPartitions.java

Lines changed: 0 additions & 2 deletions
@@ -35,8 +35,6 @@ public static List<PartitionRange> getRandomSubPartitions(int numSplits, BigInte
         List<PartitionRange> partitions = getSubPartitions(numSplits, min, max, coveragePercent, jobType);
         Collections.shuffle(partitions);
         Collections.shuffle(partitions);
-        Collections.shuffle(partitions);
-        Collections.shuffle(partitions);
         return partitions;
     }
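
This hunk trims four consecutive `Collections.shuffle` calls to two. As a hedged aside, the JDK documentation for `Collections.shuffle` states that all permutations occur with approximately equal likelihood, so repeating the call should not be expected to make the order "more random". The sketch below is one way to check that empirically: it tallies the orderings of a three-element list after one call and after two calls. The class `ShuffleCountSketch`, the `tally` helper, the seed, and the trial count are illustrative choices, not part of the commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

public class ShuffleCountSketch {

    // For each trial, shuffles a fresh copy of `items` `calls` times in a row
    // and counts how often each resulting ordering appears.
    public static Map<String, Integer> tally(List<String> items, int calls, int trials, Random rnd) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int t = 0; t < trials; t++) {
            List<String> copy = new ArrayList<>(items);
            for (int c = 0; c < calls; c++) {
                Collections.shuffle(copy, rnd);
            }
            counts.merge(String.join("", copy), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("A", "B", "C");
        // Both tallies should show the six orderings with roughly similar counts.
        System.out.println("1 call : " + tally(items, 1, 60_000, new Random(42L)));
        System.out.println("2 calls: " + tally(items, 2, 60_000, new Random(42L)));
    }
}
```

If the two tallies look alike, a single shuffle per list would likely be enough; the commit itself keeps two calls in both methods.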
