Skip to content

Commit df06c8a

Browse files
Blazer-007Will-Lo
authored andcommitted
attempt partial commit only in case commit policy is partial commit (#4157)
1 parent 1c9ece0 commit df06c8a

File tree

2 files changed

+24
-1
lines changed

2 files changed

+24
-1
lines changed

gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919

2020
import java.util.Properties;
2121

22+
import com.typesafe.config.Config;
2223
import lombok.Getter;
2324

2425
import org.apache.gobblin.configuration.ConfigurationKeys;
2526
import org.apache.gobblin.configuration.State;
27+
import org.apache.gobblin.util.ConfigUtils;
2628

2729

2830
/**
@@ -107,4 +109,16 @@ public static JobCommitPolicy getCommitPolicy(Properties jobProps) {
107109
public static JobCommitPolicy getCommitPolicy(State state) {
108110
return forName(state.getProp(ConfigurationKeys.JOB_COMMIT_POLICY_KEY, ConfigurationKeys.DEFAULT_JOB_COMMIT_POLICY));
109111
}
112+
113+
/**
114+
* Get a {@link JobCommitPolicy} through its name specified in configuration property
115+
* {@link ConfigurationKeys#JOB_COMMIT_POLICY_KEY}.
116+
*
117+
* @param config a {@link Config} instance carrying job configuration properties
118+
* @return a {@link JobCommitPolicy} with the given name specified in {@link ConfigurationKeys#JOB_COMMIT_POLICY_KEY}
119+
*/
120+
public static JobCommitPolicy getCommitPolicy(Config config) {
121+
return forName(ConfigUtils.getString(config, ConfigurationKeys.JOB_COMMIT_POLICY_KEY,
122+
ConfigurationKeys.DEFAULT_JOB_COMMIT_POLICY));
123+
}
110124
}

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.temporal.workflow.Workflow;
3131

3232
import org.apache.gobblin.configuration.ConfigurationKeys;
33+
import org.apache.gobblin.source.extractor.JobCommitPolicy;
3334
import org.apache.gobblin.temporal.cluster.WorkerConfig;
3435
import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
3536
import org.apache.gobblin.temporal.ddm.work.CommitStats;
@@ -79,7 +80,10 @@ private CommitStats performWork(WUProcessingSpec workSpec, final Properties prop
7980
log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e);
8081

8182
try {
82-
performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed, props);// Attempt partial commit before surfacing the failure
83+
if (shouldAttemptPartialCommit()) {
84+
performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed,
85+
props);// Attempt partial commit before surfacing the failure
86+
}
8387
} catch (Exception commitException) {
8488
// Combine current and commit exception messages for a more complete context
8589
String combinedMessage = String.format(
@@ -98,6 +102,11 @@ private CommitStats performWork(WUProcessingSpec workSpec, final Properties prop
98102
return performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed, props);
99103
}
100104

105+
private boolean shouldAttemptPartialCommit() {
106+
return JobCommitPolicy.getCommitPolicy(WorkerConfig.of(this).orElse(ConfigFactory.load()))
107+
.isAllowPartialCommit();
108+
}
109+
101110
private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec workSpec,
102111
Map<String, Object> searchAttributes, Optional<Integer> workunitsProcessed, Properties props) {
103112
// we are only inhibiting commit when workunitsProcessed is actually known to be zero

0 commit comments

Comments
 (0)