Skip to content

Commit f7fe367

Browse files
vsinghal85Vaibhav Singhal
andauthored
DQ handling for no bytes read scenarios and metrics for bytes read/written (#4135)
* introduce not evaluated flag for null bytes read case and bytes read/written metrics --------- Co-authored-by: Vaibhav Singhal <vaibsing@vaibsing-mn7618.linkedin.biz>
1 parent 08272be commit f7fe367

File tree

7 files changed

+102
-24
lines changed

7 files changed

+102
-24
lines changed

gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ public enum Type {
3232

3333
public enum Result {
3434
PASSED, // The test passed
35-
FAILED // The test failed
35+
FAILED, // The test failed
36+
NOT_EVALUATED // The test was not evaluated
3637
}
3738

3839
public TaskLevelPolicy(State state, TaskLevelPolicy.Type type) {

gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@ public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
4444
public Result executePolicy() {
4545
TransferBytes transferBytes = getBytesReadAndWritten(this.state).orElse(null);
4646
if (transferBytes == null) {
47-
return Result.FAILED;
47+
return Result.NOT_EVALUATED;
4848
}
49-
Long bytesRead = transferBytes.getBytesRead();
50-
Long bytesWritten = transferBytes.getBytesWritten();
49+
long bytesRead = transferBytes.getBytesRead();
50+
long bytesWritten = transferBytes.getBytesWritten();
5151

52-
Long sizeDifference = Math.abs(bytesRead - bytesWritten);
52+
long sizeDifference = Math.abs(bytesRead - bytesWritten);
5353

5454
if (sizeDifference == 0) {
5555
return Result.PASSED;
@@ -67,7 +67,7 @@ public String toString() {
6767
return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]", transferBytes.getBytesRead(),
6868
transferBytes.getBytesWritten());
6969
} else {
70-
return "FileSizePolicy [bytesRead=null, bytesWritten=null]";
70+
return "Transfer bytes information not available";
7171
}
7272
}
7373

@@ -87,18 +87,28 @@ private static class TransferBytes {
8787

8888
/**
8989
* Extracts bytesRead and bytesWritten from the given state.
90+
* If bytesRead is null/zero, skip data quality check by returning Optional.empty().
9091
* Returns Empty Optional if parsing fails.
9192
*/
9293
private Optional<TransferBytes> getBytesReadAndWritten(State state) {
9394
String bytesReadString = state.getProp(BYTES_READ_KEY);
9495
String bytesWrittenString = state.getProp(BYTES_WRITTEN_KEY);
95-
if (bytesReadString == null || bytesWrittenString == null) {
96-
log.error("Missing value(s): bytesReadStr={}, bytesWrittenStr={}", bytesReadString, bytesWrittenString);
96+
if (bytesReadString == null) {
97+
log.error("Missing value(s): bytesReadStr=null, bytesWrittenStr={}", bytesWrittenString);
9798
return Optional.empty();
9899
}
99100
try {
100101
long bytesRead = Long.parseLong(bytesReadString);
101-
long bytesWritten = Long.parseLong(bytesWrittenString);
102+
if (bytesRead == 0) {
103+
log.warn("Bytes read is zero, skipping file size check.");
104+
return Optional.empty();
105+
}
106+
long bytesWritten = 0;
107+
if (bytesWrittenString == null) {
108+
log.error("Missing bytesWritten value: bytesWrittenStr=null, assuming 0 bytes written.");
109+
} else {
110+
bytesWritten = Long.parseLong(bytesWrittenString);
111+
}
102112
return Optional.of(new TransferBytes(bytesRead, bytesWritten));
103113
} catch (NumberFormatException e) {
104114
log.error("Invalid number format for bytesRead or bytesWritten: bytesRead='{}', bytesWritten='{}'",

gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public void testMissingProperties() {
4949
State state = new State();
5050
// No properties set at all
5151
FileSizePolicy policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL);
52-
Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
52+
Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.NOT_EVALUATED);
5353
}
5454

5555
@Test
@@ -64,9 +64,20 @@ public void testPartiallySetProperties() {
6464
// Reset state and only set bytes written, not bytes read
6565
state = new State();
6666
state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L);
67-
67+
// bytes read is null
6868
policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL);
69-
Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
69+
Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.NOT_EVALUATED);
70+
}
71+
72+
@Test
73+
public void testEmptyDirectoryHandling() {
74+
State state = new State();
75+
// Test case: Empty directory with 0 bytes read, no bytes written (directory case)
76+
state.setProp(FileSizePolicy.BYTES_READ_KEY, 0L);
77+
// Don't set BYTES_WRITTEN_KEY to simulate directory copy scenario
78+
79+
FileSizePolicy policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL);
80+
Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.NOT_EVALUATED);
7081
}
7182

7283
}

gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public class ServiceMetricNames {
2828
public static final String DATA_QUALITY_SUCCESS_FILE_COUNT = "dataQualitySuccessFileCount";
2929
public static final String DATA_QUALITY_FAILURE_FILE_COUNT = "dataQualityFailureFileCount";
3030
public static final String DATA_QUALITY_NON_EVALUATED_FILE_COUNT = "dataQualityNonEvaluatedFileCount";
31+
public static final String DATA_QUALITY_BYTES_READ = "dataQualityBytesRead";
32+
public static final String DATA_QUALITY_BYTES_WRITTEN = "dataQualityBytesWritten";
3133

3234
// Flow Compilation Meters and Timer
3335
public static final String FLOW_COMPILATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.successful";

gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
3333
import org.apache.gobblin.metrics.ServiceMetricNames;
3434
import org.apache.gobblin.metrics.event.TimingEvent;
35+
import org.apache.gobblin.policies.size.FileSizePolicy;
3536
import org.apache.gobblin.qualitychecker.DataQualityStatus;
3637
import org.apache.gobblin.runtime.JobState;
3738
import org.apache.gobblin.runtime.TaskState;
@@ -66,6 +67,10 @@ public static class DataQualityEvaluationResult {
6667
private final int failedFiles;
6768
// Number of files that were not evaluated for data quality for example files not found or not processed
6869
private final int nonEvaluatedFiles;
70+
// total bytes read
71+
private final long bytesRead;
72+
// total bytes written
73+
private final long bytesWritten;
6974
}
7075

7176
/**
@@ -85,7 +90,7 @@ public static DataQualityEvaluationResult evaluateAndReportDatasetQuality(JobSta
8590
jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, result.getQualityStatus().name());
8691
// Emit dataset-specific metrics
8792
emitMetrics(jobState, result.getQualityStatus(), result.getTotalFiles(), result.getPassedFiles(),
88-
result.getFailedFiles(), result.getNonEvaluatedFiles(), datasetState.getDatasetUrn());
93+
result.getFailedFiles(), result.getNonEvaluatedFiles(), result.getBytesRead(), result.getBytesWritten(), datasetState.getDatasetUrn());
8994

9095
return result;
9196
}
@@ -103,17 +108,19 @@ public static DataQualityEvaluationResult evaluateDataQuality(List<TaskState> ta
103108
int failedFilesCount = 0;
104109
int passedFilesCount = 0;
105110
int nonEvaluatedFilesCount = 0;
111+
long bytesRead = 0;
112+
long bytesWritten = 0;
106113

107114
for (TaskState taskState : taskStates) {
108115
totalFiles++;
109-
110116
// Handle null task states gracefully
111117
if (taskState == null) {
112118
log.warn("Encountered null task state, skipping data quality evaluation for this task");
113119
nonEvaluatedFilesCount++;
114120
continue;
115121
}
116-
122+
bytesRead += taskState.getPropAsLong(FileSizePolicy.BYTES_READ_KEY, 0L);
123+
bytesWritten += taskState.getPropAsLong(FileSizePolicy.BYTES_WRITTEN_KEY, 0L);
117124
DataQualityStatus taskDataQuality = null;
118125
String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
119126
taskDataQuality = DataQualityStatus.fromString(result);
@@ -138,11 +145,11 @@ public static DataQualityEvaluationResult evaluateDataQuality(List<TaskState> ta
138145
log.info("Data quality evaluation summary - Total: {}, Passed: {}, Failed: {}, Not Evaluated: {}", totalFiles,
139146
passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
140147
return new DataQualityEvaluationResult(jobDataQualityStatus, totalFiles, passedFilesCount, failedFilesCount,
141-
nonEvaluatedFilesCount);
148+
nonEvaluatedFilesCount, bytesRead, bytesWritten);
142149
}
143150

144151
private static void emitMetrics(JobState jobState, final DataQualityStatus jobDataQuality, final int totalFiles,
145-
final int passedFilesCount, final int failedFilesCount, final int nonEvaluatedFilesCount,
152+
final int passedFilesCount, final int failedFilesCount, final int nonEvaluatedFilesCount, final long bytesRead, final long bytesWritten,
146153
final String datasetUrn) {
147154
try {
148155
// Check if OpenTelemetry is enabled
@@ -187,6 +194,17 @@ private static void emitMetrics(JobState jobState, final DataQualityStatus jobDa
187194
meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_NON_EVALUATED_FILE_COUNT)
188195
.setDescription("Number of files that did not have data quality evaluation").build()
189196
.add(nonEvaluatedFilesCount, tags);
197+
198+
// Emit bytes read
199+
meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_BYTES_READ)
200+
.setDescription("Total bytes read").build()
201+
.add(bytesRead, tags);
202+
203+
// Emit bytes written
204+
meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_BYTES_WRITTEN)
205+
.setDescription("Total bytes written").build()
206+
.add(bytesWritten, tags);
207+
190208
} catch (Exception e) {
191209
log.error("Error in emitMetrics for job: {}", jobState.getJobName(), e);
192210
}

gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,8 @@ private void completeShutdown() {
320320

321321
private void computeAndUpdateTaskDataQuality() {
322322
DataQualityStatus overallTaskDataQuality = DataQualityStatus.PASSED;
323+
boolean hasEvaluatedForks = false;
324+
323325
for (Optional<Fork> fork : this.forks.keySet()) {
324326
if (fork.isPresent()) {
325327
TaskState forkTaskState = fork.get().getTaskState();
@@ -329,17 +331,37 @@ private void computeAndUpdateTaskDataQuality() {
329331
try {
330332
if (result != null) {
331333
forkDataQualityStatus = DataQualityStatus.valueOf(result);
334+
} else {
335+
forkDataQualityStatus = DataQualityStatus.NOT_EVALUATED;
332336
}
333337
} catch (IllegalArgumentException e) {
334338
Log.warn("Unknown data quality status encountered: " + result);
335339
forkDataQualityStatus = DataQualityStatus.UNKNOWN;
336340
}
337-
if (DataQualityStatus.FAILED == forkDataQualityStatus) {
338-
overallTaskDataQuality = DataQualityStatus.FAILED;
341+
/*
342+
* If any fork fails, overall status should be FAILED
343+
* FAILED status cannot be overridden by subsequent successes
344+
* Handle NOT_EVALUATED and UNKNOWN appropriately
345+
* If forkDataQualityStatus is PASSED and overall is not FAILED/UNKNOWN, keep PASSED
346+
*/
347+
if (forkDataQualityStatus != DataQualityStatus.NOT_EVALUATED) {
348+
hasEvaluatedForks = true;
349+
if (forkDataQualityStatus == DataQualityStatus.FAILED) {
350+
overallTaskDataQuality = DataQualityStatus.FAILED;
351+
} else if (forkDataQualityStatus == DataQualityStatus.UNKNOWN &&
352+
overallTaskDataQuality != DataQualityStatus.FAILED) {
353+
overallTaskDataQuality = DataQualityStatus.UNKNOWN;
354+
}
339355
}
340356
}
341357
}
342358
}
359+
360+
// If no forks were evaluated, set overall task status to NOT_EVALUATED
361+
if (!hasEvaluatedForks) {
362+
overallTaskDataQuality = DataQualityStatus.NOT_EVALUATED;
363+
}
364+
343365
LOG.info("Data quality state of the task is {}", overallTaskDataQuality);
344366
this.taskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY, overallTaskDataQuality.name());
345367
}

gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -630,11 +630,7 @@ private boolean checkDataQuality(Optional<Object> schema)
630630
TaskLevelPolicyCheckResults taskResults =
631631
this.taskContext.getTaskLevelPolicyChecker(this.forkTaskState, this.branches > 1 ? this.index : -1)
632632
.executePolicies();
633-
boolean hasFailureForMandatoryPolicy = taskResults.getPolicyResults()
634-
.getOrDefault(TaskLevelPolicy.Result.FAILED, java.util.Collections.emptySet())
635-
.contains(TaskLevelPolicy.Type.FAIL);
636-
forkTaskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
637-
hasFailureForMandatoryPolicy ? DataQualityStatus.FAILED.name() : DataQualityStatus.PASSED.name());
633+
this.computeAndUpdateForkDataQuality(taskResults);
638634
TaskPublisher publisher = this.taskContext.getTaskPublisher(this.forkTaskState, taskResults);
639635
switch (publisher.canPublish()) {
640636
case SUCCESS:
@@ -659,6 +655,24 @@ private boolean checkDataQuality(Optional<Object> schema)
659655
}
660656
}
661657

658+
private void computeAndUpdateForkDataQuality(TaskLevelPolicyCheckResults taskResults) {
659+
boolean hasFailureForMandatoryPolicy =
660+
taskResults.getPolicyResults().getOrDefault(TaskLevelPolicy.Result.FAILED, java.util.Collections.emptySet())
661+
.contains(TaskLevelPolicy.Type.FAIL);
662+
boolean hasNotEvaluatedForMandatoryPolicy = taskResults.getPolicyResults()
663+
.getOrDefault(TaskLevelPolicy.Result.NOT_EVALUATED, java.util.Collections.emptySet())
664+
.contains(TaskLevelPolicy.Type.FAIL);
665+
String forkLevelDataQualityResult;
666+
if (hasFailureForMandatoryPolicy) {
667+
forkLevelDataQualityResult = DataQualityStatus.FAILED.name();
668+
} else if (hasNotEvaluatedForMandatoryPolicy) {
669+
forkLevelDataQualityResult = DataQualityStatus.NOT_EVALUATED.name();
670+
} else {
671+
forkLevelDataQualityResult = DataQualityStatus.PASSED.name();
672+
}
673+
forkTaskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY, forkLevelDataQualityResult);
674+
}
675+
662676
/**
663677
* Commit task data.
664678
*/

0 commit comments

Comments
 (0)