Skip to content

Commit a0fb913

Browse files
author
niumy
committed
[Improvement-17725][TaskPlugin] SagemakerTask resource leak repair
1 parent 3a96e4f commit a0fb913

File tree

2 files changed

+41
-7
lines changed
  • dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src

2 files changed

+41
-7
lines changed

dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@
2828
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
2929
import org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerConnectionParam;
3030
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
31+
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
3132
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
3233
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
3334
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
35+
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
3436
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
3537
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
3638
import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -108,6 +110,42 @@ public void init() {
108110
utils = new PipelineUtils();
109111
}
110112

113+
/**
114+
* If appIds is empty, submit a new remote application; otherwise, just track application status.
115+
*
116+
* @param taskCallBack
117+
* @throws TaskException
118+
*/
119+
@Override
120+
public void handle(TaskCallBack taskCallBack) throws TaskException {
121+
try {
122+
// if appIds is not empty, just track application status, avoid resubmitting remote task
123+
if (StringUtils.isNotEmpty(taskRequest.getAppIds())) {
124+
setAppIds(taskRequest.getAppIds());
125+
trackApplicationStatus();
126+
return;
127+
}
128+
129+
// submit a remote application
130+
submitApplication();
131+
132+
if (StringUtils.isNotEmpty(getAppIds())) {
133+
taskRequest.setAppIds(getAppIds());
134+
// callback to update remote application info
135+
taskCallBack.updateRemoteApplicationInfo(taskRequest.getTaskInstanceId(),
136+
new ApplicationInfo(getAppIds()));
137+
}
138+
139+
// keep tracking application status
140+
trackApplicationStatus();
141+
} finally {
142+
// shutdown client
143+
if (client != null) {
144+
client.shutdown();
145+
}
146+
}
147+
}
148+
111149
@Override
112150
public void submitApplication() throws TaskException {
113151
try {
@@ -120,6 +158,7 @@ public void submitApplication() throws TaskException {
120158
setAppIds(JSONUtils.toJsonString(pipelineId));
121159
} catch (Exception e) {
122160
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
161+
log.error("SageMaker task submit application error: {}", e.getMessage(), e);
123162
throw new TaskException("SageMaker task submit error", e);
124163
}
125164
}
@@ -150,11 +189,6 @@ public void trackApplicationStatus() throws TaskException {
150189
} catch (Exception e) {
151190
log.error("SageMaker task track application error: {}", e.getMessage(), e);
152191
throw new TaskException("SageMaker task track application error: " + e.getMessage(), e);
153-
} finally {
154-
// shutdown client
155-
if (client != null) {
156-
client.shutdown();
157-
}
158192
}
159193
}
160194

dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,6 @@ public void testTrackApplicationStatus_InitPipelineIdThrowsException() throws Ex
155155
});
156156

157157
assertEquals("SageMaker task track application error: sagemaker applicationID is null", exception.getMessage());
158-
verify(client, times(1)).shutdown();
159158
}
160159

161160
@Test
@@ -169,7 +168,8 @@ public void testCancelApplication_InitPipelineIdThrowsException() {
169168
});
170169

171170
// Verify the exception message
172-
assertEquals("SageMaker task cancel application error: sagemaker applicationID is null", exception.getMessage());
171+
assertEquals("SageMaker task cancel application error: sagemaker applicationID is null",
172+
exception.getMessage());
173173

174174
// Verify that client.shutdown() was called
175175
verify(client, times(1)).shutdown();

0 commit comments

Comments
 (0)