Skip to content

Commit e348d20

Browse files
authored
Merge branch 'apache:dev' into dev
2 parents 8b84ee1 + d1fddfb commit e348d20

File tree

18 files changed

+71
-208
lines changed

18 files changed

+71
-208
lines changed

.asf.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ github:
5252
- "Title Validator"
5353
required_pull_request_reviews:
5454
dismiss_stale_reviews: true
55-
required_approving_review_count: 2
55+
required_approving_review_count: 1
5656
features:
5757
# Enable issues management
5858
issues: true

dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-webexteams/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,6 @@
3737
<scope>provided</scope>
3838
</dependency>
3939

40-
<dependency>
41-
<groupId>org.apache.dolphinscheduler</groupId>
42-
<artifactId>dolphinscheduler-spi</artifactId>
43-
<scope>provided</scope>
44-
</dependency>
45-
4640
<dependency>
4741
<groupId>org.apache.dolphinscheduler</groupId>
4842
<artifactId>dolphinscheduler-alert-api</artifactId>

dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-wechat/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,6 @@
3737
<scope>provided</scope>
3838
</dependency>
3939

40-
<dependency>
41-
<groupId>org.apache.dolphinscheduler</groupId>
42-
<artifactId>dolphinscheduler-spi</artifactId>
43-
<scope>provided</scope>
44-
</dependency>
45-
4640
<dependency>
4741
<groupId>org.apache.dolphinscheduler</groupId>
4842
<artifactId>dolphinscheduler-alert-api</artifactId>

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
2323
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
2424
import org.apache.dolphinscheduler.dao.DaoConfiguration;
25-
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
25+
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
2626
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
2727
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
2828
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
@@ -57,7 +57,7 @@ public static void main(String[] args) {
5757
public void run(ApplicationReadyEvent readyEvent) {
5858
ServerLifeCycleManager.toRunning();
5959
log.info("Received spring application context ready event will load taskPlugin and write to DB");
60-
DataSourceProcessorProvider.initialize();
60+
DataSourcePluginManager.loadDataSourcePlugin();
6161
TaskPluginManager.loadTaskPlugin();
6262
}
6363
}

dolphinscheduler-bom/pom.xml

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@
8686
<jsr305.version>3.0.0</jsr305.version>
8787
<commons-compress.version>1.27.1</commons-compress.version>
8888
<commons-math3.version>3.6.1</commons-math3.version>
89-
<error_prone_annotations.version>2.5.1</error_prone_annotations.version>
9089
<hibernate-validator.version>6.2.2.Final</hibernate-validator.version>
9190
<aws-sdk.version>1.12.300</aws-sdk.version>
9291
<aliyun-sdk-oss.version>3.15.1</aliyun-sdk-oss.version>
@@ -152,16 +151,6 @@
152151
<type>pom</type>
153152
<scope>import</scope>
154153
</dependency>
155-
<dependency>
156-
<groupId>io.grpc</groupId>
157-
<artifactId>grpc-api</artifactId>
158-
<version>${io.grpc.version}</version>
159-
</dependency>
160-
<dependency>
161-
<groupId>io.grpc</groupId>
162-
<artifactId>grpc-core</artifactId>
163-
<version>${io.grpc.version}</version>
164-
</dependency>
165154
<dependency>
166155
<groupId>io.grpc</groupId>
167156
<artifactId>grpc-services</artifactId>
@@ -740,11 +729,6 @@
740729
<version>${py4j.version}</version>
741730
</dependency>
742731

743-
<dependency>
744-
<groupId>com.google.code.findbugs</groupId>
745-
<artifactId>jsr305</artifactId>
746-
<version>${jsr305.version}</version>
747-
</dependency>
748732
<dependency>
749733
<groupId>org.apache.commons</groupId>
750734
<artifactId>commons-compress</artifactId>
@@ -755,11 +739,6 @@
755739
<artifactId>commons-math3</artifactId>
756740
<version>${commons-math3.version}</version>
757741
</dependency>
758-
<dependency>
759-
<groupId>com.google.errorprone</groupId>
760-
<artifactId>error_prone_annotations</artifactId>
761-
<version>${error_prone_annotations.version}</version>
762-
</dependency>
763742

