
Commit 7d6a0dc

Merge pull request #1297 from data-integrations/PLUGIN-1647-cherrypick
[🍒][PLUGIN-1647] upgrade gcs hadoop2 connector to 2.2.9
2 parents: 641858c + 9f678f6

File tree

11 files changed: +233, -44 lines


pom.xml

Lines changed: 50 additions & 4 deletions
@@ -20,7 +20,7 @@
 
   <groupId>io.cdap.plugin</groupId>
   <artifactId>google-cloud</artifactId>
-  <version>0.22.2</version>
+  <version>0.22.3-SNAPSHOT</version>
   <name>Google Cloud Plugins</name>
   <packaging>jar</packaging>
   <description>Plugins for Google Big Query</description>
@@ -70,13 +70,13 @@
     <jee.version>7</jee.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <avro.version>1.8.2</avro.version>
-    <bigquery.connector.hadoop2.version>hadoop2-1.0.0</bigquery.connector.hadoop2.version>
+    <bigquery.connector.hadoop2.version>hadoop2-1.2.0</bigquery.connector.hadoop2.version>
     <commons.codec.version>1.4</commons.codec.version>
     <cdap.version>6.9.1</cdap.version>
     <cdap.plugin.version>2.11.0</cdap.plugin.version>
     <dropwizard.metrics-core.version>3.2.6</dropwizard.metrics-core.version>
-    <flogger.system.backend.version>0.3.1</flogger.system.backend.version>
-    <gcs.connector.version>hadoop2-2.0.0</gcs.connector.version>
+    <flogger.system.backend.version>0.7.1</flogger.system.backend.version>
+    <gcs.connector.version>hadoop2-2.2.9</gcs.connector.version>
     <google.cloud.bigtable.version>1.17.1</google.cloud.bigtable.version>
     <google.cloud.bigquery.version>1.137.1</google.cloud.bigquery.version>
     <google.cloud.kms.version>2.0.2</google.cloud.kms.version>
@@ -341,6 +341,22 @@
           <groupId>com.google.flogger</groupId>
           <artifactId>flogger-log4j-backend</artifactId>
         </exclusion>
+        <exclusion>
+          <artifactId>guava</artifactId>
+          <groupId>com.google.guava</groupId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.flogger</groupId>
+          <artifactId>flogger</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.cloud.bigdataoss</groupId>
+          <artifactId>util-hadoop</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.cloud.bigdataoss</groupId>
+          <artifactId>util</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -557,12 +573,42 @@
       <groupId>com.google.cloud.bigdataoss</groupId>
       <artifactId>util-hadoop</artifactId>
       <version>${gcs.connector.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.flogger</groupId>
+          <artifactId>flogger</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.flogger</groupId>
+          <artifactId>flogger-log4j-backend</artifactId>
+        </exclusion>
+        <exclusion>
+          <artifactId>guava</artifactId>
+          <groupId>com.google.guava</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>com.google.cloud.bigdataoss</groupId>
       <artifactId>gcs-connector</artifactId>
       <version>${gcs.connector.version}</version>
       <exclusions>
+        <exclusion>
+          <artifactId>grpc-api</artifactId>
+          <groupId>io.grpc</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>grpc-census</artifactId>
+          <groupId>io.grpc</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>guava</artifactId>
+          <groupId>com.google.guava</groupId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.flogger</groupId>
+          <artifactId>flogger</artifactId>
+        </exclusion>
         <exclusion>
           <groupId>com.google.flogger</groupId>
           <artifactId>flogger-log4j-backend</artifactId>

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 1 addition & 1 deletion
@@ -227,7 +227,7 @@ private Configuration getBaseConfiguration(@Nullable CryptoKeyName cmekKeyName)
                                           cmekKeyName, config.getServiceAccountType());
     baseConfiguration.setBoolean(BigQueryConstants.CONFIG_ALLOW_SCHEMA_RELAXATION,
                                  config.isAllowSchemaRelaxation());
-    baseConfiguration.setStrings(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,
+    baseConfiguration.setStrings(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION.getKey(),
                                  config.getWriteDisposition().name());
     // this setting is needed because gcs has default chunk size of 64MB. This is large default chunk size which can
     // cause OOM issue if there are many tables being written. See this - CDAP-16670
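Note: in the hadoop2-2.x connector, String constants such as OUTPUT_TABLE_WRITE_DISPOSITION_KEY are replaced by typed HadoopConfigurationProperty fields, so call sites resolve the String key with getKey(). A minimal sketch of the pattern; the WRITE_APPEND value is only an illustrative assumption, not taken from this commit:

    import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
    import org.apache.hadoop.conf.Configuration;

    public class WriteDispositionKeySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // 1.x style: conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY, "WRITE_APPEND");
        // 2.x style: the constant is a HadoopConfigurationProperty, so fetch its String key first.
        conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION.getKey(), "WRITE_APPEND");
        System.out.println(conf.get(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION.getKey()));
      }
    }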

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java

Lines changed: 53 additions & 9 deletions
@@ -62,16 +62,21 @@
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.plugin.gcp.bigquery.source.BigQueryFactoryWithScopes;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
 import io.cdap.plugin.gcp.common.GCPUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -136,6 +141,44 @@ public OutputCommitter createCommitter(TaskAttemptContext context) throws IOExce
     return new BigQueryOutputCommitter(context, delegateCommitter);
   }
 
+  /**
+   * This method is copied from
+   * {@link ForwardingBigQueryFileOutputFormat#checkOutputSpecs(JobContext)} to override
+   * {@link BigQueryFactory} with {@link BigQueryFactoryWithScopes}.
+   */
+  @Override
+  public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
+    Configuration conf = job.getConfiguration();
+
+    // Validate the output configuration.
+    BigQueryOutputConfiguration.validateConfiguration(conf);
+
+    // Get the output path.
+    Path outputPath = BigQueryOutputConfiguration.getGcsOutputPath(conf);
+    LOG.info("Using output path '%s'.", outputPath);
+
+    // Error if the output path already exists.
+    FileSystem outputFileSystem = outputPath.getFileSystem(conf);
+    if (outputFileSystem.exists(outputPath)) {
+      throw new IOException("The output path '" + outputPath + "' already exists.");
+    }
+
+    // Error if compression is set as there's mixed support in BigQuery.
+    if (FileOutputFormat.getCompressOutput(job)) {
+      throw new IOException("Compression isn't supported for this OutputFormat.");
+    }
+
+    // Error if unable to create a BigQuery helper.
+    try {
+      new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES).getBigQueryHelper(conf);
+    } catch (GeneralSecurityException gse) {
+      throw new IOException("Failed to create BigQuery client", gse);
+    }
+
+    // Let delegate process its checks.
+    getDelegate(conf).checkOutputSpecs(job);
+  }
+
   /**
    * BigQuery Output committer.
    */
@@ -157,7 +200,7 @@ public static class BigQueryOutputCommitter extends ForwardingBigQueryFileOutput
     BigQueryOutputCommitter(TaskAttemptContext context, OutputCommitter delegate) throws IOException {
       super(context, delegate);
       try {
-        BigQueryFactory bigQueryFactory = new BigQueryFactory();
+        BigQueryFactory bigQueryFactory = new BigQueryFactoryWithScopes(GCPUtils.BIGQUERY_SCOPES);
         this.bigQueryHelper = bigQueryFactory.getBigQueryHelper(context.getConfiguration());
       } catch (GeneralSecurityException e) {
         throw new IOException("Failed to create Bigquery client.", e);
@@ -487,7 +530,7 @@ private static void waitForJobCompletion(BigQueryHelper bigQueryHelper, String p
           .setLocation(jobReference.getLocation());
 
       Job pollJob = ResilientOperation.retry(
-          ResilientOperation.getGoogleRequestCallable(get),
+          get::execute,
           operationBackOff,
           RetryDeterminer.RATE_LIMIT_ERRORS,
           IOException.class,
@@ -546,10 +589,10 @@ private static void waitForJobCompletion(BigQueryHelper bigQueryHelper, String p
     private static TableReference getTableReference(Configuration conf) throws IOException {
       // Ensure the BigQuery output information is valid.
       String projectId = BigQueryOutputConfiguration.getProjectId(conf);
-      String datasetId =
-          ConfigurationUtil.getMandatoryConfig(conf, BigQueryConfiguration.OUTPUT_DATASET_ID_KEY);
-      String tableId =
-          ConfigurationUtil.getMandatoryConfig(conf, BigQueryConfiguration.OUTPUT_TABLE_ID_KEY);
+      String datasetId = ConfigurationUtil.getMandatoryConfig(conf,
+                                                              BigQueryConfiguration.OUTPUT_DATASET_ID);
+      String tableId = ConfigurationUtil.getMandatoryConfig(conf,
+                                                            BigQueryConfiguration.OUTPUT_TABLE_ID);
 
       return new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId);
     }
@@ -559,14 +602,14 @@ private static TableReference getTableReference(Configuration conf) throws IOExc
      * Optional<TableSchema> instead of Optional<BigQueryTableSchema>.
      */
     private static Optional<TableSchema> getTableSchema(Configuration conf) throws IOException {
-      String fieldsJson = conf.get(BigQueryConfiguration.OUTPUT_TABLE_SCHEMA_KEY);
+      String fieldsJson = conf.get(BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey());
       if (!Strings.isNullOrEmpty(fieldsJson)) {
         try {
           TableSchema tableSchema = createTableSchemaFromFields(fieldsJson);
           return Optional.of(tableSchema);
         } catch (IOException e) {
           throw new IOException(
-              "Unable to parse key '" + BigQueryConfiguration.OUTPUT_TABLE_SCHEMA_KEY + "'.", e);
+              "Unable to parse key '" + BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey() + "'.", e);
         }
       }
       return Optional.empty();
@@ -748,7 +791,8 @@ private RangePartitioning createRangePartitioning(@Nullable String partitionByFi
     }
 
     private static BigQuery getBigQuery(Configuration config) throws IOException {
-      String projectId = ConfigurationUtil.getMandatoryConfig(config, BigQueryConfiguration.PROJECT_ID_KEY);
+      String projectId = ConfigurationUtil.getMandatoryConfig(config,
+                                                              BigQueryConfiguration.PROJECT_ID);
       Credentials credentials = GCPUtils.loadCredentialsFromConf(config);
       return GCPUtils.getBigQuery(projectId, credentials);
     }

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQueryFactoryWithScopes.java

Lines changed: 22 additions & 10 deletions
@@ -17,10 +17,13 @@
 package io.cdap.plugin.gcp.bigquery.source;
 
 import com.google.api.client.auth.oauth2.Credential;
+import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
 import com.google.cloud.hadoop.io.bigquery.BigQueryFactory;
-import com.google.cloud.hadoop.util.AccessTokenProviderClassFromConfigFactory;
+import com.google.cloud.hadoop.util.AccessTokenProvider;
 import com.google.cloud.hadoop.util.CredentialFromAccessTokenProviderClassFactory;
 import com.google.cloud.hadoop.util.HadoopCredentialConfiguration;
+import com.google.common.collect.ImmutableList;
+import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;
 import org.apache.hadoop.conf.Configuration;
 
 import java.io.IOException;
@@ -41,18 +44,27 @@ public BigQueryFactoryWithScopes(List<String> scopes) {
   @Override
   public Credential createBigQueryCredential(Configuration config) throws GeneralSecurityException, IOException {
     Credential credential =
-        CredentialFromAccessTokenProviderClassFactory.credential(
-            new AccessTokenProviderClassFromConfigFactory().withOverridePrefix("mapred.bq"),
-            config,
-            scopes);
+        CredentialFromAccessTokenProviderClassFactory.credential(getAccessTokenProvider(config),
+                                                                 scopes);
     if (credential != null) {
       return credential;
     }
 
-    return HadoopCredentialConfiguration.newBuilder()
-        .withConfiguration(config)
-        .withOverridePrefix(BIGQUERY_CONFIG_PREFIX)
-        .build()
-        .getCredential(scopes);
+    return HadoopCredentialConfiguration.getCredentialFactory(
+        config, String.valueOf(ImmutableList.of(BigQueryConfiguration.BIGQUERY_CONFIG_PREFIX)))
+        .getCredential(scopes);
+  }
+
+  /**
+   * returns the {@link AccessTokenProvider} that uses the newer GoogleCredentials
+   * library to get the credentials.
+   *
+   * @param config Hadoop {@link Configuration}
+   * @return {@link ServiceAccountAccessTokenProvider}
+   */
+  private AccessTokenProvider getAccessTokenProvider(Configuration config) {
+    AccessTokenProvider accessTokenProvider = new ServiceAccountAccessTokenProvider();
+    accessTokenProvider.setConf(config);
+    return accessTokenProvider;
   }
 }
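Note: AccessTokenProviderClassFromConfigFactory no longer exists in the 2.x util library, so the factory builds the token provider itself and hands the instance to CredentialFromAccessTokenProviderClassFactory. A minimal sketch of that path, using only calls that appear in this diff; the surrounding wiring is assumed:

    import com.google.api.client.auth.oauth2.Credential;
    import com.google.cloud.hadoop.util.AccessTokenProvider;
    import com.google.cloud.hadoop.util.CredentialFromAccessTokenProviderClassFactory;
    import io.cdap.plugin.gcp.common.GCPUtils;
    import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;
    import org.apache.hadoop.conf.Configuration;

    import java.io.IOException;
    import java.security.GeneralSecurityException;

    public class CredentialSketch {
      static Credential credentialFor(Configuration conf) throws IOException, GeneralSecurityException {
        // The provider reads the CDAP service-account settings from the Hadoop Configuration.
        AccessTokenProvider provider = new ServiceAccountAccessTokenProvider();
        provider.setConf(conf);
        // gcs-connector 2.x takes the provider instance directly instead of a class-from-config factory.
        return CredentialFromAccessTokenProviderClassFactory.credential(provider, GCPUtils.BIGQUERY_SCOPES);
      }
    }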

src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java

Lines changed: 14 additions & 9 deletions
@@ -18,6 +18,7 @@
 import com.google.cloud.hadoop.io.bigquery.BigQueryUtils;
 import com.google.cloud.hadoop.io.bigquery.ExportFileFormat;
 import com.google.cloud.hadoop.util.ConfigurationUtil;
+import com.google.cloud.hadoop.util.HadoopConfigurationProperty;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -36,6 +37,7 @@
 
 import java.io.IOException;
 import java.security.GeneralSecurityException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -95,14 +97,17 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
     } catch (GeneralSecurityException gse) {
       throw new IOException("Failed to create BigQuery client", gse);
     }
+
+    List<HadoopConfigurationProperty<?>> hadoopConfigurationProperties = new ArrayList<>(
+        BigQueryConfiguration.MANDATORY_CONFIG_PROPERTIES_INPUT);
     Map<String, String> mandatoryConfig = ConfigurationUtil.getMandatoryConfig(
-        configuration, BigQueryConfiguration.MANDATORY_CONFIG_PROPERTIES_INPUT);
-    String projectId = mandatoryConfig.get(BigQueryConfiguration.PROJECT_ID_KEY);
-    String datasetProjectId = mandatoryConfig.get(BigQueryConfiguration.INPUT_PROJECT_ID_KEY);
-    String datasetId = mandatoryConfig.get(BigQueryConfiguration.INPUT_DATASET_ID_KEY);
-    String tableName = mandatoryConfig.get(BigQueryConfiguration.INPUT_TABLE_ID_KEY);
+        configuration, hadoopConfigurationProperties);
+    String projectId = mandatoryConfig.get(BigQueryConfiguration.PROJECT_ID.getKey());
+    String datasetProjectId = mandatoryConfig.get(BigQueryConfiguration.INPUT_PROJECT_ID.getKey());
+    String datasetId = mandatoryConfig.get(BigQueryConfiguration.INPUT_DATASET_ID.getKey());
+    String tableName = mandatoryConfig.get(BigQueryConfiguration.INPUT_TABLE_ID.getKey());
     String serviceAccount = configuration.get(BigQueryConstants.CONFIG_SERVICE_ACCOUNT, null);
-    Boolean isServiceAccountFilePath = configuration.getBoolean(BigQueryConstants.CONFIG_SERVICE_ACCOUNT_IS_FILE,
+    boolean isServiceAccountFilePath = configuration.getBoolean(BigQueryConstants.CONFIG_SERVICE_ACCOUNT_IS_FILE,
                                                                  true);
 
     String partitionFromDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_FROM_DATE, null);
@@ -131,11 +136,11 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
       runQuery(configuration, bigQueryHelper, projectId, exportTableReference, query, location);
 
       // Default values come from BigquerySource config, and can be overridden by config.
-      configuration.set(BigQueryConfiguration.INPUT_PROJECT_ID_KEY,
+      configuration.set(BigQueryConfiguration.INPUT_PROJECT_ID.getKey(),
                         configuration.get(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_PROJECT));
-      configuration.set(BigQueryConfiguration.INPUT_DATASET_ID_KEY,
+      configuration.set(BigQueryConfiguration.INPUT_DATASET_ID.getKey(),
                         configuration.get(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_DATASET));
-      configuration.set(BigQueryConfiguration.INPUT_TABLE_ID_KEY, temporaryTableName);
+      configuration.set(BigQueryConfiguration.INPUT_TABLE_ID.getKey(), temporaryTableName);
     }
   }
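Note: MANDATORY_CONFIG_PROPERTIES_INPUT is now a list of HadoopConfigurationProperty objects rather than String keys, while the map returned by ConfigurationUtil.getMandatoryConfig is still keyed by the plain String key. A small sketch of the read path, assuming the configuration has already been populated by the source:

    import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
    import com.google.cloud.hadoop.util.ConfigurationUtil;
    import com.google.cloud.hadoop.util.HadoopConfigurationProperty;
    import org.apache.hadoop.conf.Configuration;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class MandatoryConfigSketch {
      static String inputTable(Configuration conf) throws IOException {
        // 2.x exposes the mandatory input properties as HadoopConfigurationProperty objects.
        List<HadoopConfigurationProperty<?>> required =
            new ArrayList<>(BigQueryConfiguration.MANDATORY_CONFIG_PROPERTIES_INPUT);
        // getMandatoryConfig fails if any of the listed properties is missing from conf.
        Map<String, String> values = ConfigurationUtil.getMandatoryConfig(conf, required);
        // The returned map is looked up by the resolved String key.
        return values.get(BigQueryConfiguration.INPUT_TABLE_ID.getKey());
      }
    }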

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java

Lines changed: 3 additions & 2 deletions
@@ -174,9 +174,10 @@ public static Configuration getBigQueryConfig(@Nullable String serviceAccountInf
     configuration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
     configuration.set("fs.gs.project.id", projectId);
     configuration.set("fs.gs.working.dir", GCSPath.ROOT_DIR);
-    configuration.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
+    configuration.set(BigQueryConfiguration.PROJECT_ID.getKey(), projectId);
     if (cmekKeyName != null) {
-      configuration.set(BigQueryConfiguration.OUTPUT_TABLE_KMS_KEY_NAME_KEY, cmekKeyName.toString());
+      configuration.set(BigQueryConfiguration.OUTPUT_TABLE_KMS_KEY_NAME.getKey(),
+                        cmekKeyName.toString());
     }
     return configuration;
   }

src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java

Lines changed: 7 additions & 6 deletions
@@ -26,8 +26,8 @@
 import com.google.cloud.bigquery.BigQueryOptions;
 import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
 import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
-import com.google.cloud.hadoop.util.AccessTokenProviderClassFromConfigFactory;
 import com.google.cloud.hadoop.util.CredentialFactory;
+import com.google.cloud.hadoop.util.HadoopCredentialConfiguration;
 import com.google.cloud.kms.v1.CryptoKeyName;
 import com.google.cloud.storage.BucketInfo;
 import com.google.cloud.storage.Storage;
@@ -129,7 +129,7 @@ public static InputStream openServiceAccount(String serviceAccount, boolean isFi
    */
   public static Map<String, String> generateGCSAuthProperties(@Nullable String serviceAccount,
                                                               String serviceAccountType) {
-    return generateAuthProperties(serviceAccount, serviceAccountType, CredentialFactory.GCS_SCOPES, GCS_PREFIX);
+    return generateAuthProperties(serviceAccount, serviceAccountType, CredentialFactory.DEFAULT_SCOPES, GCS_PREFIX);
   }
 
   /**
@@ -142,7 +142,7 @@ public static Map<String, String> generateGCSAuthProperties(@Nullable String ser
    */
   public static Map<String, String> generateBigQueryAuthProperties(@Nullable String serviceAccount,
                                                                    String serviceAccountType) {
-    List<String> scopes = new ArrayList<>(CredentialFactory.GCS_SCOPES);
+    List<String> scopes = new ArrayList<>(CredentialFactory.DEFAULT_SCOPES);
     scopes.addAll(BIGQUERY_SCOPES);
     return generateAuthProperties(serviceAccount, serviceAccountType, scopes, GCS_PREFIX, BQ_PREFIX);
   }
@@ -172,12 +172,13 @@ private static Map<String, String> generateAuthProperties(@Nullable String servi
     // AccessTokenProviderClassFromConfigFactory will by default look for
     // google.cloud.auth.access.token.provider.impl
     // but can be configured to also look for the conf with other prefixes like
-    // gs.fs.auth.access.token.provider.impl
+    // fs.gs.auth.access.token.provider.impl
     // mapred.bq.auth.access.token.provider.impl
     // for use by GCS and BQ.
     for (String prefix : prefixes) {
-      properties.put(prefix + AccessTokenProviderClassFromConfigFactory.ACCESS_TOKEN_PROVIDER_IMPL_SUFFIX,
-                     ServiceAccountAccessTokenProvider.class.getName());
+      properties.put(
+          prefix + HadoopCredentialConfiguration.ACCESS_TOKEN_PROVIDER_IMPL_SUFFIX.getKey(),
+          ServiceAccountAccessTokenProvider.class.getName());
     }
     return properties;
   }
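Note: the provider-impl suffix moved from AccessTokenProviderClassFromConfigFactory to HadoopCredentialConfiguration and is itself a HadoopConfigurationProperty now. A sketch of how the prefixed keys are assembled; the literal "fs.gs" and "mapred.bq" prefixes are assumed from the comment above (the plugin derives them from GCS_PREFIX and BQ_PREFIX):

    import com.google.cloud.hadoop.util.HadoopCredentialConfiguration;
    import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;

    import java.util.HashMap;
    import java.util.Map;

    public class AuthPropertySketch {
      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Assumed prefix values; the plugin passes its own GCS_PREFIX and BQ_PREFIX constants.
        for (String prefix : new String[] {"fs.gs", "mapred.bq"}) {
          props.put(prefix + HadoopCredentialConfiguration.ACCESS_TOKEN_PROVIDER_IMPL_SUFFIX.getKey(),
                    ServiceAccountAccessTokenProvider.class.getName());
        }
        // Expected to yield keys such as fs.gs.auth.access.token.provider.impl and
        // mapred.bq.auth.access.token.provider.impl, per the comment in generateAuthProperties.
        props.forEach((k, v) -> System.out.println(k + " = " + v));
      }
    }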
