Skip to content

Commit 8a91e49

Browse files
authored
Merge branch 'dev' into Improvement-18045
2 parents 55dde4a + 6472576 commit 8a91e49

File tree

38 files changed

+479
-171
lines changed

38 files changed

+479
-171
lines changed

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,20 @@
2525
import java.io.Serializable;
2626
import java.util.Date;
2727

28+
import lombok.AllArgsConstructor;
29+
import lombok.Builder;
2830
import lombok.Data;
31+
import lombok.NoArgsConstructor;
2932

3033
import com.baomidou.mybatisplus.annotation.IdType;
3134
import com.baomidou.mybatisplus.annotation.TableField;
3235
import com.baomidou.mybatisplus.annotation.TableId;
3336
import com.baomidou.mybatisplus.annotation.TableName;
3437

3538
@Data
39+
@Builder
40+
@AllArgsConstructor
41+
@NoArgsConstructor
3642
@TableName("t_ds_task_instance")
3743
public class TaskInstance implements Serializable {
3844

@@ -132,10 +138,4 @@ public class TaskInstance implements Serializable {
132138

133139
private TaskExecuteType taskExecuteType;
134140

135-
public void init(String host, Date startTime, String executePath) {
136-
this.host = host;
137-
this.startTime = startTime;
138-
this.executePath = executePath;
139-
}
140-
141141
}

dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@
241241
</include>
242242
from t_ds_task_instance instance
243243
join (
244-
select task_code, max(end_time) as max_end_time
244+
select task_code, max(end_time) as max_end_time, workflow_instance_id
245245
from t_ds_task_instance
246246
where 1=1
247247
and workflow_instance_id = #{workflowInstanceId}

dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,7 @@
3636

3737
import org.junit.jupiter.api.RepeatedTest;
3838
import org.springframework.beans.factory.annotation.Autowired;
39-
import org.springframework.test.annotation.DirtiesContext;
4039

41-
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
4240
class CommandDaoImplTest extends BaseDaoTest {
4341

4442
@Autowired
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.repository.impl;
19+
20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
import static org.junit.jupiter.api.Assertions.assertNotNull;
22+
23+
import org.apache.dolphinscheduler.common.enums.Flag;
24+
import org.apache.dolphinscheduler.dao.BaseDaoTest;
25+
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
26+
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
27+
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
28+
29+
import java.util.Arrays;
30+
import java.util.Date;
31+
import java.util.HashSet;
32+
import java.util.List;
33+
import java.util.Set;
34+
35+
import org.junit.jupiter.api.Test;
36+
import org.springframework.beans.factory.annotation.Autowired;
37+
38+
class TaskInstanceDaoImplTest extends BaseDaoTest {
39+
40+
private static final int WORKFLOW_INSTANCE_ID = 1;
41+
private static final long EXTRACT_TASK = 8001L;
42+
private static final long TRANSFORM_TASK = 8002L;
43+
44+
@Autowired
45+
private TaskInstanceDao taskInstanceDao;
46+
47+
@Test
48+
void queryLastTaskInstanceListIntervalInWorkflowInstance() {
49+
Date earlier = new Date(System.currentTimeMillis() - 3600_000);
50+
Date later = new Date();
51+
52+
insertTaskInstance(EXTRACT_TASK, TaskExecutionStatus.SUCCESS, earlier);
53+
insertTaskInstance(EXTRACT_TASK, TaskExecutionStatus.SUCCESS, later);
54+
insertTaskInstance(TRANSFORM_TASK, TaskExecutionStatus.SUCCESS, later);
55+
56+
Set<Long> taskCodes = new HashSet<>(Arrays.asList(EXTRACT_TASK, TRANSFORM_TASK));
57+
List<TaskInstance> result = taskInstanceDao.queryLastTaskInstanceListIntervalInWorkflowInstance(
58+
WORKFLOW_INSTANCE_ID, taskCodes);
59+
60+
assertEquals(2, result.size());
61+
TaskInstance extractResult = result.stream()
62+
.filter(ti -> ti.getTaskCode() == EXTRACT_TASK)
63+
.findFirst().orElse(null);
64+
assertNotNull(extractResult);
65+
assertEquals(later.getTime() / 1000, extractResult.getEndTime().getTime() / 1000);
66+
}
67+
68+
@Test
69+
void queryLastTaskInstanceIntervalInWorkflowInstance() {
70+
Date earlier = new Date(System.currentTimeMillis() - 3600_000);
71+
Date later = new Date();
72+
73+
insertTaskInstance(EXTRACT_TASK, TaskExecutionStatus.SUCCESS, earlier);
74+
insertTaskInstance(EXTRACT_TASK, TaskExecutionStatus.SUCCESS, later);
75+
76+
TaskInstance result = taskInstanceDao.queryLastTaskInstanceIntervalInWorkflowInstance(
77+
WORKFLOW_INSTANCE_ID, EXTRACT_TASK);
78+
assertNotNull(result);
79+
assertEquals(later.getTime() / 1000, result.getEndTime().getTime() / 1000);
80+
}
81+
82+
private void insertTaskInstance(long taskCode, TaskExecutionStatus state, Date endTime) {
83+
TaskInstance ti = TaskInstance.builder()
84+
.name("shell-task-" + taskCode)
85+
.taskType("SHELL")
86+
.workflowInstanceId(WORKFLOW_INSTANCE_ID)
87+
.workflowInstanceName("daily-etl-pipeline")
88+
.taskCode(taskCode)
89+
.taskDefinitionVersion(1)
90+
.state(state)
91+
.flag(Flag.YES)
92+
.submitTime(new Date())
93+
.firstSubmitTime(new Date())
94+
.startTime(new Date())
95+
.endTime(endTime)
96+
.host("192.168.1.50:5678")
97+
.executePath("/tmp/dolphinscheduler/exec/" + WORKFLOW_INSTANCE_ID + "/" + taskCode)
98+
.logPath("/tmp/dolphinscheduler/logs/" + WORKFLOW_INSTANCE_ID + "/" + taskCode + ".log")
99+
.build();
100+
taskInstanceDao.upsertTaskInstance(ti);
101+
}
102+
}

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public interface DataSourceProcessor {
5151
/**
5252
* create BaseDataSourceParamDTO by connectionJson
5353
*
54-
* @param connectionJson see{@link org.apache.dolphinscheduler.dao.entity.Datasource}
54+
* @param connectionJson see{@link org.apache.dolphinscheduler.dao.entity.DataSource}
5555
* @return {@link BaseDataSourceParamDTO}
5656
*/
5757
BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson);
@@ -67,7 +67,7 @@ public interface DataSourceProcessor {
6767
* deserialize json to datasource connection param
6868
*
6969
* @param connectionJson {@code org.apache.dolphinscheduler.dao.entity.DataSource.connectionParams}
70-
* @return {@link BaseConnectionParam}
70+
* @return {@link ConnectionParam}
7171
*/
7272
ConnectionParam createConnectionParams(String connectionJson);
7373

@@ -96,7 +96,7 @@ public interface DataSourceProcessor {
9696
* @return {@link Connection}
9797
*/
9898
// todo: Change to return a ConnectionWrapper
99-
Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException, IOException;
99+
Connection getConnection(ConnectionParam connectionParam) throws SQLException, IOException;
100100

101101
/**
102102
* test connection
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.plugin.datasource.api.datasource;
19+
20+
import java.sql.Connection;
21+
import java.sql.SQLException;
22+
23+
public interface JdbcConnectionProvider {
24+
25+
Connection getConnection() throws SQLException;
26+
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.plugin.datasource.api.datasource;
19+
20+
import java.sql.Connection;
21+
import java.sql.Driver;
22+
import java.sql.SQLException;
23+
import java.util.Properties;
24+
25+
import lombok.Builder;
26+
27+
@Builder
28+
public class JdbcDriverConnectionProvider implements JdbcConnectionProvider {
29+
30+
private final String jdbcDriverClassName;
31+
private final String jdbcUrl;
32+
private final String username;
33+
private final String password;
34+
private final Properties properties;
35+
36+
@Override
37+
public Connection getConnection() throws SQLException {
38+
try {
39+
Driver driver = (Driver) Class.forName(jdbcDriverClassName).getDeclaredConstructor().newInstance();
40+
Properties p = new Properties(properties);
41+
if (username != null) {
42+
p.setProperty("user", username);
43+
}
44+
if (password != null) {
45+
p.setProperty("password", password);
46+
}
47+
return driver.connect(jdbcUrl, p);
48+
} catch (ReflectiveOperationException e) {
49+
throw new SQLException("Failed to instantiate driver: " + jdbcDriverClassName, e);
50+
}
51+
}
52+
}

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-athena/src/main/java/org/apache/dolphinscheduler/plugin/datasource/athena/param/AthenaDataSourceProcessor.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
2424
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
2525
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
26+
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.JdbcDriverConnectionProvider;
2627
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
2728
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
2829
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@@ -31,7 +32,6 @@
3132
import org.apache.commons.collections4.MapUtils;
3233

3334
import java.sql.Connection;
34-
import java.sql.DriverManager;
3535
import java.sql.SQLException;
3636
import java.util.ArrayList;
3737
import java.util.List;
@@ -106,11 +106,15 @@ public String getJdbcUrl(ConnectionParam connectionParam) {
106106
}
107107

108108
@Override
109-
public Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException {
109+
public Connection getConnection(ConnectionParam connectionParam) throws SQLException {
110110
AthenaConnectionParam athenaConnectionParam = (AthenaConnectionParam) connectionParam;
111-
Class.forName(this.getDatasourceDriver());
112-
return DriverManager.getConnection(this.getJdbcUrl(connectionParam),
113-
athenaConnectionParam.getUser(), PasswordUtils.decodePassword(athenaConnectionParam.getPassword()));
111+
return JdbcDriverConnectionProvider.builder()
112+
.jdbcDriverClassName(getDatasourceDriver())
113+
.jdbcUrl(getJdbcUrl(athenaConnectionParam))
114+
.username(athenaConnectionParam.getUser())
115+
.password(PasswordUtils.decodePassword(athenaConnectionParam.getPassword()))
116+
.build()
117+
.getConnection();
114118
}
115119

116120
@Override

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/param/AzureSQLDataSourceProcessor.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
2424
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
2525
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
26+
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.JdbcDriverConnectionProvider;
2627
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
2728
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
2829
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@@ -33,7 +34,6 @@
3334

3435
import java.net.MalformedURLException;
3536
import java.sql.Connection;
36-
import java.sql.DriverManager;
3737
import java.sql.SQLException;
3838
import java.util.HashMap;
3939
import java.util.HashSet;
@@ -147,16 +147,20 @@ public String getJdbcUrl(ConnectionParam connectionParam) {
147147
}
148148

149149
@Override
150-
public Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException {
150+
public Connection getConnection(ConnectionParam connectionParam) throws SQLException {
151151
AzureSQLConnectionParam azureSQLConnectionParam = (AzureSQLConnectionParam) connectionParam;
152152
// token access way
153153
if (azureSQLConnectionParam.getMode().equals(AzureSQLAuthMode.ACCESSTOKEN)) {
154154
return tokenGetConnection(azureSQLConnectionParam);
155155
}
156156
// normal way
157-
Class.forName(getDatasourceDriver());
158-
return DriverManager.getConnection(getJdbcUrl(connectionParam), azureSQLConnectionParam.getUser(),
159-
PasswordUtils.decodePassword(azureSQLConnectionParam.getPassword()));
157+
return JdbcDriverConnectionProvider.builder()
158+
.jdbcDriverClassName(getDatasourceDriver())
159+
.jdbcUrl(getJdbcUrl(azureSQLConnectionParam))
160+
.username(azureSQLConnectionParam.getUser())
161+
.password(PasswordUtils.decodePassword(azureSQLConnectionParam.getPassword()))
162+
.build()
163+
.getConnection();
160164
}
161165

162166
@Override
@@ -255,6 +259,7 @@ private String processAuthMode(String jdbcUrl, AzureSQLDataSourceParamDTO param)
255259

256260
/**
257261
* by default, add {"trustServerCertificate":true} to other to deal with SSL trust issue
262+
*
258263
* @param paramDTO
259264
*/
260265
private void checkTrustServerCertificate(BaseDataSourceParamDTO paramDTO) {

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-clickhouse/src/main/java/org/apache/dolphinscheduler/plugin/datasource/clickhouse/param/ClickHouseDataSourceProcessor.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@
2323
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
2424
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
2525
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
26+
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.JdbcDriverConnectionProvider;
2627
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
2728
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
2829
import org.apache.dolphinscheduler.spi.enums.DbType;
2930

3031
import org.apache.commons.collections4.MapUtils;
3132

3233
import java.sql.Connection;
33-
import java.sql.DriverManager;
3434
import java.sql.SQLException;
3535
import java.util.ArrayList;
3636
import java.util.List;
@@ -109,12 +109,15 @@ public String getJdbcUrl(ConnectionParam connectionParam) {
109109
}
110110

111111
@Override
112-
public Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException {
112+
public Connection getConnection(ConnectionParam connectionParam) throws SQLException {
113113
ClickHouseConnectionParam clickhouseConnectionParam = (ClickHouseConnectionParam) connectionParam;
114-
Class.forName(getDatasourceDriver());
115-
return DriverManager.getConnection(getJdbcUrl(clickhouseConnectionParam),
116-
clickhouseConnectionParam.getUser(),
117-
PasswordUtils.decodePassword(clickhouseConnectionParam.getPassword()));
114+
return JdbcDriverConnectionProvider.builder()
115+
.jdbcDriverClassName(getDatasourceDriver())
116+
.jdbcUrl(getJdbcUrl(clickhouseConnectionParam))
117+
.username(clickhouseConnectionParam.getUser())
118+
.password(PasswordUtils.decodePassword(clickhouseConnectionParam.getPassword()))
119+
.build()
120+
.getConnection();
118121
}
119122

120123
@Override

0 commit comments

Comments
 (0)