Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
package org.apache.dolphinscheduler.api.executor.workflow;

import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.WorkflowLineageService;
import org.apache.dolphinscheduler.api.validator.workflow.BackfillWorkflowDTO;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.repository.CommandDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerRequest;
Expand All @@ -37,7 +41,11 @@
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -57,20 +65,32 @@ public class BackfillWorkflowExecutorDelegate implements IExecutorDelegate<Backf
@Autowired
private ProcessService processService;

@Autowired
private WorkflowLineageService workflowLineageService;

@Autowired
private WorkflowDefinitionDao workflowDefinitionDao;

@Autowired
private RegistryClient registryClient;

@Override
public List<Integer> execute(final BackfillWorkflowDTO backfillWorkflowDTO) {
return executeWithVisitedCodes(backfillWorkflowDTO, null);
}

List<Integer> executeWithVisitedCodes(final BackfillWorkflowDTO backfillWorkflowDTO,
final Set<Long> visitedCodes) {
// todo: directly call the master api to do backfill
if (backfillWorkflowDTO.getBackfillParams().getRunMode() == RunMode.RUN_MODE_SERIAL) {
return doSerialBackfillWorkflow(backfillWorkflowDTO);
return doSerialBackfillWorkflow(backfillWorkflowDTO, visitedCodes);
} else {
return doParallelBackfillWorkflow(backfillWorkflowDTO);
return doParallelBackfillWorkflow(backfillWorkflowDTO, visitedCodes);
}
}
Comment on lines 77 to 90
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description currently states "This pull request is code cleanup without any test coverage", but this change introduces new dependency-triggering behavior and adds a new test class. Please update the PR description/verify section to reflect the actual behavior change and how it is tested (or why tests are sufficient).

Copilot uses AI. Check for mistakes.

private List<Integer> doSerialBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO) {
private List<Integer> doSerialBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
final Set<Long> visitedCodes) {
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
final List<ZonedDateTime> backfillTimeList = backfillParams.getBackfillDateList();
if (backfillParams.getExecutionOrder() == ExecutionOrder.DESC_ORDER) {
Expand All @@ -81,11 +101,13 @@ private List<Integer> doSerialBackfillWorkflow(final BackfillWorkflowDTO backfil

final Integer workflowInstanceId = doBackfillWorkflow(
backfillWorkflowDTO,
backfillTimeList.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
backfillTimeList.stream().map(DateUtils::dateToString).collect(Collectors.toList()),
visitedCodes);
return Lists.newArrayList(workflowInstanceId);
}

private List<Integer> doParallelBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO) {
private List<Integer> doParallelBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
final Set<Long> visitedCodes) {
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
Integer expectedParallelismNumber = backfillParams.getExpectedParallelismNumber();

Expand All @@ -98,10 +120,14 @@ private List<Integer> doParallelBackfillWorkflow(final BackfillWorkflowDTO backf

log.info("In parallel mode, current expectedParallelismNumber: {}", expectedParallelismNumber);
final List<Integer> workflowInstanceIdList = Lists.newArrayList();
final Set<Long> baseVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
for (List<ZonedDateTime> stringDate : splitDateTime(listDate, expectedParallelismNumber)) {
// Each parallel chunk should keep its own traversal context to avoid cross-chunk pollution.
final Set<Long> chunkVisitedCodes = new HashSet<>(baseVisitedCodes);
final Integer workflowInstanceId = doBackfillWorkflow(
backfillWorkflowDTO,
stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()));
stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList()),
chunkVisitedCodes);
workflowInstanceIdList.add(workflowInstanceId);
}
return workflowInstanceIdList;
Expand Down Expand Up @@ -132,7 +158,8 @@ private List<List<ZonedDateTime>> splitDateTime(List<ZonedDateTime> dateTimeList
}

private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
final List<String> backfillTimeList) {
final List<String> backfillTimeList,
final Set<Long> visitedCodes) {
final Server masterServer = registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null);
if (masterServer == null) {
throw new ServiceException("no master server available");
Expand Down Expand Up @@ -166,13 +193,111 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
}
final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams();
if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) {
doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList);
final Set<Long> effectiveVisitedCodes = visitedCodes == null ? new HashSet<>() : visitedCodes;
effectiveVisitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode());
doBackfillDependentWorkflowForTesting(backfillWorkflowDTO, backfillTimeList, effectiveVisitedCodes);
}
return backfillTriggerResponse.getWorkflowInstanceId();
}

void doBackfillDependentWorkflowForTesting(final BackfillWorkflowDTO backfillWorkflowDTO,
final List<String> backfillTimeList,
final Set<Long> visitedCodes) {
doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList, visitedCodes);
}
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doBackfillDependentWorkflowForTesting introduces a production method purely as a test seam. Consider making the underlying dependency-triggering method package-private/protected (or extract a collaborator) and annotate with something like @VisibleForTesting, instead of keeping a "ForTesting" method in the main class API.

