Skip to content

Commit 6d1874e

Browse files
authored
[Fix-17831] Fix incorrect parallelism number when complementing data in parallel execution mode (#17853)
1 parent de0b233 commit 6d1874e

File tree

1 file changed

+27
-2
lines changed

1 file changed

+27
-2
lines changed

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.dolphinscheduler.service.process.ProcessService;
3636

3737
import java.time.ZonedDateTime;
38+
import java.util.ArrayList;
3839
import java.util.Collections;
3940
import java.util.List;
4041
import java.util.stream.Collectors;
@@ -95,9 +96,9 @@ private List<Integer> doParallelBackfillWorkflow(final BackfillWorkflowDTO backf
9596
expectedParallelismNumber = listDate.size();
9697
}
9798

98-
log.info("In parallel mode, current expectedParallelismNumber:{}", expectedParallelismNumber);
99+
log.info("In parallel mode, current expectedParallelismNumber: {}", expectedParallelismNumber);
99100
final List<Integer> workflowInstanceIdList = Lists.newArrayList();
100-
for (List<ZonedDateTime> stringDate : Lists.partition(listDate, expectedParallelismNumber)) {
101+
for (List<ZonedDateTime> stringDate : splitDateTime(listDate, expectedParallelismNumber)) {
101102
final Integer workflowInstanceId = doBackfillWorkflow(
102103
backfillWorkflowDTO,
103104
stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
@@ -106,6 +107,30 @@ private List<Integer> doParallelBackfillWorkflow(final BackfillWorkflowDTO backf
106107
return workflowInstanceIdList;
107108
}
108109

110+
/**
111+
* split date time list into n parts, the last part may be larger if not divisible
112+
*/
113+
private List<List<ZonedDateTime>> splitDateTime(List<ZonedDateTime> dateTimeList, int numParts) {
114+
List<List<ZonedDateTime>> result = new ArrayList<>();
115+
int n = dateTimeList.size();
116+
117+
int baseSize = n / numParts;
118+
int remainder = n % numParts;
119+
120+
int start = 0;
121+
for (int i = 0; i < numParts; i++) {
122+
int currentSize = baseSize;
123+
if (i == numParts - 1) {
124+
currentSize += remainder;
125+
}
126+
List<ZonedDateTime> part = dateTimeList.subList(start, start + currentSize);
127+
result.add(part);
128+
start += currentSize;
129+
}
130+
131+
return result;
132+
}
133+
109134
private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
110135
final List<String> backfillTimeList) {
111136
final Server masterServer = registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null);

0 commit comments

Comments
 (0)