Skip to content

Commit 66c341f

Browse files
authored
Merge pull request #73 from cloudsufi/patch/code-refactor
Code Refactor
2 parents (2244d45 + ffc19ed), commit 66c341f

File tree

12 files changed: 44 additions and 40 deletions.

12 files changed: 44 additions and 40 deletions.

src/main/java/io/cdap/plugin/google/common/GoogleAuthBaseConfig.java

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -156,7 +156,7 @@ public abstract class GoogleAuthBaseConfig extends PluginConfig {
156156
* @param collector the failure collector is provided
157157
* @return The ValidationResult
158158
*/
159-
public ValidationResult validate(FailureCollector collector) {
159+
public ValidationResult getValidationResult(FailureCollector collector) {
160160
IdUtils.validateReferenceName(referenceName, collector);
161161
checkIfDirectoryOrFileIdentifierExists(collector);
162162
ValidationResult validationResult = new ValidationResult();
@@ -171,7 +171,7 @@ public ValidationResult validate(FailureCollector collector) {
171171
propertiesAreValid = validateServiceAccount(collector);
172172
break;
173173
default:
174-
collector.addFailure(String.format("'%s' is not processed value.", authType.toString()), null)
174+
collector.addFailure(String.format("'%s' is not processed value.", authType), null)
175175
.withConfigProperty(AUTH_TYPE);
176176
return validationResult;
177177
}
@@ -240,7 +240,7 @@ private boolean validateServiceAccount(FailureCollector collector) {
240240
.withConfigProperty(NAME_SERVICE_ACCOUNT_JSON);
241241
}
242242
}
243-
return collector.getValidationFailures().size() == 0;
243+
return collector.getValidationFailures().isEmpty();
244244
}
245245

246246
private boolean isDirectoryOrFileAccessible(FailureCollector collector, GoogleDriveClient driveClient)

src/main/java/io/cdap/plugin/google/common/GoogleFilteringSourceConfig.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -82,8 +82,8 @@ public GoogleFilteringSourceConfig() {
8282
* @param collector the failure collector is provided
8383
* @return The ValidationResult
8484
*/
85-
public ValidationResult validate(FailureCollector collector) {
86-
ValidationResult validationResult = super.validate(collector);
85+
public ValidationResult getValidationResult(FailureCollector collector) {
86+
ValidationResult validationResult = super.getValidationResult(collector);
8787
if (validateModificationDateRange(collector)
8888
&& getModificationDateRangeType().equals(ModifiedDateRangeType.CUSTOM)) {
8989
if (checkPropertyIsSet(collector, startDate, START_DATE, START_DATE_LABEL)) {

src/main/java/io/cdap/plugin/google/drive/sink/GoogleDriveSinkConfig.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,8 @@ public class GoogleDriveSinkConfig extends GoogleInputSchemaFieldsUsageConfig {
6060
* @param schema the schema to check compatibility
6161
*/
6262
public void validate(FailureCollector collector, Schema schema) {
63-
super.validate(collector);
63+
super.getValidationResult(collector);
64+
collector.getOrThrowException();
6465

6566
// validate body field is in schema and has valid format
6667
validateSchemaField(collector, schema, SCHEMA_BODY_FIELD_NAME, schemaBodyFieldName,

src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveInputFormat.java

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,9 @@
1818

1919
import com.github.rholder.retry.RetryException;
2020
import com.google.api.services.drive.model.File;
21+
import io.cdap.cdap.api.data.format.StructuredRecord;
2122
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.io.NullWritable;
2224
import org.apache.hadoop.mapreduce.InputFormat;
2325
import org.apache.hadoop.mapreduce.InputSplit;
2426
import org.apache.hadoop.mapreduce.JobContext;
@@ -33,7 +35,7 @@
3335
/**
3436
* Input format class which generates splits for each query.
3537
*/
36-
public class GoogleDriveInputFormat extends InputFormat {
38+
public class GoogleDriveInputFormat extends InputFormat<NullWritable, StructuredRecord> {
3739

3840
@Override
3941
public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
@@ -75,7 +77,8 @@ private GoogleDriveSplit getSplitWithUnlimitedPartitionSize(String fileId) {
7577
}
7678

7779
@Override
78-
public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
80+
public RecordReader<NullWritable, StructuredRecord> createRecordReader(InputSplit inputSplit,
81+
TaskAttemptContext taskAttemptContext) {
7982
return new GoogleDriveRecordReader();
8083
}
8184
}

src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveRecordReader.java

Lines changed: 12 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717
package io.cdap.plugin.google.drive.source;
1818

1919
import com.github.rholder.retry.RetryException;
20-
import io.cdap.plugin.google.drive.common.FileFromFolder;
20+
import io.cdap.cdap.api.data.format.StructuredRecord;
2121
import org.apache.hadoop.conf.Configuration;
2222
import org.apache.hadoop.io.NullWritable;
2323
import org.apache.hadoop.mapreduce.InputSplit;
@@ -28,12 +28,13 @@
2828
import java.util.concurrent.ExecutionException;
2929

3030
/**
31-
* RecordReader implementation, which reads {@link FileFromFolder} wrappers from Google Drive using
31+
* RecordReader implementation, which reads {@link StructuredRecord} wrappers from Google Drive using
3232
* Google Drive API.
3333
*/
34-
public class GoogleDriveRecordReader extends RecordReader<NullWritable, FileFromFolder> {
34+
public class GoogleDriveRecordReader extends RecordReader<NullWritable, StructuredRecord> {
3535

3636
private GoogleDriveSourceClient googleDriveSourceClient;
37+
private GoogleDriveSourceConfig googleDriveSourceConfig;
3738
private String fileId;
3839
private long bytesFrom;
3940
private long bytesTo;
@@ -43,6 +44,7 @@ public class GoogleDriveRecordReader extends RecordReader<NullWritable, FileFrom
4344
@Override
4445
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
4546
Configuration conf = taskAttemptContext.getConfiguration();
47+
googleDriveSourceConfig = GoogleDriveInputFormatProvider.extractPropertiesFromConfig(conf);
4648
GoogleDriveSourceConfig googleDriveSourceConfig =
4749
GoogleDriveInputFormatProvider.extractPropertiesFromConfig(conf);
4850
googleDriveSourceClient = new GoogleDriveSourceClient(googleDriveSourceConfig);
@@ -62,18 +64,20 @@ public boolean nextKeyValue() {
6264

6365
@Override
6466
public NullWritable getCurrentKey() {
65-
return null;
67+
return NullWritable.get();
6668
}
6769

6870
@Override
69-
public FileFromFolder getCurrentValue() throws IOException {
71+
public StructuredRecord getCurrentValue() throws IOException {
7072
// read file and content
7173
isFileProcessed = true;
7274
try {
7375
if (isPartitioned) {
74-
return googleDriveSourceClient.getFilePartition(fileId, bytesFrom, bytesTo);
76+
return FilesFromFolderTransformer.transform(
77+
googleDriveSourceClient.getFilePartition(fileId, bytesFrom, bytesTo), googleDriveSourceConfig.getSchema());
7578
} else {
76-
return googleDriveSourceClient.getFile(fileId);
79+
return FilesFromFolderTransformer.transform(googleDriveSourceClient.getFile(fileId),
80+
googleDriveSourceConfig.getSchema());
7781
}
7882
} catch (ExecutionException | RetryException e) {
7983
throw new RuntimeException("Exception during file or file part reading.", e);
@@ -88,6 +92,6 @@ public float getProgress() {
8892

8993
@Override
9094
public void close() {
91-
95+
// no-op
9296
}
9397
}

src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSource.java

Lines changed: 3 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,7 @@
4141
@Plugin(type = BatchSource.PLUGIN_TYPE)
4242
@Name(GoogleDriveSource.NAME)
4343
@Description("Reads fileset from specified Google Drive directory.")
44-
public class GoogleDriveSource extends BatchSource<NullWritable, FileFromFolder, StructuredRecord> {
44+
public class GoogleDriveSource extends BatchSource<NullWritable, StructuredRecord, StructuredRecord> {
4545
public static final String NAME = "GoogleDrive";
4646

4747
private final GoogleDriveSourceConfig config;
@@ -53,7 +53,7 @@ public GoogleDriveSource(GoogleDriveSourceConfig config) {
5353
@Override
5454
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
5555
FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
56-
config.validate(failureCollector);
56+
config.getValidationResult(failureCollector);
5757
failureCollector.getOrThrowException();
5858

5959
pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
@@ -62,7 +62,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
6262
@Override
6363
public void prepareRun(BatchSourceContext context) {
6464
FailureCollector failureCollector = context.getFailureCollector();
65-
config.validate(failureCollector);
65+
config.getValidationResult(failureCollector);
6666
failureCollector.getOrThrowException();
6767

6868
LineageRecorder lineageRecorder = new LineageRecorder(context, config.getReferenceName());
@@ -74,9 +74,4 @@ public void prepareRun(BatchSourceContext context) {
7474

7575
context.setInput(Input.of(config.getReferenceName(), new GoogleDriveInputFormatProvider(config)));
7676
}
77-
78-
@Override
79-
public void transform(KeyValue<NullWritable, FileFromFolder> input, Emitter<StructuredRecord> emitter) {
80-
emitter.emit(FilesFromFolderTransformer.transform(input.getValue(), config.getSchema()));
81-
}
8277
}

src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSourceConfig.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -136,8 +136,8 @@ public Schema getSchema() {
136136
* @param collector the failure collector is provided
137137
* @return The ValidationResult
138138
*/
139-
public ValidationResult validate(FailureCollector collector) {
140-
ValidationResult validationResult = super.validate(collector);
139+
public ValidationResult getValidationResult(FailureCollector collector) {
140+
ValidationResult validationResult = super.getValidationResult(collector);
141141

142142
validateFileTypesToPull(collector);
143143

src/main/java/io/cdap/plugin/google/sheets/sink/GoogleSheetsSinkConfig.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -167,7 +167,8 @@ public class GoogleSheetsSinkConfig extends GoogleInputSchemaFieldsUsageConfig {
167167
* @param schema the schema to check compatibility
168168
*/
169169
public void validate(FailureCollector collector, Schema schema) {
170-
super.validate(collector);
170+
super.getValidationResult(collector);
171+
collector.getOrThrowException();
171172

172173
// validate spreadsheet name field is in schema and has valid format
173174
validateSchemaField(collector, schema, SCHEMA_SPREAD_SHEET_NAME_FIELD_NAME, schemaSpreadsheetNameFieldName,

src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSource.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -54,7 +54,7 @@ public GoogleSheetsSource(GoogleSheetsSourceConfig config) {
5454
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
5555
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
5656
FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
57-
config.validate(failureCollector);
57+
config.getValidationResult(failureCollector);
5858
failureCollector.getOrThrowException();
5959

6060
Schema configuredSchema = config.getSchema(failureCollector);
@@ -64,7 +64,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
6464
@Override
6565
public void prepareRun(BatchSourceContext context) {
6666
FailureCollector failureCollector = context.getFailureCollector();
67-
config.validate(failureCollector);
67+
config.getValidationResult(failureCollector);
6868
failureCollector.getOrThrowException();
6969

7070
Schema configSchema = config.getSchema(failureCollector);

src/main/java/io/cdap/plugin/google/sheets/source/GoogleSheetsSourceConfig.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -318,8 +318,8 @@ private boolean shouldGetSchema() {
318318
* @param collector the failure collector is provided
319319
* @return The ValidationResult
320320
*/
321-
public ValidationResult validate(FailureCollector collector) {
322-
ValidationResult validationResult = super.validate(collector);
321+
public ValidationResult getValidationResult(FailureCollector collector) {
322+
ValidationResult validationResult = super.getValidationResult(collector);
323323

324324
// reset current headers info
325325
dataSchemaInfo = new LinkedHashMap<>();

0 commit comments

Comments
 (0)