1717
1818package org .apache .linkis .entrance .interceptor .impl
1919
20+ import org .apache .linkis .common .utils .{Logging , Utils }
2021import org .apache .linkis .common .utils .CodeAndRunTypeUtils .LANGUAGE_TYPE_AI_SQL
21- import org .apache .linkis .common .utils .Logging
2222import org .apache .linkis .entrance .conf .EntranceConfiguration
23+ import org .apache .linkis .entrance .conf .EntranceConfiguration ._
2324import org .apache .linkis .entrance .interceptor .EntranceInterceptor
2425import org .apache .linkis .governance .common .entity .job .{JobAiRequest , JobRequest }
2526import org .apache .linkis .governance .common .protocol .job .JobAiReqInsert
@@ -29,6 +30,8 @@ import org.apache.linkis.manager.label.utils.LabelUtil
2930import org .apache .linkis .protocol .utils .TaskUtils
3031import org .apache .linkis .rpc .Sender
3132
33+ import org .apache .commons .lang3 .StringUtils
34+
3235import org .springframework .beans .BeanUtils
3336
3437import java .{lang , util }
@@ -39,18 +42,19 @@ import scala.collection.JavaConverters._
3942class AISQLTransformInterceptor extends EntranceInterceptor with Logging {
4043
4144 override def apply (jobRequest : JobRequest , logAppender : lang.StringBuilder ): JobRequest = {
42- // TODO 修改为变量
43- val aiSqlEnable = true
44- // 转换为小写
45- val supportAISQLCreator = " IDE"
45+ val aiSqlEnable : Boolean = AI_SQL_ENABLED
46+ val supportAISQLCreator : String = AI_SQL_CREATORS .toLowerCase()
4647 val sqlLanguage : String = LANGUAGE_TYPE_AI_SQL
47- val sparkEngineType = " spark-3.4.4 "
48+ val sparkEngineType : String = AI_SQL_DEFAULT_SPARK_ENGINE_TYPE
4849 val labels : util.List [Label [_]] = jobRequest.getLabels
4950 val codeType : String = LabelUtil .getCodeType(labels)
5051 // engineType and creator have been verified in LabelCheckInterceptor.
5152 val userCreatorOpt : Option [Label [_]] = labels.asScala.find(_.isInstanceOf [UserCreatorLabel ])
5253 val creator : String = userCreatorOpt.get.asInstanceOf [UserCreatorLabel ].getCreator
5354 val engineTypeLabelOpt : Option [Label [_]] = labels.asScala.find(_.isInstanceOf [EngineTypeLabel ])
55+
56+ val startMap : util.Map [String , AnyRef ] = TaskUtils .getStartupMap(jobRequest.getParams)
57+
5458 // aiSql change to spark
5559 var currentEngineType : String =
5660 engineTypeLabelOpt.get.asInstanceOf [EngineTypeLabel ].getEngineType
@@ -59,20 +63,51 @@ class AISQLTransformInterceptor extends EntranceInterceptor with Logging {
5963 .equals(codeType) && supportAISQLCreator.contains(creator.toLowerCase())
6064 ) {
6165 engineTypeLabelOpt.get.asInstanceOf [EngineTypeLabel ].setEngineType(sparkEngineType)
62- // TODO 添加 aisql 标识
66+
67+ startMap.put(AI_SQL_KEY , " true" )
68+
6369 currentEngineType = sparkEngineType
6470
65- // TODO 将转换后的数据保存到数据库
6671 persist(jobRequest);
6772
6873 }
6974 // 开启 spark 动态资源规划, spark3.4.4
7075 if (sparkEngineType.equals(currentEngineType)) {
7176 logger.info(" spark3 add dynamic resource." )
72- val startMap : util. Map [ String , AnyRef ] = TaskUtils .getStartupMap(jobRequest.getParams)
77+
7378 // add spark dynamic resource planning
74- // TODO
75- startMap.put(" " , " " )
79+ startMap.put(
80+ " spark.shuffle.service.enabled" ,
81+ SPARK_SHUFFLE_SERVICE_ENABLED .asInstanceOf [AnyRef ]
82+ )
83+ startMap.put(
84+ " spark.dynamicAllocation.enabled" ,
85+ SPARK_DYNAMIC_ALLOCATION_ENABLED .asInstanceOf [AnyRef ]
86+ )
87+ startMap.put(
88+ " spark.dynamicAllocation.minExecutors" ,
89+ SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS .asInstanceOf [AnyRef ]
90+ )
91+ startMap.put(
92+ " spark.dynamicAllocation.maxExecutors" ,
93+ SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS .asInstanceOf [AnyRef ]
94+ )
95+ startMap.put(" spark.executor.cores" , SPARK_EXECUTOR_CORES .asInstanceOf [AnyRef ])
96+ startMap.put(" spark.executor.memory" , SPARK_EXECUTOR_MEMORY .asInstanceOf [AnyRef ])
97+ startMap.put(" spark.executor.instances" , SPARK_EXECUTOR_INSTANCES .asInstanceOf [AnyRef ])
98+
99+ Utils .tryAndWarn {
100+ val extraConfs : String = SPARK_DYNAMIC_ALLOCATION_ADDITIONAL_CONFS
101+ if (StringUtils .isNotBlank(extraConfs)) {
102+ val confs : Array [String ] = extraConfs.split(" ," )
103+ for (conf <- confs) {
104+ val confKey : String = conf.split(" =" )(0 )
105+ val confValue : String = conf.split(" =" )(1 )
106+ startMap.put(confKey, confValue)
107+ }
108+ }
109+ }
110+
76111 }
77112 jobRequest
78113 }
0 commit comments