Skip to content

Commit 150f697

Browse files
vsinghal85 (Vaibhav Singhal)
authored and committed
[GOBBLIN-2204] Implement FileSize Data Quality for FileBasedCopy
* Compute data quality and update task states * Retry for failed data quality check * Only do data quality check if data quality flag is enabled * Computing overall data quality of data set and adding it to dataset summary * Changes for testing failed data quality changes * Updating task state and computing overall data quality * Refactor and checkstyle fix * Propagate data quality from fork to task state * Refactoring and unit tests * Add unit tests for FileAwareInputStreamDataWriterTest * refactor changes * Compute task level data quality * Some refactoring and code cleanup * Update tests * Fix checkstyle failures * Code cleanup * Adding metrics for overall data quality * Address PR comments and add metric for number of files evaluated for data quality * Refactor dq evaluation * Address PR comments * Introduce data quality flag, to control and manage job failures due to data quality * Address PR comments * Switch back to CounterBuilder approach for metrics * Evaluate data quality only for commit via CommitActivity * Remove unused enum * move getDataQuality to JobState * Address PR comments * Fix the policy evaluation logic when both optional and mandatory policies are provided * Remove codestyle errors and remove flag check at task level --------- Co-authored-by: Vaibhav Singhal <vaibsing@vaibsing-mn7618.linkedin.biz>
1 parent 20ccf5c commit 150f697

File tree

34 files changed

+1244
-137
lines changed

34 files changed

+1244
-137
lines changed

gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,9 @@ public class ConfigurationKeys {
328328
public static final String DATASET_URN_KEY = "dataset.urn";
329329
public static final String GLOBAL_WATERMARK_DATASET_URN = "__globalDatasetWatermark";
330330
public static final String DEFAULT_DATASET_URN = "";
331+
public static final String DATASET_QUALITY_STATUS_KEY = "dataset.quality.status";
332+
public static final String ENFORCE_DATA_QUALITY_FAILURE_KEY = "data.quality.enforce.failure";
333+
public static final Boolean DEFAULT_ENFORCE_DATA_QUALITY_FAILURE = false;
331334

332335
/**
333336
* Work unit related configuration properties.
@@ -498,6 +501,7 @@ public class ConfigurationKeys {
498501
* Configuration properties used by the quality checker.
499502
*/
500503
public static final String QUALITY_CHECKER_PREFIX = "qualitychecker";
504+
public static final String TASK_LEVEL_POLICY_RESULT_KEY = "gobblin.task.level.policy.result";
501505
public static final String TASK_LEVEL_POLICY_LIST = QUALITY_CHECKER_PREFIX + ".task.policies";
502506
public static final String TASK_LEVEL_POLICY_LIST_TYPE = QUALITY_CHECKER_PREFIX + ".task.policy.types";
503507
public static final String ROW_LEVEL_POLICY_LIST = QUALITY_CHECKER_PREFIX + ".row.policies";
@@ -938,7 +942,7 @@ public class ConfigurationKeys {
938942
public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED = false;
939943

940944
public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "endpoint";
941-
945+
public static final String METRICS_REPORTING_OPENTELEMETRY_FABRIC = METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX + "fabric";
942946
// Headers to add to the OpenTelemetry HTTP Exporter, formatted as a JSON String with string keys and values
943947
public static final String METRICS_REPORTING_OPENTELEMETRY_HEADERS = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "headers";
944948

gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323

2424
public abstract class TaskLevelPolicy {
25-
private final State state;
25+
protected final State state;
2626
private final Type type;
2727

2828
public enum Type {
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.policies.size;
19+
20+
import java.util.Optional;
21+
22+
import lombok.Getter;
23+
import lombok.extern.slf4j.Slf4j;
24+
25+
import org.apache.gobblin.configuration.State;
26+
import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
27+
28+
29+
/**
30+
* A task-level policy that checks if the bytes read matches the bytes written for a file copy operation.
31+
*/
32+
@Slf4j
33+
public class FileSizePolicy extends TaskLevelPolicy {
34+
35+
public static final String COPY_PREFIX = "gobblin.copy";
36+
public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
37+
public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
38+
39+
public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
40+
super(state, type);
41+
}
42+
43+
@Override
44+
public Result executePolicy() {
45+
TransferBytes transferBytes = getBytesReadAndWritten(this.state).orElse(null);
46+
if (transferBytes == null) {
47+
return Result.FAILED;
48+
}
49+
Long bytesRead = transferBytes.getBytesRead();
50+
Long bytesWritten = transferBytes.getBytesWritten();
51+
52+
Long sizeDifference = Math.abs(bytesRead - bytesWritten);
53+
54+
if (sizeDifference == 0) {
55+
return Result.PASSED;
56+
}
57+
58+
log.warn("File size check failed - bytes read: {}, bytes written: {}, difference: {}", bytesRead, bytesWritten,
59+
sizeDifference);
60+
return Result.FAILED;
61+
}
62+
63+
@Override
64+
public String toString() {
65+
TransferBytes transferBytes = getBytesReadAndWritten(this.state).orElse(null);
66+
if (transferBytes != null) {
67+
return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]", transferBytes.getBytesRead(),
68+
transferBytes.getBytesWritten());
69+
} else {
70+
return "FileSizePolicy [bytesRead=null, bytesWritten=null]";
71+
}
72+
}
73+
74+
/**
75+
* Helper class to hold transfer bytes information
76+
*/
77+
@Getter
78+
private static class TransferBytes {
79+
final long bytesRead;
80+
final long bytesWritten;
81+
82+
TransferBytes(long bytesRead, long bytesWritten) {
83+
this.bytesRead = bytesRead;
84+
this.bytesWritten = bytesWritten;
85+
}
86+
}
87+
88+
/**
89+
* Extracts bytesRead and bytesWritten from the given state.
90+
* Returns Empty Optional if parsing fails.
91+
*/
92+
private Optional<TransferBytes> getBytesReadAndWritten(State state) {
93+
String bytesReadString = state.getProp(BYTES_READ_KEY);
94+
String bytesWrittenString = state.getProp(BYTES_WRITTEN_KEY);
95+
if (bytesReadString == null || bytesWrittenString == null) {
96+
log.error("Missing value(s): bytesReadStr={}, bytesWrittenStr={}", bytesReadString, bytesWrittenString);
97+
return Optional.empty();
98+
}
99+
try {
100+
long bytesRead = Long.parseLong(bytesReadString);
101+
long bytesWritten = Long.parseLong(bytesWrittenString);
102+
return Optional.of(new TransferBytes(bytesRead, bytesWritten));
103+
} catch (NumberFormatException e) {
104+
log.error("Invalid number format for bytesRead or bytesWritten: bytesRead='{}', bytesWritten='{}'",
105+
bytesReadString, bytesWrittenString, e);
106+
return Optional.empty();
107+
}
108+
}
109+
}

gobblin-core/src/main/java/org/apache/gobblin/publisher/TaskPublisher.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.gobblin.publisher;
1919

20+
import java.util.Set;
2021
import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckResults;
2122
import java.util.Map;
2223

@@ -63,8 +64,8 @@ public PublisherState canPublish() throws Exception {
6364
* Returns true if all tests from the PolicyChecker pass, false otherwise
6465
*/
6566
public boolean passedAllTests() {
66-
for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : this.results.getPolicyResults().entrySet()) {
67-
if (entry.getKey().equals(TaskLevelPolicy.Result.FAILED) && entry.getValue().equals(TaskLevelPolicy.Type.FAIL)) {
67+
for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry : this.results.getPolicyResults().entrySet()) {
68+
if (entry.getKey().equals(TaskLevelPolicy.Result.FAILED) && entry.getValue().contains(TaskLevelPolicy.Type.FAIL)) {
6869
return false;
6970
}
7071
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.qualitychecker;
19+
20+
import lombok.extern.slf4j.Slf4j;
21+
22+
23+
/**
24+
* An enumeration for possible statuses for Data quality checks.
25+
* Its values will be:
26+
* - PASSED: When all data quality checks pass
27+
* - FAILED: When any data quality check fails
28+
* - NOT_EVALUATED: When data quality check evaluation is not performed
29+
* - UNKNOWN: when the data quality status is not recognized or invalid
30+
*/
31+
@Slf4j
32+
public enum DataQualityStatus {
33+
PASSED,
34+
FAILED,
35+
NOT_EVALUATED,
36+
UNKNOWN;
37+
38+
public static DataQualityStatus fromString(String value) {
39+
if (value == null) {
40+
return NOT_EVALUATED;
41+
}
42+
try {
43+
return DataQualityStatus.valueOf(value.toUpperCase());
44+
} catch (IllegalArgumentException e) {
45+
log.error("Invalid DataQualityStatus value: {}. Returning UNKNOWN.", value, e);
46+
return UNKNOWN;
47+
}
48+
}
49+
}

gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckResults.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,20 @@
1919

2020
import java.util.HashMap;
2121
import java.util.Map;
22+
import java.util.Set;
2223

2324

2425
/**
2526
* Wrapper around a Map of PolicyResults and Policy.Type
2627
*/
2728
public class TaskLevelPolicyCheckResults {
28-
private final Map<TaskLevelPolicy.Result, TaskLevelPolicy.Type> results;
29+
private final Map<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> results;
2930

3031
public TaskLevelPolicyCheckResults() {
3132
this.results = new HashMap<>();
3233
}
3334

34-
public Map<TaskLevelPolicy.Result, TaskLevelPolicy.Type> getPolicyResults() {
35+
public Map<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> getPolicyResults() {
3536
return this.results;
3637
}
3738
}

gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717

1818
package org.apache.gobblin.qualitychecker.task;
1919

20+
import java.util.EnumSet;
2021
import java.util.List;
21-
22+
import lombok.Getter;
2223
import org.slf4j.Logger;
2324
import org.slf4j.LoggerFactory;
2425

@@ -28,30 +29,25 @@
2829
* executes each one, and then stores the output
2930
* in a PolicyCheckResults object
3031
*/
32+
@Getter
3133
public class TaskLevelPolicyChecker {
32-
/**
33-
* An enumeration for possible statuses for Data quality checks,
34-
* its values will be PASSED, FAILED, in case if data quality check
35-
* evaluation is not performed for Job, it will be NOT_EVALUATED
36-
*/
37-
public enum DataQualityStatus {
38-
PASSED,
39-
FAILED,
40-
NOT_EVALUATED
41-
}
4234
private final List<TaskLevelPolicy> list;
4335
private static final Logger LOG = LoggerFactory.getLogger(TaskLevelPolicyChecker.class);
4436

37+
public static final String TASK_LEVEL_POLICY_RESULT_KEY = "gobblin.task.level.policy.result";
38+
4539
public TaskLevelPolicyChecker(List<TaskLevelPolicy> list) {
4640
this.list = list;
4741
}
4842

4943
public TaskLevelPolicyCheckResults executePolicies() {
5044
TaskLevelPolicyCheckResults results = new TaskLevelPolicyCheckResults();
45+
5146
for (TaskLevelPolicy p : this.list) {
5247
TaskLevelPolicy.Result result = p.executePolicy();
53-
results.getPolicyResults().put(result, p.getType());
54-
LOG.info("TaskLevelPolicy " + p + " of type " + p.getType() + " executed with result " + result);
48+
results.getPolicyResults().computeIfAbsent(result, r -> EnumSet.noneOf(TaskLevelPolicy.Type.class))
49+
.add(p.getType());
50+
LOG.info("TaskLevelPolicy {} of type {} executed with result {}", p, p.getType(), result);
5551
}
5652
return results;
5753
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.policies.size;
19+
20+
import org.apache.gobblin.configuration.State;
21+
import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
22+
import org.testng.Assert;
23+
import org.testng.annotations.Test;
24+
25+
public class FileSizePolicyTest {
26+
27+
@Test
28+
public void testPolicyPass() {
29+
State state = new State();
30+
state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
31+
state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L);
32+
33+
FileSizePolicy policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL);
34+
Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.PASSED);
35+
}
36+
37+
@Test
38+
public void testPolicyFail() {
39+
State state = new State();
40+
state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
41+
state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 900L);
42+
43+
FileSizePolicy policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL);
44+
Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
45+
}
46+
47+
@Test
48+
public void testMissingProperties() {
49+
State state = new State();
50+
// No properties set at all
51+
FileSizePolicy policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL);
52+
Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
53+
}
54+
55+
@Test
56+
public void testPartiallySetProperties() {
57+
State state = new State();
58+
// Only set bytes read, not bytes written
59+
state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
60+
61+
FileSizePolicy policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL);
62+
Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
63+
64+
// Reset state and only set bytes written, not bytes read
65+
state = new State();
66+
state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L);
67+
68+
policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL);
69+
Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
70+
}
71+
72+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.qualitychecker;
19+
20+
import org.apache.gobblin.configuration.State;
21+
import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
22+
23+
24+
public class FailingTaskLevelPolicy extends TaskLevelPolicy {
25+
public FailingTaskLevelPolicy(State state, Type type) {
26+
super(state, type);
27+
}
28+
29+
@Override
30+
public Result executePolicy() {
31+
return Result.FAILED;
32+
}
33+
}

0 commit comments

Comments
 (0)