Skip to content

Commit f157898

Browse files
authored
Merge pull request #1431 from cloudsufi/patch/array-logical-type-validation
[PLUGIN-1803] Add validation for logical type (array/repeated)
2 parents 74dd583 + 14d3260 commit f157898

File tree

2 files changed

+47
-5
lines changed

2 files changed

+47
-5
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -438,15 +438,28 @@ public static ValidationFailure validateFieldSchemaMatches(Field bqField, Schema
438438
if (bqField.getMode() == Field.Mode.REPEATED) {
439439
fieldSchema = fieldSchema.getComponentSchema();
440440
type = fieldSchema.getType();
441+
logicalType = fieldSchema.getLogicalType();
441442
}
442443
}
443444

445+
String[] incompatibleFieldErrorMessage = {
446+
String.format("Field '%s' of type '%s' is incompatible with column '%s' of type '%s' in BigQuery table '%s.%s'.",
447+
field.getName(), fieldSchema.getDisplayName(), bqField.getName(),
448+
BQ_TYPE_MAP.get(bqField.getType()), dataset, table) ,
449+
String.format("It must be of type '%s'.", BQ_TYPE_MAP.get(bqField.getType()))
450+
};
451+
if (logicalType != null) {
452+
if (LOGICAL_TYPE_MAP.get(logicalType) != null && !LOGICAL_TYPE_MAP.get(logicalType).contains(bqField.getType())) {
453+
return collector.addFailure(incompatibleFieldErrorMessage[0], incompatibleFieldErrorMessage[1]);
454+
}
455+
456+
// Return once logical types are validated. This is because logical types are represented as primitive types
457+
// internally.
458+
return null;
459+
}
460+
444461
if (TYPE_MAP.get(type) != null && !TYPE_MAP.get(type).contains(bqField.getType())) {
445-
return collector.addFailure(
446-
String.format("Field '%s' of type '%s' is incompatible with column '%s' of type '%s' " +
447-
"in BigQuery table '%s.%s'.", field.getName(), fieldSchema.getDisplayName(), bqField.getName(),
448-
BQ_TYPE_MAP.get(bqField.getType()), dataset, table),
449-
String.format("It must be of type '%s'.", BQ_TYPE_MAP.get(bqField.getType())));
462+
return collector.addFailure(incompatibleFieldErrorMessage[0], incompatibleFieldErrorMessage[1]);
450463
}
451464
return null;
452465
}

src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.cdap.cdap.etl.api.FailureCollector;
2424
import io.cdap.cdap.etl.api.validation.ValidationFailure;
2525
import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
26+
import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceConfig;
2627
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.BigNumeric;
2728
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.Numeric;
2829
import io.cdap.plugin.gcp.common.GCPUtils;
@@ -478,4 +479,32 @@ public void testConvertFieldTypeJsonToString() {
478479
Schema result = BigQueryUtil.convertFieldType(field, null, null);
479480
Assert.assertEquals(expectedSchema, result);
480481
}
482+
483+
@Test
484+
public void testValidateFieldSchemaMatchesDate() {
485+
MockFailureCollector collector = new MockFailureCollector();
486+
Field bigQueryField = Field.newBuilder("testFieldRepeatedDate", StandardSQLTypeName.DATE)
487+
.setMode(Field.Mode.REPEATED).build();
488+
Schema.Field schemaField = Schema.Field.of("testFieldRepeatedDate",
489+
Schema.nullableOf(Schema.arrayOf(Schema.of(Schema.LogicalType.DATE))));
490+
ValidationFailure failure = BigQueryUtil.validateFieldSchemaMatches(bigQueryField, schemaField, "dataset",
491+
"table", BigQuerySourceConfig.SUPPORTED_TYPES, collector);
492+
Assert.assertNull(failure);
493+
Assert.assertEquals(0, collector.getValidationFailures().size());
494+
}
495+
496+
@Test
497+
public void testValidateFieldSchemaNotMatchesDate() {
498+
MockFailureCollector collector = new MockFailureCollector();
499+
Field bigQueryField = Field.newBuilder("testFieldRepeatedDate", StandardSQLTypeName.DATE)
500+
.setMode(Field.Mode.REPEATED).build();
501+
Schema.Field schemaField = Schema.Field.of("testFieldRepeatedDate",
502+
Schema.nullableOf(Schema.arrayOf(Schema.of(Schema.Type.STRING))));
503+
ValidationFailure failure = BigQueryUtil.validateFieldSchemaMatches(bigQueryField, schemaField, "dataset",
504+
"table", BigQuerySourceConfig.SUPPORTED_TYPES, collector);
505+
Assert.assertNotNull(failure);
506+
Assert.assertEquals(1, collector.getValidationFailures().size());
507+
Assert.assertEquals("Field 'testFieldRepeatedDate' of type 'string' is incompatible with" +
508+
" column 'testFieldRepeatedDate' of type 'date' in BigQuery table 'dataset.table'.", failure.getMessage());
509+
}
481510
}

0 commit comments

Comments
 (0)