
Commit ab3c3b3

psainics authored and vikasrathee-cs committed
JSON Support
1 parent 08fa1f8 commit ab3c3b3

16 files changed: +641 −27 lines changed

docs/BigQueryTable-batchsink.md

Lines changed: 7 additions & 0 deletions
@@ -52,6 +52,13 @@ bucket will be created and then deleted after the run finishes.
 
 **GCS Upload Request Chunk Size**: GCS upload request chunk size in bytes. Default value is 8388608 bytes.
 
+**JSON String**: List of fields to be written to BigQuery as a JSON string.
+The fields must be of type STRING. To target nested fields, use dot notation.
+For example, 'name.first' will target the 'first' field in the 'name' record. (Macro Enabled)
+
+Use a comma-separated list to specify multiple fields in macro format.
+Example: "nestedObject.nestedArray.raw, nestedArray.raw".
+
 **Operation**: Type of write operation to perform. This can be set to Insert, Update or Upsert.
 * Insert - all records will be inserted in destination table.
 * Update - records that match on Table Key will be updated in the table. Records that do not match
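To make the dot notation concrete, here is a minimal sketch of an input record this setting could target, built with the CDAP `Schema` and `StructuredRecord` APIs. The record name, field names (`payload.raw`), and JSON value are hypothetical; the point is that the dotted path must resolve to a STRING leaf holding serialized JSON.

```java
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;

// Hypothetical record: 'payload.raw' is a STRING leaf whose value is serialized JSON.
public class JsonStringFieldExample {
  public static void main(String[] args) {
    Schema payloadSchema = Schema.recordOf("payload",
      Schema.Field.of("raw", Schema.of(Schema.Type.STRING)));
    Schema schema = Schema.recordOf("input",
      Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
      Schema.Field.of("payload", payloadSchema));

    StructuredRecord record = StructuredRecord.builder(schema)
      .set("id", 1L)
      .set("payload", StructuredRecord.builder(payloadSchema)
        .set("raw", "{\"color\":\"red\",\"sizes\":[1,2]}")
        .build())
      .build();

    // With "payload.raw" listed in the JSON String property, the sink is meant to
    // write this value out as JSON rather than as an escaped string.
    System.out.println(record.<StructuredRecord>get("payload").get("raw"));
  }
}
```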

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 98 additions & 1 deletion
@@ -19,6 +19,7 @@
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.Dataset;
 import com.google.cloud.bigquery.DatasetId;
+import com.google.cloud.bigquery.Table;
 import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
 import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
 import com.google.cloud.kms.v1.CryptoKeyName;
@@ -47,7 +48,11 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -145,14 +150,27 @@ public void transform(StructuredRecord input, Emitter<KeyValue<StructuredRecord,
    */
   protected final void initOutput(BatchSinkContext context, BigQuery bigQuery, String outputName, String fqn,
                                   String tableName, @Nullable Schema tableSchema, String bucket,
-                                  FailureCollector collector, @Nullable String marker) throws IOException {
+                                  FailureCollector collector, @Nullable String marker,
+                                  Table table) throws IOException {
     LOG.debug("Init output for table '{}' with schema: {}", tableName, tableSchema);
 
     List<BigQueryTableFieldSchema> fields = BigQuerySinkUtils.getBigQueryTableFields(bigQuery, tableName, tableSchema,
       getConfig().isAllowSchemaRelaxation(), getConfig().getDatasetProject(),
       getConfig().getDataset(), getConfig().isTruncateTableSet(), collector);
 
     Configuration configuration = new Configuration(baseConfiguration);
+    if (table != null) {
+      com.google.cloud.bigquery.Schema bqSchema = table.getDefinition().getSchema();
+      if (bqSchema != null) {
+        String jsonStringFields = BigQuerySinkUtils.getJsonStringFieldsFromBQSchema(bqSchema);
+        configuration.set(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, jsonStringFields);
+        BigQuerySinkUtils.setJsonStringFields(fields, jsonStringFields);
+      }
+    }
+
+    if (getConfig().getJsonStringFields() != null && !getConfig().getJsonStringFields().isEmpty()) {
+      BigQuerySinkUtils.setJsonStringFields(fields, getConfig().getJsonStringFields());
+    }
 
     // Build GCS storage path for this bucket output.
     String temporaryGcsPath = BigQuerySinkUtils.getTemporaryGcsPath(bucket, runUUID.toString(), tableName);
@@ -229,6 +247,7 @@ private Configuration getBaseConfiguration(@Nullable CryptoKeyName cmekKeyName)
       config.isAllowSchemaRelaxation());
     baseConfiguration.setStrings(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION.getKey(),
       config.getWriteDisposition().name());
+    baseConfiguration.setStrings(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, config.getJsonStringFields());
     // this setting is needed because gcs has default chunk size of 64MB. This is large default chunk size which can
     // cause OOM issue if there are many tables being written. See this - CDAP-16670
     String gcsChunkSize = "8388608";
@@ -310,4 +329,82 @@ protected Configuration getOutputConfiguration() throws IOException {
     return configuration;
   }
 
+  /**
+   * Validates that the fields to be converted to JSON strings are present in the Output Schema.
+   * @param schema Output Schema.
+   * @param jsonStringFields Comma-separated list of fields to be converted to JSON strings.
+   * @param collector FailureCollector to collect errors.
+   */
+  public void validateJsonStringFields(Schema schema,
+                                       String jsonStringFields, FailureCollector collector) {
+    Set<String> jsonFields = new HashSet<>(Arrays.asList(jsonStringFields.split(",")));
+    Set<String> jsonFieldsValidated = new HashSet<>();
+    validateJsonStringFields(schema, jsonFields, new ArrayList<>(), collector, jsonFieldsValidated);
+    jsonFields.removeAll(jsonFieldsValidated);
+    if (!jsonFields.isEmpty()) {
+      collector.addFailure(String.format("Field(s) '%s' are not present in the Output Schema.",
+                                         String.join(", ", jsonFields)),
+                           "Remove the field(s) from the list of fields to be converted to JSON strings.")
+        .withConfigProperty(AbstractBigQuerySinkConfig.NAME_JSON_STRING_FIELDS);
+    }
+  }
+
+  private void validateJsonStringFields(Schema schema, Set<String> jsonFields, ArrayList<String> path,
+                                        FailureCollector collector, Set<String> jsonFieldsValidated) {
+    String fieldPath = String.join(".", path);
+    String actionMessage = "Only type 'STRING' is supported.";
+
+    Schema.LogicalType logicalType = schema.isNullable() ? schema.getNonNullable().getLogicalType() :
+      schema.getLogicalType();
+    if (logicalType != null && jsonFields.contains(fieldPath)) {
+      collector.addFailure(
+          String.format("Field '%s' is of type '%s' which is not supported for conversion to JSON string.",
+                        fieldPath, logicalType),
+          actionMessage).withConfigProperty(AbstractBigQuerySinkConfig.NAME_JSON_STRING_FIELDS);
+      return;
+    }
+    Schema.Type type = getEffectiveType(schema);
+    List<Schema.Field> fields = getEffectiveFields(schema);
+    String errorMessage = String.format(
+      "Field '%s' is of type '%s' which is not supported for conversion to JSON string.", fieldPath, type);
+
+    if (type == Schema.Type.RECORD && fields != null) {
+      if (jsonFields.contains(fieldPath)) {
+        collector.addFailure(errorMessage, actionMessage)
+          .withConfigProperty(AbstractBigQuerySinkConfig.NAME_JSON_STRING_FIELDS);
+      }
+      for (Schema.Field field : fields) {
+        path.add(field.getName());
+        validateJsonStringFields(field.getSchema(), jsonFields, path, collector, jsonFieldsValidated);
+        path.remove(path.size() - 1);
+      }
+    } else {
+      jsonFieldsValidated.add(fieldPath);
+      if (type != Schema.Type.STRING && jsonFields.contains(fieldPath)) {
+        collector.addFailure(errorMessage, actionMessage)
+          .withConfigProperty(AbstractBigQuerySinkConfig.NAME_JSON_STRING_FIELDS);
+      }
+    }
+  }
+
+  private static Schema.Type getEffectiveType(Schema schema) {
+    Schema nonNullableSchema = schema.isNullable() ? schema.getNonNullable() : schema;
+    if (nonNullableSchema.getType() == Schema.Type.ARRAY && nonNullableSchema.getComponentSchema() != null) {
+      return nonNullableSchema.getComponentSchema().isNullable() ?
+        nonNullableSchema.getComponentSchema().getNonNullable().getType() :
+        nonNullableSchema.getComponentSchema().getType();
+    }
+    return nonNullableSchema.getType();
+  }
+
+  private static List<Schema.Field> getEffectiveFields(Schema schema) {
+    Schema nonNullableSchema = schema.isNullable() ? schema.getNonNullable() : schema;
+    if (nonNullableSchema.getType() == Schema.Type.ARRAY && nonNullableSchema.getComponentSchema() != null) {
+      return nonNullableSchema.getComponentSchema().isNullable() ?
+        nonNullableSchema.getComponentSchema().getNonNullable().getFields() :
+        nonNullableSchema.getComponentSchema().getFields();
+    }
+    return nonNullableSchema.getFields();
+  }
+
 }
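The recursive `validateJsonStringFields` above builds dot-separated paths while walking the output schema and only accepts STRING leaves. Below is a standalone sketch of that traversal idea, simplified: it skips the array and logical-type unwrapping that `getEffectiveType`/`getEffectiveFields` perform, and the field names are hypothetical.

```java
import io.cdap.cdap.api.data.schema.Schema;
import java.util.ArrayList;
import java.util.List;

// Standalone illustration (not plugin code): collect the dotted paths the
// validator would accept, i.e. leaves of type STRING.
public class JsonFieldPaths {
  public static void main(String[] args) {
    Schema nested = Schema.recordOf("name",
      Schema.Field.of("first", Schema.of(Schema.Type.STRING)),
      Schema.Field.of("age", Schema.of(Schema.Type.INT)));
    Schema schema = Schema.recordOf("input",
      Schema.Field.of("name", nested),
      Schema.Field.of("raw", Schema.of(Schema.Type.STRING)));

    List<String> accepted = new ArrayList<>();
    collect(schema, "", accepted);
    System.out.println(accepted); // [name.first, raw]
  }

  private static void collect(Schema schema, String prefix, List<String> out) {
    // Unwrap nullable wrappers, mirroring the validator's isNullable() checks.
    Schema s = schema.isNullable() ? schema.getNonNullable() : schema;
    if (s.getType() == Schema.Type.RECORD && s.getFields() != null) {
      for (Schema.Field f : s.getFields()) {
        collect(f.getSchema(), prefix.isEmpty() ? f.getName() : prefix + "." + f.getName(), out);
      }
    } else if (s.getType() == Schema.Type.STRING) {
      out.add(prefix); // a STRING leaf: eligible for JSON-string conversion
    }
  }
}
```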

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java

Lines changed: 12 additions & 0 deletions
@@ -51,6 +51,7 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
   private static final String NAME_GCS_CHUNK_SIZE = "gcsChunkSize";
   protected static final String NAME_UPDATE_SCHEMA = "allowSchemaRelaxation";
   private static final String SCHEME = "gs://";
+  protected static final String NAME_JSON_STRING_FIELDS = "jsonStringFields";
 
   @Name(Constants.Reference.REFERENCE_NAME)
   @Nullable
@@ -84,6 +85,12 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
     "This value is ignored if the dataset or temporary bucket already exist.")
   protected String location;
 
+  @Name(NAME_JSON_STRING_FIELDS)
+  @Nullable
+  @Description("Fields in input schema that should be treated as JSON strings. " +
+    "The schema of these fields should be of type STRING.")
+  protected String jsonStringFields;
+
   public AbstractBigQuerySinkConfig(BigQueryConnectorConfig connection, String dataset, String cmekKey, String bucket) {
     super(connection, dataset, cmekKey, bucket);
   }
@@ -114,6 +121,11 @@ public String getGcsChunkSize() {
     return gcsChunkSize;
   }
 
+  @Nullable
+  public String getJsonStringFields() {
+    return jsonStringFields;
+  }
+
   public boolean isAllowSchemaRelaxation() {
     return allowSchemaRelaxation == null ? false : allowSchemaRelaxation;
   }

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryJsonConverter.java

Lines changed: 10 additions & 1 deletion
@@ -23,12 +23,21 @@
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.Set;
 import javax.annotation.Nullable;
 
 /**
  * BigQueryJsonConverter converts a {@link StructuredRecord} to {@link JsonObject}
  */
 public class BigQueryJsonConverter extends RecordConverter<StructuredRecord, JsonObject> {
+  private Set<String> jsonStringFieldsPaths;
+
+  public BigQueryJsonConverter() {
+  }
+
+  public BigQueryJsonConverter(Set<String> jsonStringFieldsPaths) {
+    this.jsonStringFieldsPaths = jsonStringFieldsPaths;
+  }
 
   @Override
   public JsonObject transform(StructuredRecord input, @Nullable Schema schema) throws IOException {
@@ -40,7 +49,7 @@ public JsonObject transform(StructuredRecord input, @Nullable Schema schema) thr
         continue;
       }
       BigQueryRecordToJson.write(writer, recordField.getName(), input.get(recordField.getName()),
-        recordField.getSchema());
+        recordField.getSchema(), jsonStringFieldsPaths);
     }
     writer.endObject();
     return writer.get().getAsJsonObject();
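A hedged usage sketch of the new constructor, assuming it runs alongside the plugin classes (package `io.cdap.plugin.gcp.bigquery.sink`). The schema and path are invented, and the behavioral comment reflects the feature's intent rather than a guarantee, since the actual serialization happens inside `BigQueryRecordToJson.write`.

```java
package io.cdap.plugin.gcp.bigquery.sink;

import com.google.gson.JsonObject;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import java.io.IOException;
import java.util.Collections;

// Sketch: convert a record whose 'raw' STRING field carries serialized JSON.
public class ConverterExample {
  public static void main(String[] args) throws IOException {
    Schema schema = Schema.recordOf("input",
      Schema.Field.of("raw", Schema.of(Schema.Type.STRING)));
    StructuredRecord record = StructuredRecord.builder(schema)
      .set("raw", "{\"a\":1}")
      .build();

    // Passing the dotted path should make the converter emit the field's value
    // as parsed JSON instead of an escaped string literal.
    BigQueryJsonConverter converter =
      new BigQueryJsonConverter(Collections.singleton("raw"));
    JsonObject json = converter.transform(record, schema);
    System.out.println(json);
  }
}
```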

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryMultiSink.java

Lines changed: 11 additions & 1 deletion
@@ -29,6 +29,7 @@
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.StageConfigurer;
 import io.cdap.cdap.etl.api.batch.BatchSink;
 import io.cdap.cdap.etl.api.batch.BatchSinkContext;
 import io.cdap.cdap.etl.api.connector.Connector;
@@ -70,6 +71,15 @@ protected BigQueryMultiSinkConfig getConfig() {
   public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
     config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
     super.configurePipeline(pipelineConfigurer);
+
+    StageConfigurer configurer = pipelineConfigurer.getStageConfigurer();
+    FailureCollector collector = configurer.getFailureCollector();
+    Schema inputSchema = configurer.getInputSchema();
+    String jsonStringFields = config.getJsonStringFields();
+    if (jsonStringFields != null && inputSchema != null) {
+      validateJsonStringFields(inputSchema, jsonStringFields, collector);
+    }
+    collector.getOrThrowException();
   }
 
   @Override
@@ -134,7 +144,7 @@ protected void configureOutputSchemas(BatchSinkContext context,
       outputName = sanitizeOutputName(outputName);
       initOutput(context, bigQuery, outputName,
                  BigQueryUtil.getFQN(config.getDatasetProject(), config.getDataset(), tableName),
-                 tableName, tableSchema, bucket, context.getFailureCollector(), tableName);
+                 tableName, tableSchema, bucket, context.getFailureCollector(), tableName, table);
     } catch (IOException e) {
       collector.addFailure("Invalid schema: " + e.getMessage(), null);
     }

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java

Lines changed: 6 additions & 1 deletion
@@ -87,9 +87,11 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -121,9 +123,12 @@ public RecordWriter<StructuredRecord, NullWritable> getRecordWriter(TaskAttemptC
                                  io.cdap.cdap.api.data.schema.Schema schema)
     throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
+    String jsonStringFields = configuration.get(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, null);
+    Set<String> jsonFields = jsonStringFields == null ? Collections.emptySet() :
+      new HashSet<>(Arrays.asList(jsonStringFields.split(",")));
     return new BigQueryRecordWriter(getDelegate(configuration).getRecordWriter(taskAttemptContext),
                                     BigQueryOutputConfiguration.getFileFormat(configuration),
-                                    schema);
+                                    schema, jsonFields);
   }
 
   private io.cdap.cdap.api.data.schema.Schema getOutputSchema(Configuration configuration) throws IOException {
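The paths reach the record writer as a plain comma split of the Hadoop property, as the small sketch below mirrors (values are illustrative). Note that `String.split(",")` does not trim whitespace, so a list written as "a, b" would keep the leading space in the second path.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Mirrors the parsing in getRecordWriter above (illustrative values only).
public class JsonFieldsSplit {
  public static void main(String[] args) {
    String property = "nestedObject.nestedArray.raw,nestedArray.raw";
    Set<String> jsonFields = new HashSet<>(Arrays.asList(property.split(",")));
    System.out.println(jsonFields); // [nestedObject.nestedArray.raw, nestedArray.raw]
  }
}
```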
