Skip to content

Commit 4784280

Browse files
fix the bug that nullpointexception in dataapiservice when fetch resultSet.
1 parent 7c72997 commit 4784280

File tree

3 files changed

+108
-91
lines changed

3 files changed

+108
-91
lines changed

dss-apps/dss-apiservice-server/src/main/java/com/webank/wedatasphere/dss/apiservice/core/execute/ExecuteCodeHelper.java

Lines changed: 20 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,15 @@
2121
import com.webank.wedatasphere.dss.apiservice.core.action.ResultWorkspaceIds;
2222
import com.webank.wedatasphere.dss.apiservice.core.config.ApiServiceConfiguration;
2323
import com.webank.wedatasphere.dss.apiservice.core.exception.ApiExecuteException;
24+
import org.apache.commons.lang.StringUtils;
2425
import org.apache.linkis.common.utils.Utils;
25-
import org.apache.linkis.httpclient.response.Result;
2626
import org.apache.linkis.ujes.client.UJESClient;
2727
import org.apache.linkis.ujes.client.request.ResultSetAction;
2828
import org.apache.linkis.ujes.client.request.ResultSetListAction;
2929
import org.apache.linkis.ujes.client.response.JobExecuteResult;
3030
import org.apache.linkis.ujes.client.response.JobInfoResult;
3131
import org.apache.linkis.ujes.client.response.JobLogResult;
3232
import org.apache.linkis.ujes.client.response.ResultSetListResult;
33-
import org.apache.commons.lang.StringUtils;
3433
import org.slf4j.Logger;
3534
import org.slf4j.LoggerFactory;
3635

