Skip to content

Commit 40ed67f

Browse files
authored
[DSIP-92][Master] Refactor workflow serial strategy (#17531)
1 parent f5535dc commit 40ed67f

File tree

49 files changed

+1516
-115
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1516
-115
lines changed

.github/workflows/backend.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ jobs:
143143
strategy:
144144
fail-fast: false
145145
matrix:
146-
version: ["3.1.9", "3.2.0"]
146+
version: ["3.1.9", "3.2.0", "3.3.1"]
147147
case:
148148
- name: schema-check-with-mysql
149149
script: .github/workflows/schema-check/mysql/start-job.sh

docs/docs/en/guide/upgrade/incompatible.md

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,23 @@
22

33
This document records the incompatible updates between each version. You need to check this document before you upgrade to related version.
44

5-
## dev
6-
7-
* Upgrade mysql driver version from 8.0.16 to 8.0.33 ([#14684](https://github.com/apache/dolphinscheduler/pull/14684))
8-
* Change env `PYTHON_HOME` to `PYTHON_LAUNCHER` and `DATAX_HOME` to `DATAX_LAUNCHER` ([#14523](https://github.com/apache/dolphinscheduler/pull/14523))
9-
* Change regex matching sql params in SQL task plugin ([#13378](https://github.com/apache/dolphinscheduler/pull/13378))
10-
* Remove the spark version of spark task ([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
11-
* Change the default unix shell executor from sh to bash ([#12180](https://github.com/apache/dolphinscheduler/pull/12180)).
12-
* Remove `deleteSource` in `download()` of `StorageOperate` ([#14084](https://github.com/apache/dolphinscheduler/pull/14084))
13-
* Remove default key for attribute `data-quality.jar.name` in `common.properties` ([#15551](https://github.com/apache/dolphinscheduler/pull/15551))
14-
* Rename attribute `data-quality.jar.name` to `data-quality.jar.dir` in `common.properties` and represent for directory ([#15563](https://github.com/apache/dolphinscheduler/pull/15563))
15-
16-
## 3.2.0
17-
18-
* Remove parameter `description` from public interfaces of new resource center ([#14394](https://github.com/apache/dolphinscheduler/pull/14394))
19-
205
## 3.0.0
216

227
* Copy and import workflow without 'copy' suffix [#10607](https://github.com/apache/dolphinscheduler/pull/10607)
238
* Use semicolon as default sql segment separator [#10869](https://github.com/apache/dolphinscheduler/pull/10869)
249

2510
## 3.2.0
2611

12+
* Rename attribute `data-quality.jar.name` to `data-quality.jar.dir` in `common.properties` and represent for directory ([#15563](https://github.com/apache/dolphinscheduler/pull/15563))
13+
* Remove default key for attribute `data-quality.jar.name` in `common.properties` ([#15551](https://github.com/apache/dolphinscheduler/pull/15551))
14+
* Remove `deleteSource` in `download()` of `StorageOperate` ([#14084](https://github.com/apache/dolphinscheduler/pull/14084))
15+
* Change the default unix shell executor from sh to bash ([#12180](https://github.com/apache/dolphinscheduler/pull/12180)).
16+
* Remove the spark version of spark task ([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
17+
* Change regex matching sql params in SQL task plugin ([#13378](https://github.com/apache/dolphinscheduler/pull/13378))
18+
* Change env `PYTHON_HOME` to `PYTHON_LAUNCHER` and `DATAX_HOME` to `DATAX_LAUNCHER` ([#14523](https://github.com/apache/dolphinscheduler/pull/14523))
19+
* Upgrade mysql driver version from 8.0.16 to 8.0.33 ([#14684](https://github.com/apache/dolphinscheduler/pull/14684))
2720
* Add required field `database` in /datasources/tables && /datasources/tableColumns Api [#14406](https://github.com/apache/dolphinscheduler/pull/14406)
21+
* Remove parameter `description` from public interfaces of new resource center ([#14394](https://github.com/apache/dolphinscheduler/pull/14394))
2822

2923
## 3.3.0
3024

@@ -41,4 +35,5 @@ This document records the incompatible updates between each version. You need to
4135
## 3.4.0
4236

4337
* Renamed the publicKey field to privateKey in the SSH connection parameters under the datasource configuration. ([#17666])(https://github.com/apache/dolphinscheduler/pull/17666)
38+
* Add table t_ds_serial_command. ([#17531])(https://github.com/apache/dolphinscheduler/pull/17531)
4439

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowCreateRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class WorkflowCreateRequest {
5656
private int timeout;
5757

5858
@Schema(allowableValues = "PARALLEL / SERIAL_WAIT / SERIAL_DISCARD / SERIAL_PRIORITY", example = "PARALLEL", description = "default PARALLEL if not provide.")
59-
private String executionType;
59+
private String executionType = WorkflowExecutionTypeEnum.PARALLEL.name();
6060

6161
public WorkflowDefinition convert2WorkflowDefinition() {
6262
WorkflowDefinition workflowDefinition = new WorkflowDefinition();

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/PauseWorkflowInstanceExecutorDelegate.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2222
import org.apache.dolphinscheduler.dao.entity.User;
2323
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
24+
import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
2425
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
2526
import org.apache.dolphinscheduler.extract.base.client.Clients;
2627
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
@@ -31,6 +32,7 @@
3132

3233
import org.springframework.beans.factory.annotation.Autowired;
3334
import org.springframework.stereotype.Component;
35+
import org.springframework.transaction.support.TransactionTemplate;
3436

3537
@Slf4j
3638
@Component
@@ -41,6 +43,12 @@ public class PauseWorkflowInstanceExecutorDelegate
4143
@Autowired
4244
private WorkflowInstanceDao workflowInstanceDao;
4345

46+
@Autowired
47+
private TransactionTemplate transactionTemplate;
48+
49+
@Autowired
50+
private SerialCommandDao serialCommandDao;
51+
4452
@Override
4553
public Void execute(PauseWorkflowInstanceOperation workflowInstanceControlRequest) {
4654
final WorkflowInstance workflowInstance = workflowInstanceControlRequest.workflowInstance;
@@ -64,10 +72,15 @@ private void exceptionIfWorkflowInstanceCannotPause(WorkflowInstance workflowIns
6472
}
6573

6674
private void directPauseInDB(WorkflowInstance workflowInstance) {
67-
workflowInstanceDao.updateWorkflowInstanceState(
68-
workflowInstance.getId(),
69-
workflowInstance.getState(),
70-
WorkflowExecutionStatus.PAUSE);
75+
// todo: move the pause logic to master
76+
transactionTemplate.execute(status -> {
77+
workflowInstanceDao.updateWorkflowInstanceState(
78+
workflowInstance.getId(),
79+
workflowInstance.getState(),
80+
WorkflowExecutionStatus.PAUSE);
81+
serialCommandDao.deleteByWorkflowInstanceId(workflowInstance.getId());
82+
return null;
83+
});
7184
log.info("Update workflow instance {} state from: {} to {} success",
7285
workflowInstance.getName(),
7386
workflowInstance.getState().name(),

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/StopWorkflowInstanceExecutorDelegate.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2222
import org.apache.dolphinscheduler.dao.entity.User;
2323
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
24+
import org.apache.dolphinscheduler.dao.repository.SerialCommandDao;
2425
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
2526
import org.apache.dolphinscheduler.extract.base.client.Clients;
2627
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
@@ -31,6 +32,7 @@
3132

3233
import org.springframework.beans.factory.annotation.Autowired;
3334
import org.springframework.stereotype.Component;
35+
import org.springframework.transaction.support.TransactionTemplate;
3436

3537
@Slf4j
3638
@Component
@@ -41,6 +43,12 @@ public class StopWorkflowInstanceExecutorDelegate
4143
@Autowired
4244
private WorkflowInstanceDao workflowInstanceDao;
4345

46+
@Autowired
47+
private TransactionTemplate transactionTemplate;
48+
49+
@Autowired
50+
private SerialCommandDao serialCommandDao;
51+
4452
@Override
4553
public Void execute(StopWorkflowInstanceOperation workflowInstanceControlRequest) {
4654
final WorkflowInstance workflowInstance = workflowInstanceControlRequest.workflowInstance;
@@ -65,10 +73,15 @@ void exceptionIfWorkflowInstanceCannotStop(WorkflowInstance workflowInstance) {
6573
}
6674

6775
void directStopInDB(WorkflowInstance workflowInstance) {
68-
workflowInstanceDao.updateWorkflowInstanceState(
69-
workflowInstance.getId(),
70-
workflowInstance.getState(),
71-
WorkflowExecutionStatus.STOP);
76+
// todo: move the stop logic to master
77+
transactionTemplate.execute(status -> {
78+
workflowInstanceDao.updateWorkflowInstanceState(
79+
workflowInstance.getId(),
80+
workflowInstance.getState(),
81+
WorkflowExecutionStatus.STOP);
82+
serialCommandDao.deleteByWorkflowInstanceId(workflowInstance.getId());
83+
return null;
84+
});
7285
log.info("Update workflow instance {} state from: {} to {} success",
7386
workflowInstance.getName(),
7487
workflowInstance.getState().name(),

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ public enum CommandType {
8989
STOP(9, "stop a workflow"),
9090
/**
9191
* Recover from the serial-wait state.
92-
* todo: We may need to remove these command, and use the workflow instance origin command type when notify from serial wait.
9392
*/
9493
RECOVER_SERIAL_WAIT(11, "recover serial wait"),
9594
/**

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionTypeEnum.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
public enum WorkflowExecutionTypeEnum {
2828

2929
PARALLEL(0, "parallel"),
30-
// todo: the serial is unstable, so we don't support them now
3130
SERIAL_WAIT(1, "serial wait"),
3231
SERIAL_DISCARD(2, "serial discard"),
3332
SERIAL_PRIORITY(3, "serial priority");
@@ -56,4 +55,8 @@ public static WorkflowExecutionTypeEnum of(int executionType) {
5655
throw new IllegalArgumentException("invalid status : " + executionType);
5756
}
5857

58+
public boolean isSerial() {
59+
return this != PARALLEL;
60+
}
61+
5962
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.entity;
19+
20+
import java.util.Date;
21+
22+
import lombok.AllArgsConstructor;
23+
import lombok.Builder;
24+
import lombok.Data;
25+
import lombok.NoArgsConstructor;
26+
27+
import com.baomidou.mybatisplus.annotation.IdType;
28+
import com.baomidou.mybatisplus.annotation.TableId;
29+
import com.baomidou.mybatisplus.annotation.TableName;
30+
31+
@Data
32+
@Builder
33+
@NoArgsConstructor
34+
@AllArgsConstructor
35+
@TableName("t_ds_serial_command")
36+
public class SerialCommand {
37+
38+
@TableId(value = "id", type = IdType.AUTO)
39+
private Integer id;
40+
41+
private Integer workflowInstanceId;
42+
43+
private Long workflowDefinitionCode;
44+
45+
private Integer workflowDefinitionVersion;
46+
47+
private String command;
48+
49+
private int state;
50+
51+
private Date createTime;
52+
53+
private Date updateTime;
54+
55+
}

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -210,11 +210,6 @@ public CommandType getCmdTypeIfComplement() {
210210
return commandType;
211211
}
212212

213-
/**
214-
* set state with desc
215-
* @param state
216-
* @param stateDesc
217-
*/
218213
public void setStateWithDesc(WorkflowExecutionStatus state, String stateDesc) {
219214
this.setState(state);
220215
if (StringUtils.isEmpty(this.getStateHistory())) {
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.mapper;
19+
20+
import org.apache.dolphinscheduler.dao.entity.SerialCommand;
21+
22+
import org.apache.ibatis.annotations.Param;
23+
24+
import java.util.List;
25+
26+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
27+
28+
public interface SerialCommandMapper extends BaseMapper<SerialCommand> {
29+
30+
List<SerialCommand> fetchSerialCommands(@Param("fetchSize") int fetchSize);
31+
32+
int deleteByWorkflowInstanceId(@Param("workflowInstanceId") int workflowInstanceId);
33+
34+
}

0 commit comments

Comments
 (0)