Skip to content

Commit 23c4481

Browse files
[GOBBLIN-2194] Close temporal metrics scope on job completion (#4097)
Close temporal metrics scope on job completion Override close in GobblinTemporalJobLauncher
1 parent adef734 commit 23c4481

File tree

7 files changed

+101
-29
lines changed

7 files changed

+101
-29
lines changed

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import com.typesafe.config.ConfigValueFactory;
5050

5151
import io.temporal.client.WorkflowClient;
52-
import io.temporal.serviceclient.WorkflowServiceStubs;
5352
import lombok.Getter;
5453
import lombok.Setter;
5554

@@ -74,6 +73,7 @@
7473
import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
7574
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
7675
import org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory;
76+
import org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
7777
import org.apache.gobblin.util.ClassAliasResolver;
7878
import org.apache.gobblin.util.ConfigUtils;
7979
import org.apache.gobblin.util.FileUtils;
@@ -126,6 +126,7 @@ public class GobblinTemporalTaskRunner implements StandardMetricsBridge {
126126
private final boolean isMetricReportingFailureFatal;
127127
private final boolean isEventReportingFailureFatal;
128128
private final List<TemporalWorker> workers;
129+
private final ManagedWorkflowServiceStubs managedWorkflowServiceStubs;
129130

130131
public GobblinTemporalTaskRunner(String applicationName,
131132
String applicationId,
@@ -163,6 +164,9 @@ public GobblinTemporalTaskRunner(String applicationName,
163164
ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
164165
this.workers = new ArrayList<>();
165166

167+
String connectionUri = clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
168+
this.managedWorkflowServiceStubs = TemporalWorkflowClientFactory.createServiceInstance(connectionUri);
169+
166170
logger.info("GobblinTaskRunner({}): applicationName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
167171
this.isTaskDriver ? "taskDriver" : "worker",
168172
applicationName,
@@ -241,12 +245,10 @@ public void start()
241245
private TemporalWorker initiateWorker() throws Exception {
242246
logger.info("Starting Temporal Worker");
243247

244-
String connectionUri = clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
245-
WorkflowServiceStubs service = TemporalWorkflowClientFactory.createServiceInstance(connectionUri);
246-
247248
String namespace = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE,
248249
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
249-
WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(service, namespace);
250+
WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(
251+
managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace);
250252

251253
String workerClassName = ConfigUtils.getString(clusterConfig,
252254
GobblinTemporalConfigurationKeys.WORKER_CLASS, GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
@@ -293,6 +295,7 @@ public synchronized void stop() {
293295
}
294296

295297
workers.forEach(TemporalWorker::shutdown);
298+
managedWorkflowServiceStubs.close();
296299

297300
logger.info("All services are stopped.");
298301

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -137,13 +137,9 @@ public GobblinJobLauncher(Properties jobProps, Path appWorkDir,
137137
@Override
138138
public void close() throws IOException {
139139
try {
140-
executeCancellation();
140+
cleanupWorkingDirectory();
141141
} finally {
142-
try {
143-
cleanupWorkingDirectory();
144-
} finally {
145-
super.close();
146-
}
142+
super.close();
147143
}
148144
}
149145

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.gobblin.temporal.joblauncher;
1919

20+
import java.io.IOException;
2021
import java.util.List;
2122
import java.util.Properties;
2223
import java.util.concurrent.ConcurrentHashMap;
@@ -33,7 +34,6 @@
3334
import io.temporal.client.WorkflowClient;
3435
import io.temporal.client.WorkflowFailedException;
3536
import io.temporal.client.WorkflowStub;
36-
import io.temporal.serviceclient.WorkflowServiceStubs;
3737
import io.temporal.workflow.Workflow;
3838

3939
import org.apache.hadoop.fs.FileSystem;
@@ -48,6 +48,7 @@
4848
import org.apache.gobblin.source.workunit.WorkUnit;
4949
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
5050
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
51+
import org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
5152
import org.apache.gobblin.util.ConfigUtils;
5253

5354
import static org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.*;
@@ -74,7 +75,7 @@ public abstract class GobblinTemporalJobLauncher extends GobblinJobLauncher {
7475
private static final Logger log = Workflow.getLogger(GobblinTemporalJobLauncher.class);
7576
private static final int TERMINATION_TIMEOUT_SECONDS = 3;
7677

77-
protected WorkflowServiceStubs workflowServiceStubs;
78+
protected ManagedWorkflowServiceStubs managedWorkflowServiceStubs;
7879
protected WorkflowClient client;
7980
protected String queueName;
8081
protected String namespace;
@@ -87,10 +88,10 @@ public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir,
8788
log.info("GobblinTemporalJobLauncher: appWorkDir {}; jobProps {}", appWorkDir, jobProps);
8889

8990
String connectionUri = jobProps.getProperty(TEMPORAL_CONNECTION_STRING);
90-
this.workflowServiceStubs = createServiceInstance(connectionUri);
91+
this.managedWorkflowServiceStubs = createServiceInstance(connectionUri);
9192

9293
this.namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE, DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
93-
this.client = createClientInstance(workflowServiceStubs, namespace);
94+
this.client = createClientInstance(managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace);
9495

9596
this.queueName = jobProps.getProperty(GOBBLIN_TEMPORAL_TASK_QUEUE, DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);
9697

@@ -139,7 +140,8 @@ protected void executeCancellation() {
139140
.setNamespace(this.namespace)
140141
.setExecution(workflowStub.getExecution())
141142
.build();
142-
DescribeWorkflowExecutionResponse response = workflowServiceStubs.blockingStub().describeWorkflowExecution(request);
143+
DescribeWorkflowExecutionResponse response = managedWorkflowServiceStubs.getWorkflowServiceStubs()
144+
.blockingStub().describeWorkflowExecution(request);
143145

144146
WorkflowExecutionStatus status;
145147
try {
@@ -188,4 +190,17 @@ protected void removeTasksFromCurrentJob(List<String> workUnitIdsToRemove) {
188190
protected void addTasksToCurrentJob(List<WorkUnit> workUnitsToAdd) {
189191
log.warn("NOT IMPLEMENTED: Temporal addTasksToCurrentJob");
190192
}
193+
194+
@Override
195+
public void close() throws IOException {
196+
try {
197+
// Calling cancel before calling close on serviceStubs as it will shutdown the service which is required during cancellation.
198+
cancelJob(jobListener);
199+
} catch (Exception e) {
200+
log.error("Exception occurred while cancelling job", e);
201+
} finally {
202+
managedWorkflowServiceStubs.close();
203+
super.close();
204+
}
205+
}
191206
}

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -209,16 +209,17 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
209209
} else {
210210
LOGGER.info("No job schedule found, so running job " + jobUri);
211211
GobblinTemporalJobLauncherListener listener = new GobblinTemporalJobLauncherListener(this.launcherMetrics);
212-
JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig());
213-
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
214-
try {
215-
launcher.cancelJob(listener);
216-
} catch (JobException e) {
217-
LOGGER.error("Failed to cancel the job during shutdown", e);
218-
throw new RuntimeException(e);
219-
}
220-
}));
221-
launcher.launchJob(listener);
212+
try (JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig())) {
213+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
214+
try {
215+
launcher.cancelJob(listener);
216+
} catch (JobException e) {
217+
LOGGER.error("Failed to cancel the job during shutdown", e);
218+
throw new RuntimeException(e);
219+
}
220+
}));
221+
launcher.launchJob(listener);
222+
}
222223
}
223224
} catch (Exception je) {
224225
LOGGER.error("Failed to schedule or run job " + jobUri, je);

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,12 @@
4646
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
4747
import org.apache.gobblin.temporal.ddm.work.assistance.MDCContextPropagator;
4848
import org.apache.gobblin.temporal.workflows.metrics.TemporalMetricsHelper;
49+
import org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
4950
import org.apache.gobblin.util.ConfigUtils;
5051

5152

5253
public class TemporalWorkflowClientFactory {
53-
public static WorkflowServiceStubs createServiceInstance(String connectionUri) throws Exception {
54+
public static ManagedWorkflowServiceStubs createServiceInstance(String connectionUri) throws Exception {
5455
GobblinClusterUtils.setSystemProperties(ConfigFactory.load());
5556
Config config = GobblinClusterUtils.addDynamicConfig(ConfigFactory.load());
5657
String SHARED_KAFKA_CONFIG_PREFIX_WITH_DOT = "gobblin.kafka.sharedConfig.";
@@ -119,7 +120,7 @@ public static WorkflowServiceStubs createServiceInstance(String connectionUri) t
119120
.setSslContext(sslContext)
120121
.setMetricsScope(metricsScope)
121122
.build();
122-
return WorkflowServiceStubs.newServiceStubs(options);
123+
return new ManagedWorkflowServiceStubs(WorkflowServiceStubs.newServiceStubs(options));
123124
}
124125

125126
public static WorkflowClient createClientInstance(WorkflowServiceStubs service, String namespace) {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.temporal.workflows.service;
19+
20+
import java.io.Closeable;
21+
22+
import io.temporal.serviceclient.WorkflowServiceStubs;
23+
import lombok.Getter;
24+
import lombok.extern.slf4j.Slf4j;
25+
26+
/**
27+
* A wrapper class of {@link WorkflowServiceStubs} that implements the Closeable interface.
28+
* It manages the lifecycle of {@link WorkflowServiceStubs}, ensuring proper shutdown and resource cleanup.
29+
*/
30+
@Getter
31+
@Slf4j
32+
public class ManagedWorkflowServiceStubs implements Closeable {
33+
private final WorkflowServiceStubs workflowServiceStubs;
34+
35+
public ManagedWorkflowServiceStubs(WorkflowServiceStubs serviceStubs) {
36+
this.workflowServiceStubs = serviceStubs;
37+
}
38+
39+
@Override
40+
public void close() {
41+
try {
42+
workflowServiceStubs.getOptions().getMetricsScope().close();
43+
}
44+
catch (Exception e) {
45+
log.error("Exception occurred while closing metrics scope", e);
46+
}
47+
try {
48+
workflowServiceStubs.shutdown();
49+
}
50+
catch (Exception e) {
51+
log.error("Exception occurred while shutting down WorkflowServiceStubs", e);
52+
}
53+
}
54+
}

gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.gobblin.source.workunit.WorkUnit;
5050
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
5151
import org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory;
52+
import org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
5253
import org.apache.gobblin.util.JobLauncherUtils;
5354

5455
import static org.mockito.Mockito.mock;
@@ -83,6 +84,7 @@ protected void submitJob(List<WorkUnit> workUnits)
8384
@BeforeClass
8485
public void setUp() throws Exception {
8586
mockServiceStubs = mock(WorkflowServiceStubs.class);
87+
ManagedWorkflowServiceStubs managedWorkflowServiceStubs = new ManagedWorkflowServiceStubs(mockServiceStubs);
8688
mockClient = mock(WorkflowClient.class);
8789
mockExecutionInfo = mock(WorkflowExecutionInfo.class);
8890
DescribeWorkflowExecutionResponse mockResponse = mock(DescribeWorkflowExecutionResponse.class);
@@ -93,7 +95,7 @@ public void setUp() throws Exception {
9395

9496
mockWorkflowClientFactory = Mockito.mockStatic(TemporalWorkflowClientFactory.class);
9597
mockWorkflowClientFactory.when(() -> TemporalWorkflowClientFactory.createServiceInstance(Mockito.anyString()))
96-
.thenReturn(mockServiceStubs);
98+
.thenReturn(managedWorkflowServiceStubs);
9799
mockWorkflowClientFactory.when(() -> TemporalWorkflowClientFactory.createClientInstance(Mockito.any(), Mockito.anyString()))
98100
.thenReturn(mockClient);
99101

0 commit comments

Comments
 (0)