@@ -73,33 +72,33 @@ public static String packageCodeToExecute(String executeCode, String metaDataInf
7372

7473
public static Map<String,Object> getMetaDataInfoByExecute(String user,
7574
String executeCode,
76-
Map<String, Object> Params,
75+
Map<String, Object> params,
7776
String scriptPath) throws Exception {
78-
Map<String, String> props = new HashMap<>();
7977
Map<String,Object> resultMap = new HashMap<>();
80-
UJESClient client = LinkisJobSubmit.getClient(props);
78+
UJESClient client = LinkisJobSubmit.getClient();
8179
ApiServiceExecuteJob job = new DefaultApiServiceJob();
8280
//sql代码封装成scala执行
8381
job.setCode(ExecuteCodeHelper.packageCodeToRelease(executeCode));
8482
job.setEngineType("spark");
8583
job.setRunType("scala");
8684
job.setUser(user);
8785
job.setParams(null);
88-
job.setRuntimeParams((Map<String,Object>)Params.get("variable"));// pattern注入
86+
// pattern注入
87+
job.setRuntimeParams((Map<String,Object>) params.get("variable"));
8988
job.setScriptePath(scriptPath);
90-
JobExecuteResult jobExecuteResult = LinkisJobSubmit.execute(job,client, "IDE");
89+
JobExecuteResult jobExecuteResult = LinkisJobSubmit.execute(job, client, "IDE");
9190
job.setJobExecuteResult(jobExecuteResult);
9291
try {
93-
waitForComplete(job,client);
92+
waitForComplete(job, client);
9493
} catch (Exception e) {
9594
LOGGER.warn("Failed to execute job", e);
96-
String reason = getLog(job,client);
95+
String reason = getLog(job, client);
9796
LOGGER.error("Reason for failure: " + reason);
9897
throw new ApiExecuteException(800024,"获取库表信息失败,执行脚本出错!");
9998
}
100-
int resultSize = getResultSize(job,client);
99+
int resultSize = getResultSize(job, client);
101100
for(int i =0; i < resultSize; i++){
102-
String result = getResult(job, i, ApiServiceConfiguration.RESULT_PRINT_SIZE.getValue().intValue(),client);
101+
String result = getResult(job, i, ApiServiceConfiguration.RESULT_PRINT_SIZE.getValue(),client);
103102
LOGGER.info("The content of the " + (i + 1) + "th resultset is :"
104103
+ result);
105104
resultMap.put(Integer.toString(i),result);
@@ -111,10 +110,10 @@ public static Map<String,Object> getMetaDataInfoByExecute(String user,
111110
}
112111

113112

114-
public static void waitForComplete(ApiServiceExecuteJob job,UJESClient client) throws Exception {
113+
public static void waitForComplete(ApiServiceExecuteJob job, UJESClient client) throws Exception {
115114
JobInfoResult jobInfo = client.getJobInfo(job.getJobExecuteResult());
116115
while (!jobInfo.isCompleted()) {
117-
LOGGER.info("Update Progress info:" + getProgress(job,client));
116+
LOGGER.info("Update Progress info:" + getProgress(job, client));
118117
LOGGER.info("<----linkis log ---->");
119118
Utils.sleepQuietly(ApiServiceConfiguration.LINKIS_JOB_REQUEST_STATUS_TIME.getValue(job.getJobProps()));
120119
jobInfo = client.getJobInfo(job.getJobExecuteResult());
@@ -125,7 +124,7 @@ public static void waitForComplete(ApiServiceExecuteJob job,UJESClient client)
125124
}
126125

127126

128-
public static void cancel(ApiServiceExecuteJob job,UJESClient client) throws Exception {
127+
public static void cancel(ApiServiceExecuteJob job,UJESClient client) {
129128
client.kill(job.getJobExecuteResult());
130129
}
131130

@@ -139,10 +138,10 @@ public static Boolean isCompleted(ApiServiceExecuteJob job,UJESClient client) {
139138
return client.getJobInfo(job.getJobExecuteResult()).isCompleted();
140139
}
141140

142-
public static String getResult(ApiServiceExecuteJob job, int index, int maxSize,UJESClient client) {
141+
public static String getResult(ApiServiceExecuteJob job, int index, int maxSize, UJESClient client) {
143142
String resultContent = null;
144143
JobInfoResult jobInfo = client.getJobInfo(job.getJobExecuteResult());
145-
String[] resultSetList = jobInfo.getResultSetList(LinkisJobSubmit.getClient(job.getJobProps()));
144+
String[] resultSetList = jobInfo.getResultSetList(client);
146145
if (resultSetList != null && resultSetList.length > 0) {
147146
Object fileContent = client.resultSet(ResultSetAction.builder()
148147
.setPath(resultSetList[index])
@@ -159,10 +158,10 @@ public static String getResult(ApiServiceExecuteJob job, int index, int maxSize
159158
}
160159

161160

162-
public static int getResultSize(ApiServiceExecuteJob job,UJESClient client) {
161+
public static int getResultSize(ApiServiceExecuteJob job, UJESClient client) {
163162
JobInfoResult jobInfo = client.getJobInfo(job.getJobExecuteResult());
164163
if (jobInfo.isSucceed()) {
165-
String[] resultSetList = jobInfo.getResultSetList(LinkisJobSubmit.getClient(job.getJobProps()));
164+
String[] resultSetList = jobInfo.getResultSetList(client);
166165
if (resultSetList != null && resultSetList.length > 0) {
167166
return resultSetList.length;
168167
}
@@ -196,14 +195,11 @@ public static String getResultList(JobExecuteResult executeResult,UJESClient cl
196195

197196

198197

199-
public static String getResultContent(String user, String path, int maxSize,UJESClient client) {
200-
201-
String fileContent = client.resultSet(ResultSetAction.builder()
198+
public static String getResultContent(String user, String path, int maxSize, UJESClient client) {
199+
return client.resultSet(ResultSetAction.builder()
202200
.setPath(path)
203201
.setUser(user)
204202
.setPageSize(maxSize).build()).getResponseBody();
205-
206-
return fileContent;
207203
}
208204

209205
public static InputStream downloadResultSet(String user,
@@ -236,10 +232,7 @@ public static InputStream downloadResultSet(String user,
236232

237233

238234
public static Map<String, Object> getTaskInfoById(JobExecuteResult jobExecuteResult, UJESClient client) {
239-
240-
Map<String, Object> taskInfo = (Map<String, Object>)client.getJobInfo(jobExecuteResult).getTask();
241-
242-
return taskInfo;
235+
return (Map<String, Object>) client.getJobInfo(jobExecuteResult).getTask();
243236
}
244237

245238

dss-apps/dss-apiservice-server/src/main/java/com/webank/wedatasphere/dss/apiservice/core/execute/LinkisJobSubmit.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,26 +33,32 @@
3333

3434
public class LinkisJobSubmit {
3535

36-
37-
38-
public static UJESClient getClient(Map<String, String> props) {
39-
40-
UJESClient client = getUJESClient(
41-
Configuration.GATEWAY_URL().getValue(props),
42-
ApiServiceConfiguration.LINKIS_ADMIN_USER.getValue(props),
43-
ApiServiceConfiguration.LINKIS_AUTHOR_USER_TOKEN.getValue(props),
44-
props);
45-
46-
return client;
36+
private static final Map<String, UJESClient> ujesClientMap = new HashMap<>();
37+
38+
public static UJESClient getClient() {
39+
return getUJESClient(
40+
Configuration.GATEWAY_URL().getValue(),
41+
ApiServiceConfiguration.LINKIS_ADMIN_USER.getValue(),
42+
ApiServiceConfiguration.LINKIS_AUTHOR_USER_TOKEN.getValue(),
43+
new HashMap<>(0));
4744
}
4845

4946

5047
public static UJESClient getUJESClient(String url, String user, String token, Map<String, String> jobProps){
51-
return new UJESClientImpl(getClientConfig(url,user,token, jobProps));
48+
String key = url + user + token;
49+
if(ujesClientMap.containsKey(key)) {
50+
return ujesClientMap.get(key);
51+
}
52+
synchronized (LinkisJobSubmit.class) {
53+
if(!ujesClientMap.containsKey(key)) {
54+
ujesClientMap.put(key, new UJESClientImpl(getClientConfig(url,user,token, jobProps)));
55+
}
56+
}
57+
return ujesClientMap.get(key);
5258
}
5359

5460
public static DWSClientConfig getClientConfig(String url, String user, String token, Map<String, String> jobProps){
55-
DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
61+
return ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
5662
.addServerUrl(url)
5763
.connectionTimeout(ApiServiceConfiguration.LINKIS_CONNECTION_TIMEOUT.getValue(jobProps))
5864
.discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES)
@@ -62,7 +68,6 @@ public static DWSClientConfig getClientConfig(String url, String user, String to
6268
.setAuthenticationStrategy(new TokenAuthenticationStrategy())
6369
.setAuthTokenKey(user).setAuthTokenValue(token)))
6470
.setDWSVersion(ApiServiceConfiguration.LINKIS_API_VERSION.getValue(jobProps)).build();
65-
return clientConfig;
6671
}
6772

6873

0 commit comments

Comments
 (0)