Skip to content

Commit 8dff1f8

Browse files
authored
Shutdown client properly (#252)
* Shutdown client properly Signed-off-by: Hongxin Liang <[email protected]>
1 parent c656650 commit 8dff1f8

File tree

1 file changed

+41
-38
lines changed

1 file changed

+41
-38
lines changed

jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java

Lines changed: 41 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -203,44 +203,47 @@ static DynamicJobSpec rewrite(
203203
Map<TaskIdentifier, TaskTemplate> taskTemplates,
204204
Map<WorkflowIdentifier, WorkflowTemplate> workflowTemplates) {
205205

206-
WorkflowNodeVisitor workflowNodeVisitor =
207-
IdentifierRewrite.builder()
208-
.domain(executionConfig.domain())
209-
.project(executionConfig.project())
210-
.version(executionConfig.version())
211-
.adminClient(
212-
FlyteAdminClient.create(config.platformUrl(), config.platformInsecure(), null))
213-
.build()
214-
.visitor();
215-
216-
List<Node> rewrittenNodes =
217-
spec.nodes().stream().map(workflowNodeVisitor::visitNode).collect(toUnmodifiableList());
218-
219-
Map<WorkflowIdentifier, WorkflowTemplate> usedSubWorkflows =
220-
ProjectClosure.collectSubWorkflows(rewrittenNodes, workflowTemplates);
221-
222-
Map<TaskIdentifier, TaskTemplate> usedTaskTemplates =
223-
ProjectClosure.collectTasks(rewrittenNodes, taskTemplates);
224-
225-
// FIXME one sub-workflow can use more sub-workflows, we should recursively collect used tasks
226-
// and workflows
227-
228-
Map<WorkflowIdentifier, WorkflowTemplate> rewrittenUsedSubWorkflows =
229-
mapValues(usedSubWorkflows, workflowNodeVisitor::visitWorkflowTemplate);
230-
231-
return spec.toBuilder()
232-
.nodes(rewrittenNodes)
233-
.subWorkflows(
234-
ImmutableMap.<WorkflowIdentifier, WorkflowTemplate>builder()
235-
.putAll(spec.subWorkflows())
236-
.putAll(rewrittenUsedSubWorkflows)
237-
.build())
238-
.tasks(
239-
ImmutableMap.<TaskIdentifier, TaskTemplate>builder()
240-
.putAll(spec.tasks())
241-
.putAll(usedTaskTemplates)
242-
.build())
243-
.build();
206+
try (FlyteAdminClient flyteAdminClient =
207+
FlyteAdminClient.create(config.platformUrl(), config.platformInsecure(), null)) {
208+
209+
WorkflowNodeVisitor workflowNodeVisitor =
210+
IdentifierRewrite.builder()
211+
.domain(executionConfig.domain())
212+
.project(executionConfig.project())
213+
.version(executionConfig.version())
214+
.adminClient(flyteAdminClient)
215+
.build()
216+
.visitor();
217+
218+
List<Node> rewrittenNodes =
219+
spec.nodes().stream().map(workflowNodeVisitor::visitNode).collect(toUnmodifiableList());
220+
221+
Map<WorkflowIdentifier, WorkflowTemplate> usedSubWorkflows =
222+
ProjectClosure.collectSubWorkflows(rewrittenNodes, workflowTemplates);
223+
224+
Map<TaskIdentifier, TaskTemplate> usedTaskTemplates =
225+
ProjectClosure.collectTasks(rewrittenNodes, taskTemplates);
226+
227+
// FIXME one sub-workflow can use more sub-workflows, we should recursively collect used tasks
228+
// and workflows
229+
230+
Map<WorkflowIdentifier, WorkflowTemplate> rewrittenUsedSubWorkflows =
231+
mapValues(usedSubWorkflows, workflowNodeVisitor::visitWorkflowTemplate);
232+
233+
return spec.toBuilder()
234+
.nodes(rewrittenNodes)
235+
.subWorkflows(
236+
ImmutableMap.<WorkflowIdentifier, WorkflowTemplate>builder()
237+
.putAll(spec.subWorkflows())
238+
.putAll(rewrittenUsedSubWorkflows)
239+
.build())
240+
.tasks(
241+
ImmutableMap.<TaskIdentifier, TaskTemplate>builder()
242+
.putAll(spec.tasks())
243+
.putAll(usedTaskTemplates)
244+
.build())
245+
.build();
246+
}
244247
}
245248

246249
private static DynamicWorkflowTask getDynamicWorkflowTask(String name) {

0 commit comments

Comments
 (0)