2323import com .webank .wedatasphere .dss .apiservice .core .bo .LinkisExecuteResult ;
2424import com .webank .wedatasphere .dss .apiservice .core .config .ApiServiceConfiguration ;
2525import com .webank .wedatasphere .dss .apiservice .core .constant .ParamType ;
26- import com .webank .wedatasphere .dss .apiservice .core .constant .ParamTypeEnum ;
2726import com .webank .wedatasphere .dss .apiservice .core .constant .RequireEnum ;
2827import com .webank .wedatasphere .dss .apiservice .core .dao .*;
29- import com .webank .wedatasphere .dss .apiservice .core .exception .ApiExecuteException ;
3028import com .webank .wedatasphere .dss .apiservice .core .exception .ApiServiceQueryException ;
3129import com .webank .wedatasphere .dss .apiservice .core .execute .ApiServiceExecuteJob ;
3230import com .webank .wedatasphere .dss .apiservice .core .execute .DefaultApiServiceJob ;
3735import com .webank .wedatasphere .dss .apiservice .core .util .DateUtil ;
3836import com .webank .wedatasphere .dss .apiservice .core .util .SQLCheckUtil ;
3937import com .webank .wedatasphere .dss .apiservice .core .vo .*;
40- //import com.webank.wedatasphere.dss.oneservice.core.jdbc.JdbcUtil;
4138import com .webank .wedatasphere .dss .apiservice .core .exception .ApiServiceRuntimeException ;
4239import com .webank .wedatasphere .dss .apiservice .core .service .ApiServiceQueryService ;
4340import com .webank .wedatasphere .dss .apiservice .core .util .AssertUtil ;
4441import com .webank .wedatasphere .dss .apiservice .core .util .ModelMapperUtil ;
45- //import com.webank.wedatasphere.dss.oneservice.core.vo.*;
4642import com .webank .wedatasphere .dss .apiservice .core .vo .ApiServiceVo ;
4743import org .apache .linkis .bml .client .BmlClient ;
4844import org .apache .linkis .bml .client .BmlClientFactory ;
4945import org .apache .linkis .bml .protocol .BmlDownloadResponse ;
5046import org .apache .linkis .common .io .FsPath ;
5147import org .apache .linkis .storage .source .FileSource ;
52- import org .apache .linkis .storage .source .FileSource$ ;
5348import org .apache .linkis .ujes .client .UJESClient ;
5449import org .apache .linkis .ujes .client .response .JobExecuteResult ;
55- import org .apache .commons .collections4 .MapUtils ;
5650import org .apache .commons .lang .StringUtils ;
5751import org .apache .commons .math3 .util .Pair ;
5852import org .slf4j .Logger ;
@@ -141,11 +135,9 @@ public class ApiServiceQueryServiceImpl implements ApiServiceQueryService {
141135 @ Autowired
142136 private ApiServiceTokenManagerDao apiServiceTokenManagerDao ;
143137
144-
145138 @ Autowired
146139 private ApiService apiService ;
147140
148-
149141 @ Autowired
150142 private ApiServiceAccessDao apiServiceAccessDao ;
151143
@@ -169,9 +161,7 @@ public LinkisExecuteResult query(String path,
169161 ApiServiceToken tokenDetail ,
170162 String loginUser ) {
171163 // 根据path查询resourceId和version
172-
173164 // 得到metadata
174-
175165 // 执行查询
176166 //path 必须唯一
177167 ApiServiceVo apiServiceVo = apiServiceDao .queryByPath (path );
@@ -241,10 +231,6 @@ public LinkisExecuteResult query(String path,
241231 }
242232 });
243233
244-
245- // AssertUtil.isTrue(MapUtils.isNotEmpty((Map) collect.getKey()), "数据源不能为空");
246-
247-
248234 ApiServiceExecuteJob job = new DefaultApiServiceJob ();
249235 //sql代码封装成scala执行
250236 job .setCode (ExecuteCodeHelper .packageCodeToExecute (executeCode , maxApiVersionVo .getMetadataInfo ()));
@@ -270,7 +256,6 @@ public LinkisExecuteResult query(String path,
270256 apiAccessVo .setAccessTime (DateUtil .getNow ());
271257 apiServiceAccessDao .addAccessRecord (apiAccessVo );
272258
273-
274259 JobExecuteResult jobExecuteResult = LinkisJobSubmit .execute (job ,ujesClient );
275260
276261 //记录执行任务用户和代理用户关系,没有代理用户的统一设置为登录用户
@@ -280,17 +265,13 @@ public LinkisExecuteResult query(String path,
280265 apiServiceJob .setJobExecuteResult (jobExecuteResult );
281266 runJobs .put (jobExecuteResult .getTaskID (),apiServiceJob );
282267
283-
284268 LinkisExecuteResult linkisExecuteResult = new LinkisExecuteResult (jobExecuteResult .getTaskID (), jobExecuteResult .getExecID ());
285269 return linkisExecuteResult ;
286270 } catch (IOException e ) {
287271 throw new ApiServiceRuntimeException (e .getMessage (), e );
288272 }
289273 }
290274
291-
292-
293-
294275 @ Override
295276 public ApiServiceVo queryByVersionId (String userName ,Long versionId ) throws ApiServiceQueryException {
296277 ApiVersionVo apiVersionVo = apiServiceVersionDao .queryApiVersionByVersionId (versionId );
@@ -325,10 +306,8 @@ public List<QueryParamVo> queryParamList(String scriptPath, Long versionId) {
325306
326307 AssertUtil .notNull (targetApiVersionVo , "目标参数版本不存在,path=" + scriptPath +",version:" +versionId );
327308
328- // todo~!
329309 List <ParamVo > paramVoList = apiServiceParamDao .queryByVersionId (targetApiVersionVo .getId ());
330310
331-
332311 List <QueryParamVo > queryParamVoList = new ArrayList <>();
333312
334313 Map <String , ParamVo > paramMap = paramVoList .stream ()
@@ -390,16 +369,16 @@ private Pair<Object, ArrayList<String[]>> queryBml(String userName, String resou
390369
391370 InputStream inputStream = resource .inputStream ();
392371
393- try (FileSource fileSource = FileSource$ . MODULE$ .create (new FsPath (scriptPath ), inputStream )) {
372+ try (FileSource fileSource = FileSource .create (new FsPath (scriptPath ), inputStream )) {
394373 //todo 数组取了第一个
395- collect = fileSource .collect ()[0 ];
374+ Pair <Object , List <String []>> sourcePair = fileSource .collect ()[0 ];
375+ collect = new Pair <>(sourcePair .getKey (), new ArrayList <>(sourcePair .getValue ()));
396376 bmlCache .put (key , collect );
397377 }
398378 }
399379 }
400380 }
401381
402-
403382 return collect ;
404383 }
405384
@@ -425,60 +404,6 @@ private Map<String, String> queryConfigParam(long apiId, String version) {
425404 return collect ;
426405 }
427406
428-
429-
430-
431- // private Tuple3 getDatasourceInfo(final Map<String, Object> datasourceMap) {
432- // Tuple3 tuple3 = datasourceCache.getIfPresent(datasourceMap);
433- //
434- // if (tuple3 == null) {
435- // synchronized (this) {
436- // tuple3 = datasourceCache.getIfPresent(datasourceMap);
437- // if (tuple3 == null) {
438- // tuple3 = JdbcUtil.getDatasourceInfo(datasourceMap);
439- // datasourceCache.put(datasourceMap, tuple3);
440- // }
441- // }
442- // }
443- //
444- // return tuple3;
445- // }
446-
447- // private List<Map<String, Object>> executeJob(String executeCode,
448- // Object datasourceMap, Map<String, Object> params) {
449- //
450- //// Tuple3 tuple3 = getDatasourceInfo((Map<String, Object>) datasourceMap);
451- //// final String jdbcUrl = tuple3._1().toString();
452- //// final String username = tuple3._2().toString();
453- //// final String password = tuple3._3().toString();
454- //
455- //// NamedParameterJdbcTemplate namedParameterJdbcTemplate = datasourceService.getNamedParameterJdbcTemplate(jdbcUrl, username, password);
456- //
457- // String namedSql = genNamedSql(executeCode, params);
458- //
459- //// return namedParameterJdbcTemplate.query(namedSql, new MapSqlParameterSource(params), new ColumnAliasMapRowMapper());
460- //
461- // }
462-
463- private static String genNamedSql (String executeCode , Map <String , Object > params ) {
464- // 没有参数,无需生成namedSql
465- if (MapUtils .isEmpty (params )) {
466- return executeCode ;
467- }
468-
469- for (String paramName : params .keySet ()) {
470- for (String $name : new String []{"'${" + paramName + "}'" , "${" + paramName + "}" , "\" ${" + paramName + "}\" " }) {
471- if (executeCode .contains ($name )) {
472- executeCode = StringUtils .replace (executeCode , $name , ":" + paramName );
473- break ;
474- }
475- }
476- }
477-
478- return executeCode ;
479- }
480-
481-
482407 public static class ColumnAliasMapRowMapper implements RowMapper <Map <String , Object >> {
483408 @ Override
484409 public Map <String , Object > mapRow (ResultSet rs , int rowNum ) throws SQLException {
@@ -520,27 +445,7 @@ protected Object getColumnValue(ResultSet rs, int index) throws SQLException {
520445
521446 @ Override
522447 public ApiServiceJob getJobByTaskId (String taskId ){
523- ApiServiceJob apiServiceJob =runJobs .get (taskId );
524- return apiServiceJob ;
448+ return runJobs .get (taskId );
525449 }
526450
527-
528- private static String getRunTypeFromScriptsPath (String scriptsPath ) {
529-
530- String res = "sql" ;
531- String fileFlag = scriptsPath .substring (scriptsPath .lastIndexOf ("." ) + 1 );
532- switch (fileFlag ) {
533- case "sh" :
534- res = "shell" ;
535- break ;
536- case "py" :
537- res = "pyspark" ;
538- break ;
539- default :
540- res = fileFlag ;
541- break ;
542- }
543- return res ;
544-
545- }
546451}
0 commit comments