|
46 | 46 | import java.util.Set; |
47 | 47 | import java.util.stream.Collectors; |
48 | 48 |
|
| 49 | + |
49 | 50 | import lombok.extern.slf4j.Slf4j; |
50 | 51 |
|
51 | 52 | import org.springframework.beans.factory.annotation.Autowired; |
|
57 | 58 | @Component |
58 | 59 | public class BackfillWorkflowExecutorDelegate implements IExecutorDelegate<BackfillWorkflowDTO, List<Integer>> { |
59 | 60 |
|
60 | | - private static final ThreadLocal<Set<Long>> BACKFILL_VISITING_WORKFLOWS = new ThreadLocal<>(); |
61 | | - |
62 | 61 | @Autowired |
63 | 62 | private CommandDao commandDao; |
64 | 63 |
|
@@ -180,111 +179,100 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO |
180 | 179 | } |
181 | 180 | final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams(); |
182 | 181 | if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) { |
183 | | - doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList); |
| 182 | + final Set<Long> visitedCodes = new HashSet<>(); |
| 183 | + visitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode()); |
| 184 | + doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList, visitedCodes); |
184 | 185 | } |
185 | 186 | return backfillTriggerResponse.getWorkflowInstanceId(); |
186 | 187 | } |
187 | 188 |
|
188 | 189 | private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO, |
189 | | - final List<String> backfillTimeList) { |
190 | | - final boolean isRootCall = BACKFILL_VISITING_WORKFLOWS.get() == null; |
191 | | - if (isRootCall) { |
192 | | - final Set<Long> visitedCodes = new HashSet<>(); |
193 | | - visitedCodes.add(backfillWorkflowDTO.getWorkflowDefinition().getCode()); |
194 | | - BACKFILL_VISITING_WORKFLOWS.set(visitedCodes); |
| 190 | + final List<String> backfillTimeList, |
| 191 | + final Set<Long> visitedCodes) { |
| 192 | + // 1) Query downstream dependent workflows for the current workflow |
| 193 | + final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition(); |
| 194 | + final long upstreamWorkflowCode = upstreamWorkflow.getCode(); |
| 195 | + |
| 196 | + List<DependentWorkflowDefinition> downstreamDefinitions = |
| 197 | + workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode); |
| 198 | + |
| 199 | + if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) { |
| 200 | + log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode); |
| 201 | + return; |
195 | 202 | } |
196 | | - try { |
197 | | - final Set<Long> visitedCodes = BACKFILL_VISITING_WORKFLOWS.get(); |
198 | 203 |
|
199 | | - // 1) Query downstream dependent workflows for the current workflow |
200 | | - final WorkflowDefinition upstreamWorkflow = backfillWorkflowDTO.getWorkflowDefinition(); |
201 | | - final long upstreamWorkflowCode = upstreamWorkflow.getCode(); |
| 204 | + // 2) Convert upstream backfill time from string to ZonedDateTime as the base business dates for downstream |
| 205 | + // backfill |
| 206 | + final List<ZonedDateTime> upstreamBackfillDates = backfillTimeList.stream() |
| 207 | + .map(DateUtils::stringToZoneDateTime) |
| 208 | + .collect(Collectors.toList()); |
202 | 209 |
|
203 | | - List<DependentWorkflowDefinition> downstreamDefinitions = |
204 | | - workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode); |
| 210 | + // 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO |
| 211 | + for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) { |
| 212 | + long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode(); |
205 | 213 |
|
206 | | - if (downstreamDefinitions == null || downstreamDefinitions.isEmpty()) { |
207 | | - log.info("No downstream dependent workflows found for workflow code {}", upstreamWorkflowCode); |
208 | | - return; |
| 214 | + // Prevent self-dependency and circular dependency chains |
| 215 | + if (visitedCodes.contains(downstreamCode)) { |
| 216 | + log.warn("Skip circular dependent workflow {}", downstreamCode); |
| 217 | + continue; |
209 | 218 | } |
210 | 219 |
|
211 | | - // 2) Convert upstream backfill time from string to ZonedDateTime as the base business dates for downstream |
212 | | - // backfill |
213 | | - final List<ZonedDateTime> upstreamBackfillDates = backfillTimeList.stream() |
214 | | - .map(DateUtils::stringToZoneDateTime) |
215 | | - .collect(Collectors.toList()); |
216 | | - |
217 | | - // 3) Iterate downstream workflows and build/trigger corresponding BackfillWorkflowDTO |
218 | | - for (DependentWorkflowDefinition dependentWorkflowDefinition : downstreamDefinitions) { |
219 | | - long downstreamCode = dependentWorkflowDefinition.getWorkflowDefinitionCode(); |
220 | | - |
221 | | - // Prevent self-dependency and circular dependency chains |
222 | | - if (visitedCodes.contains(downstreamCode)) { |
223 | | - log.warn("Skip circular dependent workflow {}", downstreamCode); |
224 | | - continue; |
225 | | - } |
226 | | - |
227 | | - WorkflowDefinition downstreamWorkflow = |
228 | | - workflowDefinitionDao.queryByCode(downstreamCode).orElse(null); |
229 | | - if (downstreamWorkflow == null) { |
230 | | - log.warn("Skip dependent workflow {}, definition not found", downstreamCode); |
231 | | - continue; |
232 | | - } |
233 | | - |
234 | | - if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) { |
235 | | - log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode); |
236 | | - continue; |
237 | | - } |
238 | | - |
239 | | - // Currently, reuse the same business date list as upstream for downstream backfill; |
240 | | - // later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition |
241 | | - // (taskParams). |
242 | | - BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams(); |
243 | | - boolean allLevelDependent = originalParams.isAllLevelDependent(); |
244 | | - ComplementDependentMode downstreamDependentMode = |
245 | | - allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE; |
246 | | - |
247 | | - BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder() |
248 | | - .runMode(originalParams.getRunMode()) |
249 | | - .backfillDateList(upstreamBackfillDates) |
250 | | - .expectedParallelismNumber(originalParams.getExpectedParallelismNumber()) |
251 | | - // Control whether downstream will continue triggering its own dependencies based on |
252 | | - // allLevelDependent flag |
253 | | - .backfillDependentMode(downstreamDependentMode) |
254 | | - .allLevelDependent(allLevelDependent) |
255 | | - .executionOrder(originalParams.getExecutionOrder()) |
256 | | - .build(); |
257 | | - |
258 | | - BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder() |
259 | | - .loginUser(backfillWorkflowDTO.getLoginUser()) |
260 | | - .workflowDefinition(downstreamWorkflow) |
261 | | - .startNodes(null) |
262 | | - .failureStrategy(backfillWorkflowDTO.getFailureStrategy()) |
263 | | - .taskDependType(backfillWorkflowDTO.getTaskDependType()) |
264 | | - .execType(backfillWorkflowDTO.getExecType()) |
265 | | - .warningType(backfillWorkflowDTO.getWarningType()) |
266 | | - .warningGroupId(downstreamWorkflow.getWarningGroupId()) |
267 | | - .runMode(dependentParams.getRunMode()) |
268 | | - .workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority()) |
269 | | - .workerGroup(backfillWorkflowDTO.getWorkerGroup()) |
270 | | - .tenantCode(backfillWorkflowDTO.getTenantCode()) |
271 | | - .environmentCode(backfillWorkflowDTO.getEnvironmentCode()) |
272 | | - .startParamList(backfillWorkflowDTO.getStartParamList()) |
273 | | - .dryRun(backfillWorkflowDTO.getDryRun()) |
274 | | - .backfillParams(dependentParams) |
275 | | - .build(); |
276 | | - |
277 | | - log.info("Trigger dependent workflow {} for upstream workflow {} with backfill dates {}", |
278 | | - downstreamCode, upstreamWorkflowCode, backfillTimeList); |
279 | | - |
280 | | - // 4) Mark as visiting before recursive trigger to detect cycles |
281 | | - visitedCodes.add(downstreamCode); |
282 | | - execute(dependentBackfillDTO); |
| 220 | + WorkflowDefinition downstreamWorkflow = |
| 221 | + workflowDefinitionDao.queryByCode(downstreamCode).orElse(null); |
| 222 | + if (downstreamWorkflow == null) { |
| 223 | + log.warn("Skip dependent workflow {}, definition not found", downstreamCode); |
| 224 | + continue; |
283 | 225 | } |
284 | | - } finally { |
285 | | - if (isRootCall) { |
286 | | - BACKFILL_VISITING_WORKFLOWS.remove(); |
| 226 | + |
| 227 | + if (downstreamWorkflow.getReleaseState() != ReleaseState.ONLINE) { |
| 228 | + log.warn("Skip dependent workflow {}, release state is not ONLINE", downstreamCode); |
| 229 | + continue; |
287 | 230 | } |
| 231 | + |
| 232 | + // Currently, reuse the same business date list as upstream for downstream backfill; |
| 233 | + // later we can refine the dates based on dependency cycle configuration in dependentWorkflowDefinition |
| 234 | + // (taskParams). |
| 235 | + BackfillWorkflowDTO.BackfillParamsDTO originalParams = backfillWorkflowDTO.getBackfillParams(); |
| 236 | + boolean allLevelDependent = originalParams.isAllLevelDependent(); |
| 237 | + ComplementDependentMode downstreamDependentMode = |
| 238 | + allLevelDependent ? originalParams.getBackfillDependentMode() : ComplementDependentMode.OFF_MODE; |
| 239 | + |
| 240 | + BackfillWorkflowDTO.BackfillParamsDTO dependentParams = BackfillWorkflowDTO.BackfillParamsDTO.builder() |
| 241 | + .runMode(originalParams.getRunMode()) |
| 242 | + .backfillDateList(upstreamBackfillDates) |
| 243 | + .expectedParallelismNumber(originalParams.getExpectedParallelismNumber()) |
| 244 | + // Control whether downstream will continue triggering its own dependencies based on |
| 245 | + // allLevelDependent flag |
| 246 | + .backfillDependentMode(downstreamDependentMode) |
| 247 | + .allLevelDependent(allLevelDependent) |
| 248 | + .executionOrder(originalParams.getExecutionOrder()) |
| 249 | + .build(); |
| 250 | + |
| 251 | + BackfillWorkflowDTO dependentBackfillDTO = BackfillWorkflowDTO.builder() |
| 252 | + .loginUser(backfillWorkflowDTO.getLoginUser()) |
| 253 | + .workflowDefinition(downstreamWorkflow) |
| 254 | + .startNodes(null) |
| 255 | + .failureStrategy(backfillWorkflowDTO.getFailureStrategy()) |
| 256 | + .taskDependType(backfillWorkflowDTO.getTaskDependType()) |
| 257 | + .execType(backfillWorkflowDTO.getExecType()) |
| 258 | + .warningType(backfillWorkflowDTO.getWarningType()) |
| 259 | + .warningGroupId(downstreamWorkflow.getWarningGroupId()) |
| 260 | + .runMode(dependentParams.getRunMode()) |
| 261 | + .workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority()) |
| 262 | + .workerGroup(backfillWorkflowDTO.getWorkerGroup()) |
| 263 | + .tenantCode(backfillWorkflowDTO.getTenantCode()) |
| 264 | + .environmentCode(backfillWorkflowDTO.getEnvironmentCode()) |
| 265 | + .startParamList(backfillWorkflowDTO.getStartParamList()) |
| 266 | + .dryRun(backfillWorkflowDTO.getDryRun()) |
| 267 | + .backfillParams(dependentParams) |
| 268 | + .build(); |
| 269 | + |
| 270 | + log.info("Trigger dependent workflow {} for upstream workflow {} with backfill dates {}", |
| 271 | + downstreamCode, upstreamWorkflowCode, backfillTimeList); |
| 272 | + |
| 273 | + // 4) Mark as visiting before recursive trigger to detect cycles, then trigger downstream backfill |
| 274 | + visitedCodes.add(downstreamCode); |
| 275 | + execute(dependentBackfillDTO); |
288 | 276 | } |
289 | 277 | } |
290 | 278 | } |
0 commit comments