3030import org .apache .dolphinscheduler .plugin .task .api .parameters .AbstractParameters ;
3131import org .apache .dolphinscheduler .plugin .task .api .parameters .resource .DataSourceParameters ;
3232import org .apache .dolphinscheduler .plugin .task .api .parameters .resource .ResourceParametersHelper ;
33+ import org .apache .dolphinscheduler .plugin .task .api .utils .RetryUtils ;
3334import org .apache .dolphinscheduler .spi .enums .DbType ;
3435
3536import org .apache .commons .lang3 .StringUtils ;
3940import java .util .HashMap ;
4041import java .util .List ;
4142import java .util .Map ;
43+ import java .util .stream .Collectors ;
4244
4345import lombok .extern .slf4j .Slf4j ;
4446
4547import com .aliyun .emr_serverless_spark20230808 .Client ;
4648import com .aliyun .emr_serverless_spark20230808 .models .CancelJobRunRequest ;
4749import com .aliyun .emr_serverless_spark20230808 .models .GetJobRunRequest ;
4850import com .aliyun .emr_serverless_spark20230808 .models .GetJobRunResponse ;
51+ import com .aliyun .emr_serverless_spark20230808 .models .GetTemplateRequest ;
52+ import com .aliyun .emr_serverless_spark20230808 .models .GetTemplateResponse ;
4953import com .aliyun .emr_serverless_spark20230808 .models .JobDriver ;
5054import com .aliyun .emr_serverless_spark20230808 .models .StartJobRunRequest ;
5155import com .aliyun .emr_serverless_spark20230808 .models .StartJobRunResponse ;
@@ -64,6 +68,12 @@ public class AliyunServerlessSparkTask extends AbstractRemoteTask {
6468
6569 private AliyunServerlessSparkConnectionParam aliyunServerlessSparkConnectionParam ;
6670
71+ private String templateConf ;
72+
73+ private String templateDisplayReleaseVersion ;
74+
75+ private Boolean templateFusion ;
76+
6777 private String jobRunId ;
6878
6979 private RunState currentState ;
@@ -116,6 +126,26 @@ public void init() {
116126
117127 @ Override
118128 public void handle (TaskCallBack taskCallBack ) throws TaskException {
129+ try {
130+ GetTemplateResponse getTemplateResponse = aliyunServerlessSparkClient .getTemplate (
131+ aliyunServerlessSparkParameters .getWorkspaceId (),
132+ buildGetTemplateRequest ());
133+
134+ if (getTemplateResponse != null ) {
135+ templateConf = getTemplateResponse .getBody ()
136+ .getData ()
137+ .getSparkConf ()
138+ .stream ()
139+ .map (item -> "--conf " + item .getKey () + "=" + item .getValue ())
140+ .collect (Collectors .joining (" " ));
141+
142+ templateDisplayReleaseVersion = getTemplateResponse .getBody ().getData ().getDisplaySparkVersion ();
143+ templateFusion = getTemplateResponse .getBody ().getData ().getFusion ();
144+ }
145+ } catch (Exception e ) {
146+ throw new AliyunServerlessSparkTaskException ("Failed to get serverless spark template!" );
147+ }
148+
119149 try {
120150 StartJobRunRequest startJobRunRequest = buildStartJobRunRequest (aliyunServerlessSparkParameters );
121151 RuntimeOptions runtime = new RuntimeOptions ();
@@ -128,8 +158,17 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
128158
129159 while (!RunState .isFinal (currentState )) {
130160 GetJobRunRequest getJobRunRequest = buildGetJobRunRequest ();
131- GetJobRunResponse getJobRunResponse = aliyunServerlessSparkClient
132- .getJobRun (aliyunServerlessSparkParameters .getWorkspaceId (), jobRunId , getJobRunRequest );
161+
162+ GetJobRunResponse getJobRunResponse = RetryUtils .retryFunction (() -> {
163+ try {
164+ return aliyunServerlessSparkClient
165+ .getJobRun (aliyunServerlessSparkParameters .getWorkspaceId (), jobRunId ,
166+ getJobRunRequest );
167+ } catch (Exception e ) {
168+ throw new AliyunServerlessSparkTaskException ("Failed to get job run!" , e );
169+ }
170+ }, new RetryUtils .RetryPolicy (10 , 1000L ));
171+
133172 currentState = RunState .valueOf (getJobRunResponse .getBody ().getJobRun ().getState ());
134173 log .info ("job - {} state - {}" , jobRunId , currentState );
135174 Thread .sleep (10 * 1000L );
@@ -199,16 +238,26 @@ protected Client buildAliyunServerlessSparkClient(String accessKeyId, String acc
199238 }
200239
201240 protected StartJobRunRequest buildStartJobRunRequest (AliyunServerlessSparkParameters aliyunServerlessSparkParameters ) {
241+ if (templateConf != null ) {
242+ aliyunServerlessSparkParameters .setSparkSubmitParameters (
243+ templateConf + " " + aliyunServerlessSparkParameters .getSparkSubmitParameters ());
244+ }
245+
202246 StartJobRunRequest startJobRunRequest = new StartJobRunRequest ();
203247 startJobRunRequest .setRegionId (regionId );
204248 startJobRunRequest .setResourceQueueId (aliyunServerlessSparkParameters .getResourceQueueId ());
205249 startJobRunRequest .setCodeType (aliyunServerlessSparkParameters .getCodeType ());
206250 startJobRunRequest .setName (aliyunServerlessSparkParameters .getJobName ());
251+
207252 String engineReleaseVersion = aliyunServerlessSparkParameters .getEngineReleaseVersion ();
208- engineReleaseVersion =
209- StringUtils .isEmpty (engineReleaseVersion ) ? AliyunServerlessSparkConstants .DEFAULT_ENGINE
210- : engineReleaseVersion ;
211- startJobRunRequest .setReleaseVersion (engineReleaseVersion );
253+
254+ if (engineReleaseVersion != null && !engineReleaseVersion .isEmpty ()) {
255+ startJobRunRequest .setReleaseVersion (engineReleaseVersion );
256+ } else if (templateDisplayReleaseVersion != null && templateFusion != null ) {
257+ startJobRunRequest .setDisplayReleaseVersion (templateDisplayReleaseVersion );
258+ startJobRunRequest .setFusion (templateFusion );
259+ }
260+
212261 Tag envTag = new Tag ();
213262 envTag .setKey (AliyunServerlessSparkConstants .ENV_KEY );
214263 String envType = aliyunServerlessSparkParameters .isProduction () ? AliyunServerlessSparkConstants .ENV_PROD
@@ -243,4 +292,14 @@ protected CancelJobRunRequest buildCancelJobRunRequest() {
243292 cancelJobRunRequest .setRegionId (regionId );
244293 return cancelJobRunRequest ;
245294 }
295+
296+ protected GetTemplateRequest buildGetTemplateRequest () {
297+ GetTemplateRequest getTemplateRequest = new GetTemplateRequest ();
298+
299+ if (aliyunServerlessSparkParameters .getTemplateId () != null ) {
300+ getTemplateRequest .setTemplateBizId (aliyunServerlessSparkParameters .getTemplateId ());
301+ }
302+
303+ return getTemplateRequest ;
304+ }
246305}
0 commit comments