Skip to content

Commit 77d1435

Browse files
psainics authored and vikasrathee-cs committed
Allow flexible column names
1 parent eddc342 commit 77d1435

21 files changed

+1478
-287
lines changed

docs/BigQueryMultiTable-batchsink.md

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -255,3 +255,9 @@ GET https://www.googleapis.com/bigquery/v2/projects/xxxx/datasets/mysql_bq_perm?
255255
have the permission to read the dataset you specified in this plugin. You must grant "BigQuery Data Editor" role on the
256256
project identified by the `Dataset Project ID` you specified in this plugin to the service account. If you think you
257257
already granted the role, check if you granted the role on the wrong project (for example the one identified by the `Project ID`).
258+
259+
Column Names
260+
------------
261+
A column name can contain the letters (a-z, A-Z), numbers (0-9), or underscores (_), and it must start with a letter or
262+
underscore. For more flexible column name support, see
263+
[flexible column names](https://cloud.google.com/bigquery/docs/schemas#flexible-column-names).

docs/BigQueryTable-batchsink.md

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -298,3 +298,9 @@ GET https://www.googleapis.com/bigquery/v2/projects/xxxx/datasets/mysql_bq_perm?
298298
have the permission to read the dataset you specified in this plugin. You must grant "BigQuery Data Editor" role on the
299299
project identified by the `Dataset Project ID` you specified in this plugin to the service account. If you think you
300300
already granted the role, check if you granted the role on the wrong project (for example the one identified by the `Project ID`).
301+
302+
Column Names
303+
------------
304+
A column name can contain the letters (a-z, A-Z), numbers (0-9), or underscores (_), and it must start with a letter or
305+
underscore. For more flexible column name support, see
306+
[flexible column names](https://cloud.google.com/bigquery/docs/schemas#flexible-column-names).

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,6 @@
2121
import com.google.cloud.bigquery.DatasetId;
2222
import com.google.cloud.bigquery.Table;
2323
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
24-
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
2524
import com.google.cloud.kms.v1.CryptoKeyName;
2625
import com.google.cloud.storage.Bucket;
2726
import com.google.cloud.storage.Storage;
@@ -36,12 +35,12 @@
3635
import io.cdap.cdap.etl.api.batch.BatchSink;
3736
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
3837
import io.cdap.plugin.common.Asset;
38+
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
3939
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
4040
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize;
4141
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
4242
import io.cdap.plugin.gcp.common.CmekUtils;
4343
import io.cdap.plugin.gcp.common.GCPUtils;
44-
4544
import org.apache.hadoop.conf.Configuration;
4645
import org.apache.hadoop.io.NullWritable;
4746
import org.slf4j.Logger;
@@ -55,7 +54,6 @@
5554
import java.util.Set;
5655
import java.util.UUID;
5756
import java.util.stream.Collectors;
58-
5957
import javax.annotation.Nullable;
6058

6159
/**

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -51,7 +51,6 @@
5151
import com.google.cloud.hadoop.io.bigquery.BigQueryFactory;
5252
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
5353
import com.google.cloud.hadoop.io.bigquery.BigQueryHelper;
54-
import com.google.cloud.hadoop.io.bigquery.BigQueryStrings;
5554
import com.google.cloud.hadoop.io.bigquery.BigQueryUtils;
5655
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
5756
import com.google.cloud.hadoop.io.bigquery.output.ForwardingBigQueryFileOutputCommitter;
@@ -62,6 +61,7 @@
6261
import com.google.common.base.Strings;
6362
import com.google.common.collect.Lists;
6463
import io.cdap.cdap.api.data.format.StructuredRecord;
64+
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryStrings;
6565
import io.cdap.plugin.gcp.bigquery.source.BigQueryFactoryWithScopes;
6666
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
6767
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,6 @@
2525
import com.google.cloud.bigquery.JobStatistics;
2626
import com.google.cloud.bigquery.Table;
2727
import com.google.cloud.bigquery.TimePartitioning;
28-
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
2928
import com.google.common.collect.ImmutableMap;
3029
import com.google.gson.Gson;
3130
import io.cdap.cdap.api.annotation.Description;
@@ -44,6 +43,7 @@
4443
import io.cdap.cdap.etl.api.engine.sql.SQLEngineOutput;
4544
import io.cdap.cdap.etl.common.Constants;
4645
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector;
46+
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
4747
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
4848
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryWrite;
4949
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java

Lines changed: 15 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -58,7 +58,8 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
5858
private static final String WHERE = "WHERE";
5959
public static final Set<Schema.Type> SUPPORTED_CLUSTERING_TYPES =
6060
ImmutableSet.of(Schema.Type.INT, Schema.Type.LONG, Schema.Type.STRING, Schema.Type.BOOLEAN, Schema.Type.BYTES);
61-
private static final Pattern FIELD_PATTERN = Pattern.compile("[a-zA-Z0-9_]+");
61+
// Read More : https://cloud.google.com/bigquery/docs/schemas#flexible-column-names
62+
private static final Pattern FIELD_PATTERN = Pattern.compile("[\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}&%+=:'<>#| ]+");
6263

6364
public static final String NAME_TABLE = "table";
6465
public static final String NAME_SCHEMA = "schema";
@@ -75,6 +76,8 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
7576
public static final String NAME_RANGE_INTERVAL = "rangeInterval";
7677

7778
public static final int MAX_NUMBER_OF_COLUMNS = 4;
79+
// As defined in https://cloud.google.com/bigquery/docs/schemas#column_names
80+
private static final int MAX_LENGTH_OF_COLUMN_NAME = 300;
7881

7982
@Name(NAME_TABLE)
8083
@Macro
@@ -345,9 +348,18 @@ public void validate(@Nullable Schema inputSchema, @Nullable Schema outputSchema
345348
String name = field.getName();
346349
// BigQuery column names only allow alphanumeric characters and _
347350
// https://cloud.google.com/bigquery/docs/schemas#column_names
351+
// Allow support for Flexible column names
352+
// https://cloud.google.com/bigquery/docs/schemas#flexible-column-names
348353
if (!FIELD_PATTERN.matcher(name).matches()) {
349-
collector.addFailure(String.format("Output field '%s' must only contain alphanumeric characters and '_'.",
350-
name), null).withOutputSchemaField(name);
354+
collector.addFailure(String.format("Output field '%s' contains invalid characters. " +
355+
"Check column names docs for more details.",
356+
name), null).withOutputSchemaField(name);
357+
}
358+
359+
// Check if the field name exceeds the maximum length of 300 characters.
360+
if (name.length() > MAX_LENGTH_OF_COLUMN_NAME) {
361+
collector.addFailure(String.format("Output field '%s' exceeds the maximum length of 300 characters.",
362+
name), null).withOutputSchemaField(name);
351363
}
352364

353365
// check if the required fields are present in the input schema.

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -28,9 +28,6 @@
2828
import com.google.cloud.bigquery.Table;
2929
import com.google.cloud.bigquery.TableId;
3030
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
31-
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
32-
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
33-
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableSchema;
3431
import com.google.cloud.kms.v1.CryptoKeyName;
3532
import com.google.cloud.storage.Bucket;
3633
import com.google.cloud.storage.Storage;
@@ -43,6 +40,9 @@
4340
import io.cdap.cdap.etl.api.validation.ValidationFailure;
4441
import io.cdap.plugin.common.Asset;
4542
import io.cdap.plugin.common.LineageRecorder;
43+
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryOutputConfiguration;
44+
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
45+
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableSchema;
4646
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
4747
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.Numeric;
4848
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
@@ -62,6 +62,7 @@
6262
import java.util.Objects;
6363
import java.util.Set;
6464
import java.util.function.Supplier;
65+
import java.util.regex.Pattern;
6566
import java.util.stream.Collectors;
6667
import javax.annotation.Nullable;
6768

@@ -611,6 +612,13 @@ private static BigQueryFileFormat getFileFormat(List<BigQueryTableFieldSchema> f
611612
if (DATETIME.equals(field.getType())) {
612613
return BigQueryFileFormat.NEWLINE_DELIMITED_JSON;
613614
}
615+
// If the field name is not in english characters, then we will use json format
616+
// We do this as the avro load job in BQ does not support non-english characters in field names for now
617+
String fieldName = field.getName();
618+
final String englishCharactersRegex = "[\\w]+";
619+
if (!Pattern.matches(englishCharactersRegex, fieldName)) {
620+
return BigQueryFileFormat.NEWLINE_DELIMITED_JSON;
621+
}
614622
// If the field is a record we have to check its subfields.
615623
if (RECORD.equals(field.getType())) {
616624
if (getFileFormat(field.getFields()) == BigQueryFileFormat.NEWLINE_DELIMITED_JSON) {

src/main/java/io/cdap/plugin/gcp/bigquery/sink/DelegatingMultiSinkOutputCommitter.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,8 +17,8 @@
1717
package io.cdap.plugin.gcp.bigquery.sink;
1818

1919
import com.google.cloud.bigquery.DatasetId;
20-
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
2120
import io.cdap.cdap.api.data.schema.Schema;
21+
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
2222
import org.apache.hadoop.mapreduce.JobContext;
2323
import org.apache.hadoop.mapreduce.JobStatus;
2424
import org.apache.hadoop.mapreduce.OutputCommitter;

src/main/java/io/cdap/plugin/gcp/bigquery/sink/DelegatingMultiSinkRecordWriter.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,9 +17,9 @@
1717
package io.cdap.plugin.gcp.bigquery.sink;
1818

1919
import com.google.cloud.bigquery.DatasetId;
20-
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
2120
import io.cdap.cdap.api.data.format.StructuredRecord;
2221
import io.cdap.cdap.api.data.schema.Schema;
22+
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
2323
import org.apache.hadoop.io.NullWritable;
2424
import org.apache.hadoop.mapreduce.OutputCommitter;
2525
import org.apache.hadoop.mapreduce.RecordWriter;

0 commit comments

Comments (0)