Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,31 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks

default_stages: [commit, push]
default_stages: [pre-commit, pre-push]
default_language_version:
# force all python hooks to run python3
python: python3
repos:
# Python API Hooks
- repo: https://github.com/pycqa/isort
rev: 5.10.1
rev: 6.0.1
hooks:
- id: isort
name: isort (python)
- repo: https://github.com/psf/black
rev: 22.3.0
rev: 25.1.0
hooks:
- id: black
- repo: https://github.com/pycqa/flake8
rev: 4.0.1
rev: 7.3.0
hooks:
- id: flake8
additional_dependencies: [
'flake8-docstrings>=1.6',
'flake8-black>=0.2',
]
- repo: https://github.com/pycqa/autoflake
rev: v1.4
rev: v2.3.1
hooks:
- id: autoflake
args: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>emr_serverless_spark20230808</artifactId>
<version>1.0.0</version>
<version>2.4.1</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class AliyunServerlessSparkParameters extends AbstractParameters {

private String sparkSubmitParameters;

private String templateId;

@JsonProperty("isProduction")
boolean isProduction;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.plugin.task.api.utils.RetryUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;

import org.apache.commons.lang3.StringUtils;
Expand All @@ -39,13 +40,16 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

import com.aliyun.emr_serverless_spark20230808.Client;
import com.aliyun.emr_serverless_spark20230808.models.CancelJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponse;
import com.aliyun.emr_serverless_spark20230808.models.GetTemplateRequest;
import com.aliyun.emr_serverless_spark20230808.models.GetTemplateResponse;
import com.aliyun.emr_serverless_spark20230808.models.JobDriver;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse;
Expand All @@ -64,6 +68,12 @@ public class AliyunServerlessSparkTask extends AbstractRemoteTask {

private AliyunServerlessSparkConnectionParam aliyunServerlessSparkConnectionParam;

private String templateConf;

private String templateDisplayReleaseVersion;

private Boolean templateFusion;

private String jobRunId;

private RunState currentState;
Expand Down Expand Up @@ -116,6 +126,26 @@ public void init() {

@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
GetTemplateResponse getTemplateResponse = aliyunServerlessSparkClient.getTemplate(
aliyunServerlessSparkParameters.getWorkspaceId(),
buildGetTemplateRequest());

if (getTemplateResponse != null) {
templateConf = getTemplateResponse.getBody()
.getData()
.getSparkConf()
.stream()
.map(item -> "--conf " + item.getKey() + "=" + item.getValue())
.collect(Collectors.joining(" "));

templateDisplayReleaseVersion = getTemplateResponse.getBody().getData().getDisplaySparkVersion();
templateFusion = getTemplateResponse.getBody().getData().getFusion();
}
} catch (Exception e) {
throw new AliyunServerlessSparkTaskException("Failed to get serverless spark template!");
}

try {
StartJobRunRequest startJobRunRequest = buildStartJobRunRequest(aliyunServerlessSparkParameters);
RuntimeOptions runtime = new RuntimeOptions();
Expand All @@ -128,8 +158,17 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {

while (!RunState.isFinal(currentState)) {
GetJobRunRequest getJobRunRequest = buildGetJobRunRequest();
GetJobRunResponse getJobRunResponse = aliyunServerlessSparkClient
.getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId, getJobRunRequest);

GetJobRunResponse getJobRunResponse = RetryUtils.retryFunction(() -> {
try {
return aliyunServerlessSparkClient
.getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
getJobRunRequest);
} catch (Exception e) {
throw new AliyunServerlessSparkTaskException("Failed to get job run!", e);
}
}, new RetryUtils.RetryPolicy(10, 1000L));

currentState = RunState.valueOf(getJobRunResponse.getBody().getJobRun().getState());
log.info("job - {} state - {}", jobRunId, currentState);
Thread.sleep(10 * 1000L);
Expand Down Expand Up @@ -199,16 +238,26 @@ protected Client buildAliyunServerlessSparkClient(String accessKeyId, String acc
}

protected StartJobRunRequest buildStartJobRunRequest(AliyunServerlessSparkParameters aliyunServerlessSparkParameters) {
if (templateConf != null) {
aliyunServerlessSparkParameters.setSparkSubmitParameters(
templateConf + " " + aliyunServerlessSparkParameters.getSparkSubmitParameters());
}

StartJobRunRequest startJobRunRequest = new StartJobRunRequest();
startJobRunRequest.setRegionId(regionId);
startJobRunRequest.setResourceQueueId(aliyunServerlessSparkParameters.getResourceQueueId());
startJobRunRequest.setCodeType(aliyunServerlessSparkParameters.getCodeType());
startJobRunRequest.setName(aliyunServerlessSparkParameters.getJobName());

String engineReleaseVersion = aliyunServerlessSparkParameters.getEngineReleaseVersion();
engineReleaseVersion =
StringUtils.isEmpty(engineReleaseVersion) ? AliyunServerlessSparkConstants.DEFAULT_ENGINE
: engineReleaseVersion;
startJobRunRequest.setReleaseVersion(engineReleaseVersion);

if (engineReleaseVersion != null && !engineReleaseVersion.isEmpty()) {
startJobRunRequest.setReleaseVersion(engineReleaseVersion);
} else if (templateDisplayReleaseVersion != null && templateFusion != null) {
startJobRunRequest.setDisplayReleaseVersion(templateDisplayReleaseVersion);
startJobRunRequest.setFusion(templateFusion);
}

Tag envTag = new Tag();
envTag.setKey(AliyunServerlessSparkConstants.ENV_KEY);
String envType = aliyunServerlessSparkParameters.isProduction() ? AliyunServerlessSparkConstants.ENV_PROD
Expand Down Expand Up @@ -243,4 +292,14 @@ protected CancelJobRunRequest buildCancelJobRunRequest() {
cancelJobRunRequest.setRegionId(regionId);
return cancelJobRunRequest;
}

/**
 * Creates the request used to fetch the serverless Spark job template.
 *
 * <p>When a template id has been configured on the task parameters it is
 * forwarded as the template business id; otherwise the request is sent
 * without one.
 *
 * @return a populated {@link GetTemplateRequest}
 */
protected GetTemplateRequest buildGetTemplateRequest() {
    final GetTemplateRequest request = new GetTemplateRequest();
    final String templateId = aliyunServerlessSparkParameters.getTemplateId();
    if (templateId != null) {
        request.setTemplateBizId(templateId);
    }
    return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.spi.enums.DbType;

import java.util.Collections;

import lombok.extern.slf4j.Slf4j;

import org.junit.jupiter.api.Assertions;
Expand All @@ -51,9 +53,12 @@
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponse;
import com.aliyun.emr_serverless_spark20230808.models.GetJobRunResponseBody;
import com.aliyun.emr_serverless_spark20230808.models.GetTemplateResponse;
import com.aliyun.emr_serverless_spark20230808.models.GetTemplateResponseBody;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunRequest;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse;
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponseBody;
import com.aliyun.emr_serverless_spark20230808.models.Template;

@Slf4j
@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -90,6 +95,9 @@ public class AliyunServerlessSparkTaskTest {
@Mock
private CancelJobRunResponse mockCancelJobRunResponse;

@Mock
private GetTemplateResponse mockGetTemplateResponse;

@InjectMocks
@Spy
private AliyunServerlessSparkTask aliyunServerlessSparkTask;
Expand Down Expand Up @@ -124,6 +132,8 @@ public class AliyunServerlessSparkTaskTest {

private static final String mockEntryPointArguments = "10";

private static final String mockTemplateId = "TPL-XXXXX";

@BeforeEach
public void before() {
when(mockTaskExecutionContext.getTaskParams()).thenReturn(taskParamsString);
Expand Down Expand Up @@ -167,6 +177,12 @@ public void testHandle() {
() -> doReturn(mockGetJobRunResponse).when(mockAliyunServerlessSparkClient).getJobRun(any(), any(),
any()));

mockGetTemplateResponse = new GetTemplateResponse()
.setBody(new GetTemplateResponseBody().setData(new Template().setSparkConf(Collections.emptyList())));
Assertions.assertDoesNotThrow(
() -> doReturn(mockGetTemplateResponse).when(mockAliyunServerlessSparkClient).getTemplate(any(),
any()));

aliyunServerlessSparkTask.init();
aliyunServerlessSparkTask.handle(mockTaskCallBack);
verify(aliyunServerlessSparkTask).setAppIds(mockJobRunId);
Expand All @@ -190,6 +206,8 @@ public void testCancelApplication() throws Exception {
public void testBuildStartJobRunRequest() {
AliyunServerlessSparkParameters mockAliyunServerlessSparkParameters =
mock(AliyunServerlessSparkParameters.class);
doReturn(mockWorkspaceId).when(mockAliyunServerlessSparkParameters).getWorkspaceId();
doReturn(mockTemplateId).when(mockAliyunServerlessSparkParameters).getTemplateId();
doReturn(mockResourceQueueId).when(mockAliyunServerlessSparkParameters).getResourceQueueId();
doReturn("JAR").when(mockAliyunServerlessSparkParameters).getCodeType();
doReturn("ds-test").when(mockAliyunServerlessSparkParameters).getJobName();
Expand All @@ -198,12 +216,10 @@ public void testBuildStartJobRunRequest() {
doReturn(mockEntryPointArguments).when(mockAliyunServerlessSparkParameters).getEntryPointArguments();

aliyunServerlessSparkTask.buildStartJobRunRequest(mockAliyunServerlessSparkParameters);

verify(mockAliyunServerlessSparkParameters).getResourceQueueId();
verify(mockAliyunServerlessSparkParameters).getCodeType();
verify(mockAliyunServerlessSparkParameters).getJobName();
verify(mockAliyunServerlessSparkParameters).getEngineReleaseVersion();
verify(mockAliyunServerlessSparkParameters).isProduction();
}

}
2 changes: 2 additions & 0 deletions dolphinscheduler-ui/src/locales/en_US/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,8 @@ export default {
entry_point_arguments_tips: 'entry point arguments',
spark_submit_parameters: 'spark submit parameters',
spark_submit_parameters_tips: 'spark submit parameters',
template_id: 'spark template id',
template_id_tips: 'spark template id',
is_production: 'is production',
is_production_tips: 'is production',
json_format_tips: 'Json parameters format is abnormal'
Expand Down
2 changes: 2 additions & 0 deletions dolphinscheduler-ui/src/locales/zh_CN/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,8 @@ export default {
entry_point_arguments_tips: 'entry point arguments',
spark_submit_parameters: 'spark submit parameters',
spark_submit_parameters_tips: 'spark submit parameters',
template_id: 'spark template id',
template_id_tips: 'spark template id',
is_production: 'is production',
is_production_tips: 'is production',
json_format_tips: 'JSON参数格式异常'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,15 @@ export function useAliyunServerlessSpark(model: {
}
},

{
type: 'input',
field: 'templateId',
name: t('project.node.template_id'),
props: {
placeholder: t('project.node.template_id_tips')
}
},

{
type: 'switch',
field: 'isProduction',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ export function formatParams(data: INodeData): {
taskParams.codeType = data.codeType
taskParams.jobName = data.jobName
taskParams.engineReleaseVersion = data.engineReleaseVersion
taskParams.templateId = data.templateId
taskParams.entryPoint = data.entryPoint
taskParams.entryPointArguments = data.entryPointArguments
taskParams.sparkSubmitParameters = data.sparkSubmitParameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ interface ITaskParams {
resourceQueueId?: string
codeType?: string
engineReleaseVersion?: string
templateId?: string
entryPoint?: string
entryPointArguments?: string
sparkSubmitParameters?: string
Expand Down
Loading