764743
<!-- https://mvnrepository.com/artifact/org.hibernate.validator/hibernate-validator -->
765744
<dependency>
@@ -936,12 +915,6 @@
936915
<version>${zeppelin-client.version}</version>
937916
</dependency>
938917

939-
<dependency>
940-
<groupId>com.google.protobuf</groupId>
941-
<artifactId>protobuf-java</artifactId>
942-
<version>${protobuf.version}</version>
943-
</dependency>
944-
945918
<dependency>
946919
<groupId>com.huaweicloud</groupId>
947920
<artifactId>esdk-obs-java-bundle</artifactId>
@@ -1048,7 +1021,7 @@
10481021
<activeByDefault>true</activeByDefault>
10491022
</activation>
10501023
<properties>
1051-
<zookeeper.version>3.8.0</zookeeper.version>
1024+
<zookeeper.version>3.8.3</zookeeper.version>
10521025
<curator.version>5.5.0</curator.version>
10531026
</properties>
10541027
</profile>

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/BaseAdHocDataSourceClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.dolphinscheduler.plugin.datasource.api.client;
1919

20-
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
20+
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
2121
import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient;
2222
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
2323
import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -38,7 +38,7 @@ protected BaseAdHocDataSourceClient(BaseConnectionParam baseConnectionParam, DbT
3838
@Override
3939
public Connection getConnection() throws SQLException {
4040
try {
41-
return DataSourceProcessorProvider.getDataSourceProcessor(dbType).getConnection(baseConnectionParam);
41+
return DataSourcePluginManager.getDataSourceProcessor(dbType).getConnection(baseConnectionParam);
4242
} catch (Exception e) {
4343
throw new SQLException("Create adhoc connection error", e);
4444
}

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,13 @@ public class DataSourceClientProvider {
5757
})
5858
.maximumSize(100)
5959
.build();
60-
private static final DataSourcePluginManager dataSourcePluginManager = new DataSourcePluginManager();
61-
62-
static {
63-
dataSourcePluginManager.installPlugin();
64-
}
6560

6661
public static DataSourceClient getPooledDataSourceClient(DbType dbType,
6762
ConnectionParam connectionParam) throws ExecutionException {
6863
BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
6964
String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
7065
return POOLED_DATASOURCE_CLIENT_CACHE.get(datasourceUniqueId, () -> {
71-
DataSourceChannel dataSourceChannel = dataSourcePluginManager.getDataSourceChannel(dbType);
66+
DataSourceChannel dataSourceChannel = DataSourcePluginManager.getDataSourceChannel(dbType);
7267
if (null == dataSourceChannel) {
7368
throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getName()));
7469
}
@@ -83,7 +78,7 @@ public static Connection getPooledConnection(DbType dbType,
8378

8479
public static AdHocDataSourceClient getAdHocDataSourceClient(DbType dbType, ConnectionParam connectionParam) {
8580
BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
86-
DataSourceChannel dataSourceChannel = dataSourcePluginManager.getDataSourceChannel(dbType);
81+
DataSourceChannel dataSourceChannel = DataSourcePluginManager.getDataSourceChannel(dbType);
8782
if (null == dataSourceChannel) {
8883
throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getName()));
8984
}

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,47 +19,74 @@
1919

2020
import static java.lang.String.format;
2121

22+
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
2223
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
2324
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
2425
import org.apache.dolphinscheduler.spi.enums.DbType;
2526
import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory;
2627

28+
import org.apache.commons.collections4.MapUtils;
29+
2730
import java.util.Map;
31+
import java.util.ServiceLoader;
2832
import java.util.concurrent.ConcurrentHashMap;
2933

34+
import lombok.NonNull;
3035
import lombok.extern.slf4j.Slf4j;
3136

3237
@Slf4j
3338
public class DataSourcePluginManager {
3439

35-
private final Map<String, DataSourceChannel> datasourceChannelMap = new ConcurrentHashMap<>();
36-
37-
public DataSourceChannel getDataSourceChannel(final DbType dbType) {
38-
return datasourceChannelMap.get(dbType.getName());
39-
}
40+
private static final Map<String, DataSourceChannel> datasourceChannelMap = new ConcurrentHashMap<>();
4041

41-
public void installPlugin() {
42+
private static final Map<String, DataSourceProcessor> dataSourceProcessorMap = new ConcurrentHashMap<>();
4243

43-
PrioritySPIFactory<DataSourceChannelFactory> prioritySPIFactory =
44-
new PrioritySPIFactory<>(DataSourceChannelFactory.class);
45-
for (Map.Entry<String, DataSourceChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
46-
final DataSourceChannelFactory factory = entry.getValue();
47-
final String name = entry.getKey();
44+
static {
45+
loadDataSourcePlugin();
46+
}
4847

49-
log.info("Registering datasource plugin: {}", name);
48+
public static DataSourceChannel getDataSourceChannel(@NonNull DbType dbType) {
49+
return datasourceChannelMap.get(dbType.getName());
50+
}
5051

51-
if (datasourceChannelMap.containsKey(name)) {
52-
throw new IllegalStateException(format("Duplicate datasource plugins named '%s'", name));
53-
}
52+
public static DataSourceProcessor getDataSourceProcessor(@NonNull DbType dbType) {
53+
return dataSourceProcessorMap.get(dbType.getName());
54+
}
5455

55-
loadDatasourceClient(factory);
56+
public static void loadDataSourcePlugin() {
57+
initializeDataSourceChannel();
58+
initializeDataSourceProcessor();
59+
}
5660

57-
log.info("Registered datasource plugin: {}", name);
61+
private static synchronized void initializeDataSourceChannel() {
62+
if (MapUtils.isNotEmpty(datasourceChannelMap)) {
63+
return;
5864
}
65+
new PrioritySPIFactory<>(DataSourceChannelFactory.class).getSPIMap().forEach(
66+
(dataSourceChannelName, dataSourceChannelFactory) -> {
67+
if (datasourceChannelMap.containsKey(dataSourceChannelName)) {
68+
throw new IllegalStateException(
69+
format("Duplicate datasource channel named '%s'", dataSourceChannelName));
70+
}
71+
datasourceChannelMap.put(dataSourceChannelName, dataSourceChannelFactory.create());
72+
log.info("Registered datasource channel: {}", dataSourceChannelName);
73+
});
5974
}
6075

61-
private void loadDatasourceClient(DataSourceChannelFactory datasourceChannelFactory) {
62-
DataSourceChannel datasourceChannel = datasourceChannelFactory.create();
63-
datasourceChannelMap.put(datasourceChannelFactory.getName(), datasourceChannel);
76+
private static synchronized void initializeDataSourceProcessor() {
77+
if (MapUtils.isNotEmpty(dataSourceProcessorMap)) {
78+
return;
79+
}
80+
81+
ServiceLoader.load(DataSourceProcessor.class).forEach(factory -> {
82+
final String name = factory.getDbType().getName();
83+
if (dataSourceProcessorMap.containsKey(name)) {
84+
throw new IllegalStateException(format("Duplicate datasource processor named '%s'", name));
85+
}
86+
DataSourceProcessor dataSourceProcessor = factory.create();
87+
dataSourceProcessorMap.put(name, dataSourceProcessor);
88+
log.info("Success register datasource processor -> {}", name);
89+
});
6490
}
91+
6592
}

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorManager.java

Lines changed: 0 additions & 58 deletions
This file was deleted.

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java

Lines changed: 0 additions & 52 deletions
This file was deleted.

0 commit comments

Comments
 (0)