Skip to content

Commit 9ca7e62

Browse files
authored
Merge branch 'dev' into Fix-17701
2 parents a1224e6 + 4505d4b commit 9ca7e62

File tree

52 files changed

+1698
-130
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+1698
-130
lines changed

.github/workflows/backend.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ jobs:
143143
strategy:
144144
fail-fast: false
145145
matrix:
146-
version: ["3.1.9", "3.2.0"]
146+
version: ["3.1.9", "3.2.0", "3.3.1"]
147147
case:
148148
- name: schema-check-with-mysql
149149
script: .github/workflows/schema-check/mysql/start-job.sh

docs/docs/en/guide/upgrade/incompatible.md

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,23 @@
22

33
This document records the incompatible updates between each version. You need to check this document before you upgrade to related version.
44

5-
## dev
6-
7-
* Upgrade mysql driver version from 8.0.16 to 8.0.33 ([#14684](https://github.com/apache/dolphinscheduler/pull/14684))
8-
* Change env `PYTHON_HOME` to `PYTHON_LAUNCHER` and `DATAX_HOME` to `DATAX_LAUNCHER` ([#14523](https://github.com/apache/dolphinscheduler/pull/14523))
9-
* Change regex matching sql params in SQL task plugin ([#13378](https://github.com/apache/dolphinscheduler/pull/13378))
10-
* Remove the spark version of spark task ([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
11-
* Change the default unix shell executor from sh to bash ([#12180](https://github.com/apache/dolphinscheduler/pull/12180)).
12-
* Remove `deleteSource` in `download()` of `StorageOperate` ([#14084](https://github.com/apache/dolphinscheduler/pull/14084))
13-
* Remove default key for attribute `data-quality.jar.name` in `common.properties` ([#15551](https://github.com/apache/dolphinscheduler/pull/15551))
14-
* Rename attribute `data-quality.jar.name` to `data-quality.jar.dir` in `common.properties` and represent for directory ([#15563](https://github.com/apache/dolphinscheduler/pull/15563))
15-
16-
## 3.2.0
17-
18-
* Remove parameter `description` from public interfaces of new resource center ([#14394](https://github.com/apache/dolphinscheduler/pull/14394))
19-
205
## 3.0.0
216

227
* Copy and import workflow without 'copy' suffix [#10607](https://github.com/apache/dolphinscheduler/pull/10607)
238
* Use semicolon as default sql segment separator [#10869](https://github.com/apache/dolphinscheduler/pull/10869)
249

2510
## 3.2.0
2611

12+
* Rename attribute `data-quality.jar.name` to `data-quality.jar.dir` in `common.properties` and represent for directory ([#15563](https://github.com/apache/dolphinscheduler/pull/15563))
13+
* Remove default key for attribute `data-quality.jar.name` in `common.properties` ([#15551](https://github.com/apache/dolphinscheduler/pull/15551))
14+
* Remove `deleteSource` in `download()` of `StorageOperate` ([#14084](https://github.com/apache/dolphinscheduler/pull/14084))
15+
* Change the default unix shell executor from sh to bash ([#12180](https://github.com/apache/dolphinscheduler/pull/12180)).
16+
* Remove the spark version of spark task ([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
17+
* Change regex matching sql params in SQL task plugin ([#13378](https://github.com/apache/dolphinscheduler/pull/13378))
18+
* Change env `PYTHON_HOME` to `PYTHON_LAUNCHER` and `DATAX_HOME` to `DATAX_LAUNCHER` ([#14523](https://github.com/apache/dolphinscheduler/pull/14523))
19+
* Upgrade mysql driver version from 8.0.16 to 8.0.33 ([#14684](https://github.com/apache/dolphinscheduler/pull/14684))
2720
* Add required field `database` in /datasources/tables && /datasources/tableColumns Api [#14406](https://github.com/apache/dolphinscheduler/pull/14406)
21+
* Remove parameter `description` from public interfaces of new resource center ([#14394](https://github.com/apache/dolphinscheduler/pull/14394))
2822

2923
## 3.3.0
3024

@@ -41,4 +35,5 @@ This document records the incompatible updates between each version. You need to
4135
## 3.4.0
4236

4337
* Renamed the publicKey field to privateKey in the SSH connection parameters under the datasource configuration. ([#17666])(https://github.com/apache/dolphinscheduler/pull/17666)
38+
* Add table t_ds_serial_command. ([#17531])(https://github.com/apache/dolphinscheduler/pull/17531)
4439

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowCreateRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class WorkflowCreateRequest {
5656
private int timeout;
5757

5858
@Schema(allowableValues = "PARALLEL / SERIAL_WAIT / SERIAL_DISCARD / SERIAL_PRIORITY", example = "PARALLEL", description = "default PARALLEL if not provide.")
59-
private String executionType;
59+
private String executionType = WorkflowExecutionTypeEnum.PARALLEL.name();
6060

6161
public WorkflowDefinition convert2WorkflowDefinition() {
6262
WorkflowDefinition workflowDefinition = new WorkflowDefinition();

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/PauseWorkflowInstanceExecutorDelegate.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2222
import org.apache.dolphinscheduler.dao.entity.User;
2323
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
24+
import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
2425
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
2526
import org.apache.dolphinscheduler.extract.base.client.Clients;
2627
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
@@ -31,6 +32,7 @@
3132

3233
import org.springframework.beans.factory.annotation.Autowired;
3334
import org.springframework.stereotype.Component;
35+
import org.springframework.transaction.support.TransactionTemplate;
3436

3537
@Slf4j
3638
@Component
@@ -41,6 +43,12 @@ public class PauseWorkflowInstanceExecutorDelegate
4143
@Autowired
4244
private WorkflowInstanceDao workflowInstanceDao;
4345

46+
@Autowired
47+
private TransactionTemplate transactionTemplate;
48+
49+
@Autowired
50+
private SerialCommandDao serialCommandDao;
51+
4452
@Override
4553
public Void execute(PauseWorkflowInstanceOperation workflowInstanceControlRequest) {
4654
final WorkflowInstance workflowInstance = workflowInstanceControlRequest.workflowInstance;
@@ -64,10 +72,15 @@ private void exceptionIfWorkflowInstanceCannotPause(WorkflowInstance workflowIns
6472
}
6573

6674
private void directPauseInDB(WorkflowInstance workflowInstance) {
67-
workflowInstanceDao.updateWorkflowInstanceState(
68-
workflowInstance.getId(),
69-
workflowInstance.getState(),
70-
WorkflowExecutionStatus.PAUSE);
75+
// todo: move the pause logic to master
76+
transactionTemplate.execute(status -> {
77+
workflowInstanceDao.updateWorkflowInstanceState(
78+
workflowInstance.getId(),
79+
workflowInstance.getState(),
80+
WorkflowExecutionStatus.PAUSE);
81+
serialCommandDao.deleteByWorkflowInstanceId(workflowInstance.getId());
82+
return null;
83+
});
7184
log.info("Update workflow instance {} state from: {} to {} success",
7285
workflowInstance.getName(),
7386
workflowInstance.getState().name(),

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecutorDelegate.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2222
import org.apache.dolphinscheduler.dao.entity.User;
2323
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
24+
import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
2425
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
2526
import org.apache.dolphinscheduler.extract.base.client.Clients;
2627
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
@@ -31,6 +32,7 @@
3132

3233
import org.springframework.beans.factory.annotation.Autowired;
3334
import org.springframework.stereotype.Component;
35+
import org.springframework.transaction.support.TransactionTemplate;
3436

3537
@Slf4j
3638
@Component
@@ -41,6 +43,12 @@ public class StopWorkflowInstanceExecutorDelegate
4143
@Autowired
4244
private WorkflowInstanceDao workflowInstanceDao;
4345

46+
@Autowired
47+
private TransactionTemplate transactionTemplate;
48+
49+
@Autowired
50+
private SerialCommandDao serialCommandDao;
51+
4452
@Override
4553
public Void execute(StopWorkflowInstanceOperation workflowInstanceControlRequest) {
4654
final WorkflowInstance workflowInstance = workflowInstanceControlRequest.workflowInstance;
@@ -65,10 +73,15 @@ void exceptionIfWorkflowInstanceCannotStop(WorkflowInstance workflowInstance) {
6573
}
6674

6775
void directStopInDB(WorkflowInstance workflowInstance) {
68-
workflowInstanceDao.updateWorkflowInstanceState(
69-
workflowInstance.getId(),
70-
workflowInstance.getState(),
71-
WorkflowExecutionStatus.STOP);
76+
// todo: move the stop logic to master
77+
transactionTemplate.execute(status -> {
78+
workflowInstanceDao.updateWorkflowInstanceState(
79+
workflowInstance.getId(),
80+
workflowInstance.getState(),
81+
WorkflowExecutionStatus.STOP);
82+
serialCommandDao.deleteByWorkflowInstanceId(workflowInstance.getId());
83+
return null;
84+
});
7285
log.info("Update workflow instance {} state from: {} to {} success",
7386
workflowInstance.getName(),
7487
workflowInstance.getState().name(),

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -242,29 +242,38 @@ public List<DependentWorkflowDefinition> queryDownstreamDependentWorkflowDefinit
242242

243243
List<WorkflowDefinition> workflowDefinitionList =
244244
workflowDefinitionMapper.queryByCodes(workflowTaskLineageList.stream()
245-
.map(WorkflowTaskLineage::getDeptWorkflowDefinitionCode).distinct()
245+
.map(WorkflowTaskLineage::getWorkflowDefinitionCode).distinct()
246246
.collect(Collectors.toList()));
247247
List<TaskDefinition> taskDefinitionList = taskDefinitionMapper.queryByCodeList(workflowTaskLineageList.stream()
248-
.map(WorkflowTaskLineage::getDeptTaskDefinitionCode).distinct().collect(Collectors.toList()));
249-
for (TaskDefinition taskDefinition : taskDefinitionList) {
248+
.map(WorkflowTaskLineage::getTaskDefinitionCode).filter(code -> code != 0).distinct()
249+
.collect(Collectors.toList()));
250+
251+
for (WorkflowTaskLineage workflowLineage : workflowTaskLineageList) {
250252
DependentWorkflowDefinition dependentWorkflowDefinition = new DependentWorkflowDefinition();
251-
workflowTaskLineageList.stream()
252-
.filter(workflowLineage -> workflowLineage.getDeptTaskDefinitionCode() == taskDefinition.getCode())
253-
.findFirst()
254-
.ifPresent(workflowLineage -> {
255-
dependentWorkflowDefinition
256-
.setWorkflowDefinitionCode(workflowLineage.getDeptWorkflowDefinitionCode());
257-
dependentWorkflowDefinition.setTaskDefinitionCode(taskDefinition.getCode());
258-
dependentWorkflowDefinition.setTaskParams(taskDefinition.getTaskParams());
259-
dependentWorkflowDefinition.setWorkerGroup(taskDefinition.getWorkerGroup());
260-
});
253+
dependentWorkflowDefinition.setWorkflowDefinitionCode(workflowLineage.getWorkflowDefinitionCode());
254+
dependentWorkflowDefinition.setTaskDefinitionCode(workflowLineage.getTaskDefinitionCode());
255+
256+
// If taskDefinitionCode is 0, it means dependency on entire workflow, taskParams and workerGroup remain
257+
// null
258+
if (workflowLineage.getTaskDefinitionCode() != 0) {
259+
taskDefinitionList.stream()
260+
.filter(taskDefinition -> taskDefinition.getCode() == workflowLineage.getTaskDefinitionCode())
261+
.findFirst()
262+
.ifPresent(taskDefinition -> {
263+
dependentWorkflowDefinition.setTaskParams(taskDefinition.getTaskParams());
264+
dependentWorkflowDefinition.setWorkerGroup(taskDefinition.getWorkerGroup());
265+
});
266+
}
267+
261268
workflowDefinitionList.stream()
262-
.filter(workflowDefinition -> workflowDefinition.getCode() == dependentWorkflowDefinition
269+
.filter(workflowDefinition -> workflowDefinition.getCode() == workflowLineage
263270
.getWorkflowDefinitionCode())
264271
.findFirst()
265272
.ifPresent(workflowDefinition -> {
266273
dependentWorkflowDefinition.setWorkflowDefinitionVersion(workflowDefinition.getVersion());
267274
});
275+
276+
dependentWorkflowDefinitionList.add(dependentWorkflowDefinition);
268277
}
269278

270279
return dependentWorkflowDefinitionList;
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.service.impl;
19+
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
import static org.mockito.Mockito.verify;
22+
import static org.mockito.Mockito.verifyNoInteractions;
23+
import static org.mockito.Mockito.when;
24+
25+
import org.apache.dolphinscheduler.common.constants.Constants;
26+
import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition;
27+
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
28+
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
29+
import org.apache.dolphinscheduler.dao.entity.WorkflowTaskLineage;
30+
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
31+
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
32+
import org.apache.dolphinscheduler.dao.repository.WorkflowTaskLineageDao;
33+
34+
import java.util.Arrays;
35+
import java.util.Collections;
36+
import java.util.List;
37+
38+
import org.junit.jupiter.api.BeforeEach;
39+
import org.junit.jupiter.api.Test;
40+
import org.junit.jupiter.api.extension.ExtendWith;
41+
import org.mockito.InjectMocks;
42+
import org.mockito.Mock;
43+
import org.mockito.junit.jupiter.MockitoExtension;
44+
import org.springframework.test.util.ReflectionTestUtils;
45+
46+
@ExtendWith(MockitoExtension.class)
47+
class WorkflowLineageServiceImplTest {
48+
49+
@InjectMocks
50+
private WorkflowLineageServiceImpl workflowLineageService;
51+
52+
@Mock
53+
private WorkflowTaskLineageDao workflowTaskLineageDao;
54+
55+
@Mock
56+
private WorkflowDefinitionMapper workflowDefinitionMapper;
57+
58+
@Mock
59+
private TaskDefinitionMapper taskDefinitionMapper;
60+
61+
@BeforeEach
62+
void setUp() {
63+
ReflectionTestUtils.setField(workflowLineageService, "workflowTaskLineageDao", workflowTaskLineageDao);
64+
ReflectionTestUtils.setField(workflowLineageService, "workflowDefinitionMapper", workflowDefinitionMapper);
65+
ReflectionTestUtils.setField(workflowLineageService, "taskDefinitionMapper", taskDefinitionMapper);
66+
}
67+
68+
@Test
69+
void shouldReturnEmptyListWhenNoLineageExist() {
70+
long workflowCode = 100L;
71+
when(workflowTaskLineageDao
72+
.queryWorkFlowLineageByDept(Constants.DEFAULT_PROJECT_CODE, workflowCode, Constants.DEPENDENT_ALL_TASK))
73+
.thenReturn(Collections.emptyList());
74+
75+
List<DependentWorkflowDefinition> result =
76+
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(workflowCode);
77+
78+
assertThat(result).isEmpty();
79+
verifyNoInteractions(workflowDefinitionMapper, taskDefinitionMapper);
80+
}
81+
82+
@Test
83+
void shouldBuildDependentWorkflowDefinitions() {
84+
long upstreamWorkflowCode = 1L;
85+
86+
WorkflowTaskLineage taskLineage = new WorkflowTaskLineage();
87+
taskLineage.setWorkflowDefinitionCode(200L);
88+
taskLineage.setDeptWorkflowDefinitionCode(upstreamWorkflowCode);
89+
taskLineage.setTaskDefinitionCode(300L);
90+
taskLineage.setDeptTaskDefinitionCode(0L);
91+
92+
WorkflowTaskLineage workflowLineage = new WorkflowTaskLineage();
93+
workflowLineage.setWorkflowDefinitionCode(201L);
94+
workflowLineage.setDeptWorkflowDefinitionCode(upstreamWorkflowCode);
95+
workflowLineage.setTaskDefinitionCode(0L);
96+
97+
when(workflowTaskLineageDao
98+
.queryWorkFlowLineageByDept(Constants.DEFAULT_PROJECT_CODE, upstreamWorkflowCode,
99+
Constants.DEPENDENT_ALL_TASK))
100+
.thenReturn(Arrays.asList(taskLineage, workflowLineage));
101+
102+
WorkflowDefinition workflowDefinition200 = new WorkflowDefinition();
103+
workflowDefinition200.setCode(200L);
104+
workflowDefinition200.setVersion(3);
105+
106+
WorkflowDefinition workflowDefinition201 = new WorkflowDefinition();
107+
workflowDefinition201.setCode(201L);
108+
workflowDefinition201.setVersion(4);
109+
110+
when(workflowDefinitionMapper.queryByCodes(Arrays.asList(200L, 201L)))
111+
.thenReturn(Arrays.asList(workflowDefinition200, workflowDefinition201));
112+
113+
TaskDefinition taskDefinition = new TaskDefinition();
114+
taskDefinition.setCode(300L);
115+
taskDefinition.setTaskParams("task-params");
116+
taskDefinition.setWorkerGroup("test-group");
117+
118+
when(taskDefinitionMapper.queryByCodeList(Collections.singletonList(300L)))
119+
.thenReturn(Collections.singletonList(taskDefinition));
120+
121+
List<DependentWorkflowDefinition> result =
122+
workflowLineageService.queryDownstreamDependentWorkflowDefinitions(upstreamWorkflowCode);
123+
124+
assertThat(result).hasSize(2);
125+
126+
DependentWorkflowDefinition taskDependent = result.stream()
127+
.filter(dependent -> dependent.getWorkflowDefinitionCode() == 200L)
128+
.findFirst()
129+
.orElseThrow(() -> new AssertionError("Expected DependentWorkflowDefinition with code 200 not found"));
130+
assertThat(taskDependent.getTaskDefinitionCode()).isEqualTo(300L);
131+
assertThat(taskDependent.getTaskParams()).isEqualTo("task-params");
132+
assertThat(taskDependent.getWorkerGroup()).isEqualTo("test-group");
133+
assertThat(taskDependent.getWorkflowDefinitionVersion()).isEqualTo(3);
134+
135+
DependentWorkflowDefinition workflowDependent = result.stream()
136+
.filter(dependent -> dependent.getWorkflowDefinitionCode() == 201L)
137+
.findFirst()
138+
.orElseThrow(() -> new AssertionError("Expected DependentWorkflowDefinition with code 201 not found"));
139+
assertThat(workflowDependent.getTaskDefinitionCode()).isEqualTo(0L);
140+
assertThat(workflowDependent.getTaskParams()).isNull();
141+
assertThat(workflowDependent.getWorkerGroup()).isNull();
142+
assertThat(workflowDependent.getWorkflowDefinitionVersion()).isEqualTo(4);
143+
144+
verify(taskDefinitionMapper).queryByCodeList(Collections.singletonList(300L));
145+
}
146+
}

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ public enum CommandType {
8989
STOP(9, "stop a workflow"),
9090
/**
9191
* Recover from the serial-wait state.
92-
* todo: We may need to remove these command, and use the workflow instance origin command type when notify from serial wait.
9392
*/
9493
RECOVER_SERIAL_WAIT(11, "recover serial wait"),
9594
/**

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionTypeEnum.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
public enum WorkflowExecutionTypeEnum {
2828

2929
PARALLEL(0, "parallel"),
30-
// todo: the serial is unstable, so we don't support them now
3130
SERIAL_WAIT(1, "serial wait"),
3231
SERIAL_DISCARD(2, "serial discard"),
3332
SERIAL_PRIORITY(3, "serial priority");
@@ -56,4 +55,8 @@ public static WorkflowExecutionTypeEnum of(int executionType) {
5655
throw new IllegalArgumentException("invalid status : " + executionType);
5756
}
5857

58+
public boolean isSerial() {
59+
return this != PARALLEL;
60+
}
61+
5962
}

0 commit comments

Comments
 (0)