Skip to content

Commit e584e5b

Browse files
authored
[GOBBLIN-2182] Cleanup workdir in Gobblin-on-Temporal execution (#4085)
* added cleanup for got controlled with a config
1 parent a0cef28 commit e584e5b

File tree

2 files changed

+16
-1
lines changed

2 files changed

+16
-1
lines changed

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public interface GobblinTemporalConfigurationKeys {
4343

4444
String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX = GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "arg.";
4545
String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CONFIG_OVERRIDES = GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "config.overrides";
46+
String GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED = PREFIX + "work.dir.cleanup.enabled";
47+
String DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED = "true";
4648

4749
/**
4850
* Suffix for metrics emitted by GobblinTemporalJobLauncher for preventing collisions with prod jobs

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@
5959
import org.apache.gobblin.source.workunit.WorkUnit;
6060
import org.apache.gobblin.util.ConfigUtils;
6161
import org.apache.gobblin.util.ParallelRunner;
62+
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
63+
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
6264

6365
/**
6466
* An implementation of {@link JobLauncher} that launches a Gobblin job using the Temporal task framework.
@@ -134,7 +136,11 @@ public void close() throws IOException {
134136
try {
135137
executeCancellation();
136138
} finally {
137-
super.close();
139+
try {
140+
cleanupWorkingDirectory();
141+
} finally {
142+
super.close();
143+
}
138144
}
139145
}
140146

@@ -277,6 +283,13 @@ protected void cleanupWorkingDirectory() throws IOException {
277283
GobblinClusterUtils.getJobStateFilePath(false, this.appWorkDir, this.jobContext.getJobId());
278284
this.fs.delete(jobStateFilePath, false);
279285
}
286+
287+
if (Boolean.parseBoolean(this.jobProps.getProperty(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED,
288+
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED))) {
289+
Path workDirRootPath = JobStateUtils.getWorkDirRoot(this.jobContext.getJobState());
290+
log.info("Cleaning up work directory : {} for job : {}", workDirRootPath, this.jobContext.getJobId());
291+
this.fs.delete(workDirRootPath, true);
292+
}
280293
}
281294
}
282295

0 commit comments

Comments
 (0)