Skip to content

Commit a632613

Browse files
vsinghal85 (Vaibhav Singhal)
authored and committed
Evaluate data quality only when Policies are applied (#4127)
* add shouldEvaluate data quality check at dataset level
* Refactor checks for invoking evaluateAndEmitDatasetQuality

---------

Co-authored-by: Vaibhav Singhal <vaibsing@vaibsing-mn7618.linkedin.biz>
1 parent ccf8ce3 commit a632613

File tree

1 file changed

+17
-9
lines changed

1 file changed

+17
-9
lines changed

gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java

Lines changed: 17 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -94,14 +94,8 @@ public Void call()
9494
metricContext = Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class);
9595

9696
finalizeDatasetStateBeforeCommit(this.datasetState);
97-
// evaluate data quality at the dataset commit level, only when commit source is CommitActivityImpl
98-
if (SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc)) {
99-
log.info("Evaluating data quality for commit activity for dataset {}.", this.datasetUrn);
100-
evaluateAndEmitDatasetQuality();
101-
} else {
102-
log.info("Skipping data quality evaluation for dataset {} as commit source is {}", this.datasetUrn,
103-
this.datasetCommitSrc);
104-
}
97+
// evaluate data quality at the dataset commit level
98+
evaluateAndEmitDatasetQuality();
10599
Class<? extends DataPublisher> dataPublisherClass;
106100
try (Closer closer = Closer.create()) {
107101
dataPublisherClass = JobContext.getJobDataPublisherClass(this.jobContext.getJobState())
@@ -455,9 +449,23 @@ private static Optional<CommitStep> buildDatasetStateCommitStep(String datasetUr
455449
* This method handles the business logic of data quality evaluation
456450
* at the dataset commit level, which is more appropriate than having
457451
* it in the JobState data container.
452+
*
453+
* Data quality evaluation is only performed when:
454+
* 1. Commit source is CommitActivityImpl
455+
* 2. Data quality policies are applied to the job
458456
*/
459457
private void evaluateAndEmitDatasetQuality() {
460-
DataQualityEvaluator.evaluateAndReportDatasetQuality(this.datasetState, this.jobContext.getJobState());
458+
JobState jobState = this.jobContext.getJobState();
459+
String policiesApplied = jobState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST, StringUtils.EMPTY);
460+
log.info("Policies applied: {}", policiesApplied);
461+
boolean shouldEvaluateDataQuality = !policiesApplied.isEmpty();
462+
if (shouldEvaluateDataQuality && SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc)) {
463+
log.info("Evaluating data quality for commit activity for dataset {}.", this.datasetUrn);
464+
DataQualityEvaluator.evaluateAndReportDatasetQuality(this.datasetState, jobState);
465+
} else {
466+
log.info("Skipping data quality evaluation for dataset {} as commit source is {} and policies applied are {}",
467+
this.datasetUrn, this.datasetCommitSrc, policiesApplied);
468+
}
461469
}
462470

463471
}

0 commit comments

Comments
 (0)