Skip to content

Commit feb870c

Browse files
committed
Close temporal metrics scope on job completion
1 parent adef734 commit feb870c

File tree

3 files changed

+30
-10
lines changed

3 files changed

+30
-10
lines changed

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ public class GobblinTemporalTaskRunner implements StandardMetricsBridge {
126126
private final boolean isMetricReportingFailureFatal;
127127
private final boolean isEventReportingFailureFatal;
128128
private final List<TemporalWorker> workers;
129+
private final WorkflowServiceStubs workflowServiceStubs;
129130

130131
public GobblinTemporalTaskRunner(String applicationName,
131132
String applicationId,
@@ -163,6 +164,9 @@ public GobblinTemporalTaskRunner(String applicationName,
163164
ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
164165
this.workers = new ArrayList<>();
165166

167+
String connectionUri = clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
168+
this.workflowServiceStubs = TemporalWorkflowClientFactory.createServiceInstance(connectionUri);
169+
166170
logger.info("GobblinTaskRunner({}): applicationName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
167171
this.isTaskDriver ? "taskDriver" : "worker",
168172
applicationName,
@@ -241,12 +245,9 @@ public void start()
241245
private TemporalWorker initiateWorker() throws Exception {
242246
logger.info("Starting Temporal Worker");
243247

244-
String connectionUri = clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
245-
WorkflowServiceStubs service = TemporalWorkflowClientFactory.createServiceInstance(connectionUri);
246-
247248
String namespace = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE,
248249
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
249-
WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(service, namespace);
250+
WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(workflowServiceStubs, namespace);
250251

251252
String workerClassName = ConfigUtils.getString(clusterConfig,
252253
GobblinTemporalConfigurationKeys.WORKER_CLASS, GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
@@ -292,6 +293,12 @@ public synchronized void stop() {
292293
this.containerMetrics.get().stopMetricsReporting();
293294
}
294295

296+
try {
297+
this.workflowServiceStubs.getOptions().getMetricsScope().close();
298+
} catch (Exception e) {
299+
logger.error("Exception occurred while closing MetricsScope", e);
300+
}
301+
295302
workers.forEach(TemporalWorker::shutdown);
296303

297304
logger.info("All services are stopped.");

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.gobblin.temporal.joblauncher;
1919

20+
import java.io.IOException;
2021
import java.util.List;
2122
import java.util.Properties;
2223
import java.util.concurrent.ConcurrentHashMap;
@@ -188,4 +189,15 @@ protected void removeTasksFromCurrentJob(List<String> workUnitIdsToRemove) {
188189
protected void addTasksToCurrentJob(List<WorkUnit> workUnitsToAdd) {
189190
log.warn("NOT IMPLEMENTED: Temporal addTasksToCurrentJob");
190191
}
192+
193+
@Override
194+
public void close() throws IOException {
195+
try {
196+
this.workflowServiceStubs.getOptions().getMetricsScope().close();
197+
} catch (Exception e) {
198+
log.error("Exception occurred while closing MetricsScope ", e);
199+
} finally {
200+
super.close();
201+
}
202+
}
191203
}

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -210,15 +210,16 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
210210
LOGGER.info("No job schedule found, so running job " + jobUri);
211211
GobblinTemporalJobLauncherListener listener = new GobblinTemporalJobLauncherListener(this.launcherMetrics);
212212
JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig());
213-
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
213+
try {
214+
launcher.launchJob(listener);
215+
} finally {
214216
try {
215217
launcher.cancelJob(listener);
216-
} catch (JobException e) {
217-
LOGGER.error("Failed to cancel the job during shutdown", e);
218-
throw new RuntimeException(e);
219218
}
220-
}));
221-
launcher.launchJob(listener);
219+
finally {
220+
launcher.close();
221+
}
222+
}
222223
}
223224
} catch (Exception je) {
224225
LOGGER.error("Failed to schedule or run job " + jobUri, je);

0 commit comments

Comments
 (0)