Copilot uses AI. Check for mistakes.

private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO,
final List<String> backfillTimeList) {
// todo:
final List<String> backfillTimeList,
final Set<Long> visitedCodes) {
// 1) Query downstream dependent workflows for the current workflow
final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition();
final long upstreamWorkflowCode = upstreamWorkflow.getCode();

List<DependentWorkflowDefinition> downstreamDefinitions =
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);

if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) {
log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode);
return;
}
final Set<Long> downstreamCodes = downstreamDefinitions.stream()
.map(DependentWorkflowDefinition::getWorkflowDefinitionCode)
.collect(Collectors.toCollection(LinkedHashSet::new));
final List<WorkflowDefinition> downstreamWorkflowList = workflowDefinitionDao.queryByCodes(downstreamCodes);
final Map<Long, WorkflowDefinition> downstreamWorkflowMap = downstreamWorkflowList.stream()
.collect(Collectors.toMap(WorkflowDefinition::getCode, workflow -> workflow));

// 2) Convert upstream backfill time from string to ZonedDateTime as the base business dates for downstream
// backfill
final List<ZonedDateTime> upstreamBackfillDates = backfillTimeList.stream()
.map(DateUtils::stringToZoneDateTime)
.collect(Collectors.toList());
Comment on lines +230 to +234
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doBackfillDependentWorkflow converts backfillTimeList (strings) to ZonedDateTime via DateUtils.stringToZoneDateTime, then later downstream execution converts those ZonedDateTimes back to strings via DateUtils.dateToString. Because stringToZoneDateTime interprets the input using the thread-local/default timezone but returns a ZonedDateTime in ZoneId.systemDefault(), this round-trip can shift the local date/time when the request timezone differs from the JVM default, causing downstream workflows to be backfilled for the wrong business dates. Avoid the string->ZonedDateTime->string round-trip by propagating the original ZonedDateTime chunk list into downstream params, or ensure parsing/formatting uses the same timezone consistently (e.g., the request/thread-local timezone).

Copilot uses AI. Check for mistakes.

// 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO
for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) {
long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode();

// Prevent self-dependency and circular dependency chains
if (visitedCodes.contains(downstreamCode)) {
log.warn("Skip circular dependent workflow {}", downstreamCode);
continue;
}

WorkflowDefinition downstreamWorkflow = downstreamWorkflowMap.get(downstreamCode);
if (downstreamWorkflow == null) {
log.warn("Skip dependent workflow {}, workflow definition not found", downstreamCode);
continue;
}

if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) {
log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode);
continue;
}

// Currently, reuse the same business date list as upstream for downstream backfill;
// later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition
// (taskParams).
BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams();
boolean allLevelDependent = originalParams.isAllLevelDependent();
ComplementDependentMode downstreamDependentMode =
allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE;

BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder()
.runMode(originalParams.getRunMode())
.backfillDateList(upstreamBackfillDates)
.expectedParallelismNumber(originalParams.getExpectedParallelismNumber())
// Control whether downstream will continue triggering its own dependencies based on
// allLevelDependent flag
.backfillDependentMode(downstreamDependentMode)
.allLevelDependent(allLevelDependent)
.executionOrder(originalParams.getExecutionOrder())
.build();

BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder()
.loginUser(backfillWorkflowDTO.getLoginUser())
.workflowDefinition(downstreamWorkflow)
.startNodes(null)
.failureStrategy(backfillWorkflowDTO.getFailureStrategy())
.taskDependType(backfillWorkflowDTO.getTaskDependType())
.execType(backfillWorkflowDTO.getExecType())
.warningType(backfillWorkflowDTO.getWarningType())
.warningGroupId(downstreamWorkflow.getWarningGroupId())
.runMode(dependentParams.getRunMode())
.workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
.workerGroup(backfillWorkflowDTO.getWorkerGroup())
.tenantCode(backfillWorkflowDTO.getTenantCode())
.environmentCode(backfillWorkflowDTO.getEnvironmentCode())
.startParamList(backfillWorkflowDTO.getStartParamList())
.dryRun(backfillWorkflowDTO.getDryRun())
.backfillParams(dependentParams)
.build();

log.info("Trigger dependent workflow {} for upstream workflow {} with backfill dates {}",
downstreamCode, upstreamWorkflowCode, backfillTimeList);

// 4) Mark as visiting before recursive trigger to detect cycles, then trigger downstream backfill
visitedCodes.add(downstreamCode);
executeWithVisitedCodes(dependentBackfillDTO, visitedCodes);
}
}
}
Loading
Loading