Skip to content

Commit 4ff861a

Browse files
liuyou2yangzhiyue
authored andcommitted
Provides the ability to obtain and manage orchestrator instances. #356
1 parent dec6e73 commit 4ff861a

File tree

8 files changed

+473
-0
lines changed

8 files changed

+473
-0
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Copyright 2019 WeBank
4+
~ Licensed under the Apache License, Version 2.0 (the "License");
5+
~ you may not use this file except in compliance with the License.
6+
~ You may obtain a copy of the License at
7+
~
8+
~ http://www.apache.org/licenses/LICENSE-2.0
9+
~
10+
~ Unless required by applicable law or agreed to in writing, software
11+
~ distributed under the License is distributed on an "AS IS" BASIS,
12+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
~ See the License for the specific language governing permissions and
14+
~ limitations under the License.
15+
~
16+
-->
17+
18+
<project xmlns="http://maven.apache.org/POM/4.0.0"
19+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
20+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
21+
<parent>
22+
<artifactId>dss</artifactId>
23+
<groupId>com.webank.wedatasphere.dss</groupId>
24+
<version>1.0.0</version>
25+
</parent>
26+
<modelVersion>4.0.0</modelVersion>
27+
28+
<artifactId>dss-orchestrator-loader</artifactId>
29+
<dependencies>
30+
<dependency>
31+
<groupId>jakarta.annotation</groupId>
32+
<artifactId>jakarta.annotation-api</artifactId>
33+
<version>1.3.5</version>
34+
<scope>provided</scope>
35+
</dependency>
36+
<dependency>
37+
<groupId>com.webank.wedatasphere.dss</groupId>
38+
<artifactId>dss-development-process-standard</artifactId>
39+
<version>${dss.version}</version>
40+
41+
</dependency>
42+
<dependency>
43+
<groupId>com.webank.wedatasphere.dss</groupId>
44+
<artifactId>dss-orchestrator-core</artifactId>
45+
<version>${dss.version}</version>
46+
</dependency>
47+
<dependency>
48+
<groupId>com.webank.wedatasphere.dss</groupId>
49+
<artifactId>dss-orchestrator-db</artifactId>
50+
<version>${dss.version}</version>
51+
</dependency>
52+
<dependency>
53+
<groupId>com.webank.wedatasphere.dss</groupId>
54+
<artifactId>dss-common</artifactId>
55+
<version>${dss.version}</version>
56+
<scope>provided</scope>
57+
</dependency>
58+
<dependency>
59+
<groupId>org.springframework</groupId>
60+
<artifactId>spring-context</artifactId>
61+
<version>${spring.version}</version>
62+
<scope>provided</scope>
63+
</dependency>
64+
<dependency>
65+
<groupId>com.webank.wedatasphere.dss</groupId>
66+
<artifactId>dss-appconn-manager-core</artifactId>
67+
<version>${dss.version}</version>
68+
</dependency>
69+
</dependencies>
70+
71+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2019 WeBank
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*
15+
*/
16+
17+
package com.webank.wedatasphere.dss.orchestrator.loader;
18+
19+
import com.webank.wedatasphere.dss.appconn.core.AppConn;
20+
import com.webank.wedatasphere.dss.appconn.manager.AppConnManager;
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
import org.springframework.stereotype.Component;
26+
27+
28+
29+
@Component
30+
class DefaultLinkedAppConnResolver implements LinkedAppConnResolver {
31+
32+
33+
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultLinkedAppConnResolver.class);
34+
35+
36+
static{
37+
LOGGER.info("component resolver inited");
38+
}
39+
40+
@Override
41+
public List<AppConn> resolveAppConnByUser(String userName, String workspaceName, String typeName) {
42+
//todo 后面可以使用数据库表来定义用户可以加载的AppConn.
43+
List<AppConn> appConns = new ArrayList<>();
44+
for(AppConn appConn : AppConnManager.getAppConnManager().listAppConns()){
45+
//可以在这里根据用户情况和工作空间情况,限制appConn的加载
46+
appConns.add(appConn);
47+
}
48+
49+
return appConns;
50+
}
51+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2019 WeBank
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*
15+
*/
16+
17+
package com.webank.wedatasphere.dss.orchestrator.loader;
18+
19+
import com.webank.wedatasphere.dss.appconn.core.AppConn;
20+
import com.webank.wedatasphere.dss.appconn.core.exception.AppConnErrorException;
21+
import com.webank.wedatasphere.dss.appconn.core.ext.OnlyDevelopmentAppConn;
22+
import com.webank.wedatasphere.dss.appconn.manager.AppConnManager;
23+
import com.webank.wedatasphere.dss.common.label.DSSLabel;
24+
import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestrator;
25+
import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestratorContext;
26+
import com.webank.wedatasphere.dss.orchestrator.core.impl.DefaultOrchestrator;
27+
import java.util.List;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
import org.springframework.beans.factory.annotation.Autowired;
31+
import org.springframework.stereotype.Component;
32+
33+
34+
@Component
35+
public class DefaultOrchestratorLoader implements OrchestratorLoader {
36+
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultOrchestratorLoader.class);
37+
@Autowired
38+
private DSSOrchestratorContext dssOrchestratorContext;
39+
40+
@Autowired
41+
private LinkedAppConnResolver linkedAppConnResolver;
42+
43+
44+
@Override
45+
public DSSOrchestrator loadOrchestrator(String userName,
46+
String workspaceName,
47+
String typeName,
48+
String appConnName,
49+
List<DSSLabel> dssLabels) throws AppConnErrorException {
50+
51+
//todo load DSSOrchestatror by type name
52+
DSSOrchestrator dssOrchestrator = new DefaultOrchestrator() {
53+
@Override
54+
protected DSSOrchestratorContext createOrchestratorContext() {
55+
return dssOrchestratorContext;
56+
}
57+
};
58+
59+
//向工作流添加实现了第三级规范的AppConn
60+
List<AppConn> appConnList = linkedAppConnResolver.resolveAppConnByUser(userName, workspaceName, typeName);
61+
for (AppConn appConn : appConnList) {
62+
if(appConn instanceof OnlyDevelopmentAppConn){
63+
dssOrchestrator.addLinkedAppConn(appConn);
64+
}
65+
66+
}
67+
LOGGER.info("Load dss orchestrator:"+appConnName+",typeName:"+typeName);
68+
AppConn appConn = AppConnManager.getAppConnManager().getAppConn(appConnName);
69+
dssLabels.forEach(dssOrchestrator::addLinkedDssLabels);
70+
dssOrchestrator.setAppConn(appConn);
71+
return dssOrchestrator;
72+
}
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2019 WeBank
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*
15+
*/
16+
17+
package com.webank.wedatasphere.dss.orchestrator.loader;
18+
19+
import com.webank.wedatasphere.dss.appconn.core.AppConn;
20+
21+
22+
import java.util.List;
23+
24+
public interface LinkedAppConnResolver {
25+
List<AppConn> resolveAppConnByUser(String userName, String workspaceName, String typeName);
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2019 WeBank
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*
15+
*/
16+
17+
package com.webank.wedatasphere.dss.orchestrator.loader;
18+
19+
20+
import com.webank.wedatasphere.dss.appconn.core.exception.AppConnErrorException;
21+
import com.webank.wedatasphere.dss.common.label.DSSLabel;
22+
import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestrator;
23+
24+
import java.util.List;
25+
26+
public interface OrchestratorLoader {
27+
28+
/**
29+
* 用于返回一个指定类型的的Orchestrator
30+
* @param userName
31+
* @param workspaceName
32+
* @param typeName
33+
* @param appConnName 唯一标识一种类型的AppConn,比如workflowOrchestratorAppConn
34+
* @return
35+
*/
36+
DSSOrchestrator loadOrchestrator(String userName,
37+
String workspaceName,
38+
String typeName,
39+
String appConnName,
40+
List<DSSLabel> dssLabels) throws AppConnErrorException;
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2019 WeBank
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*
15+
*/
16+
17+
package com.webank.wedatasphere.dss.orchestrator.loader;
18+
19+
import com.webank.wedatasphere.dss.appconn.core.exception.AppConnErrorException;
20+
import com.webank.wedatasphere.dss.common.label.DSSLabel;
21+
import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestrator;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
import org.springframework.beans.factory.annotation.Autowired;
25+
import org.springframework.stereotype.Component;
26+
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.concurrent.ConcurrentHashMap;
30+
31+
32+
@Component
33+
public class OrchestratorManager {
34+
35+
private final static Logger logger = LoggerFactory.getLogger(OrchestratorManager.class);
36+
37+
private Map<String, DSSOrchestrator> cacheDssOrchestrator = new ConcurrentHashMap<>();
38+
39+
@Autowired
40+
private DefaultOrchestratorLoader defaultOrchestratorLoader;
41+
42+
public DSSOrchestrator getOrCreateOrchestrator(String userName,
43+
String workspaceName,
44+
String typeName,
45+
String appConnName,
46+
List<DSSLabel> dssLabels) {
47+
String findKey = getCacheKey(userName, workspaceName, typeName, appConnName);
48+
DSSOrchestrator dssOrchestrator = cacheDssOrchestrator.get(findKey);
49+
if (null == dssOrchestrator) {
50+
try {
51+
52+
dssOrchestrator = defaultOrchestratorLoader.loadOrchestrator(userName, workspaceName, typeName, appConnName, dssLabels);
53+
54+
cacheDssOrchestrator.put(findKey, dssOrchestrator);
55+
} catch (AppConnErrorException e) {
56+
logger.error("OrchestratorManager get DSSOrchestrator exception!", e);
57+
}
58+
}
59+
return dssOrchestrator;
60+
}
61+
62+
protected String getCacheKey(String userName, String workspaceName, String typeName, String appConnName) {
63+
return userName + "_" + workspaceName + "_" + typeName + "_" + appConnName;
64+
}
65+
}

0 commit comments

Comments
 (0)