Skip to content

Commit a09cada

Browse files
committed
Add GDriveFS
1 parent 2244d45 commit a09cada

21 files changed

+1635
-168
lines changed

docs/GoogleDrive-batchsource.md

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,57 @@ Then the File Identifier would be `17W3vOhBwe0i24OdVNsbz8rAMClzUitKeAbumTqWFrkow
3333
They will be a part of the output structured record. Descriptions for the properties can be viewed at
3434
[Drive API file reference](https://developers.google.com/drive/api/v3/reference/files).
3535

36+
**Structured Schema Required:** Set this property to `true` if the output should be structured records.
37+
Using structured records allows you to read files with formats like avro, csv, json, text, xls, etc.
38+
39+
**Format:** Format of the data to read.
40+
The format must be one of 'avro', 'blob', 'csv', 'delimited', 'json', 'parquet', 'text', 'tsv', 'xls', or the
41+
name of any format plugin that you have deployed to your environment.
42+
If the format is a macro, only the pre-packaged formats can be used.
43+
If the format is 'blob', every input file will be read into a separate record.
44+
The 'blob' format also requires a schema that contains a field named 'body' of type 'bytes'.
45+
If the format is 'text', the schema must contain a field named 'body' of type 'string'.
46+
47+
**Get Schema:** Auto-detects schema from file. Supported formats are: avro, parquet, csv, delimited, tsv, blob, xls
48+
and text.
49+
50+
Blob - is set by default as field named 'body' of type bytes.
51+
52+
Text - is set by default as two fields: 'body' of type 'string' and 'offset' of type 'long'.
53+
54+
JSON - is not supported, user has to manually provide the output schema.
55+
56+
Parquet - If the path is a directory, the plugin will look for files ending in '.parquet' to read the schema from.
57+
If no such file can be found, an error will be returned.
58+
59+
Avro - If the path is a directory, the plugin will look for files ending in '.avro' to read the schema from.
60+
If no such file can be found, an error will be returned.
61+
62+
**Sample Size:** The maximum number of rows that will get investigated for automatic data type detection.
63+
The default value is 1000. This is used when the format is `xls`, `csv`, `tsv`, `delimited`.
64+
65+
**Override:** A list of columns with the corresponding data types for which the automatic data type detection gets
66+
skipped. This is used when the format is `xls`, `csv`, `tsv`, `delimited`.
67+
68+
**Delimiter:** Delimiter to use when the format is 'delimited'. This will be ignored for other formats.
69+
70+
**Enable Quoted Values:** Whether to treat content between quotes as a value. This value will only be used if the format
71+
is 'csv', 'tsv' or 'delimited'. For example, if this is set to true, a line that looks like `1, "a, b, c"` will output two fields.
72+
The first field will have `1` as its value and the second will have `a, b, c` as its value. The quote characters will be trimmed.
73+
The newline delimiter cannot be within quotes.
74+
75+
It also assumes the quotes are well enclosed. The left quote will match the first following quote right before the delimiter. If there is an
76+
unenclosed quote, an error will occur.
77+
78+
**Use First Row as Header:** Whether to use the first line of each file as the column headers. Supported formats are 'text', 'csv', 'tsv', 'xls', 'delimited'.
79+
80+
**Terminate Reading After Empty Row:** Specify whether to stop reading after encountering the first empty row. Defaults to false. When false the reader will read all rows in the sheet. This is only used when the format is 'xls'.
81+
82+
**Select Sheet Using:** Select the sheet by name or number. Default is 'Sheet Number'. This is only used when the format is 'xls'.
83+
84+
**Sheet Value:** The name/number of the sheet to read from. If not specified, the first sheet will be read.
85+
Sheet numbers are 0-based, i.e. the first sheet is 0. This is only used when the format is 'xls'.
86+
3687
### Filtering
3788

3889
**Filter:** Filter that can be applied to the files in the selected directory.
@@ -107,6 +158,10 @@ Default 0 value means unlimited. Is not applicable for files in Google formats.
107158

108159
**Body Output Format:** Output format for the body of the file. "Bytes" and "String" values are available.
109160

161+
**File System Properties:** Additional properties to use with the InputFormat when reading the data.
162+
163+
**File Encoding:** The character encoding for the file(s) to be read. The default encoding is UTF-8.
164+
110165
### Exporting
111166

112167
**Google Documents Export Format:** MIME type which is used for Google Documents when converted to structured records.

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
<properties>
3030
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
3131
<cdap.version>6.8.0</cdap.version>
32+
<hydrator.version>2.10.0</hydrator.version>
3233
<cdap.plugin.version>2.10.0</cdap.plugin.version>
3334
<commons-collections.version>3.2.2</commons-collections.version>
3435
<commons-lang3.version>3.9</commons-lang3.version>
@@ -63,6 +64,11 @@
6364
<artifactId>hydrator-common</artifactId>
6465
<version>${cdap.plugin.version}</version>
6566
</dependency>
67+
<dependency>
68+
<groupId>io.cdap.plugin</groupId>
69+
<artifactId>format-common</artifactId>
70+
<version>${hydrator.version}</version>
71+
</dependency>
6672
<dependency>
6773
<groupId>io.cdap.cdap</groupId>
6874
<artifactId>cdap-data-pipeline2_2.11</artifactId>

src/main/java/io/cdap/plugin/google/common/GoogleAuthBaseConfig.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public abstract class GoogleAuthBaseConfig extends PluginConfig {
156156
* @param collector the failure collector is provided
157157
* @return The ValidationResult
158158
*/
159-
public ValidationResult validate(FailureCollector collector) {
159+
public ValidationResult getValidationResult(FailureCollector collector) {
160160
IdUtils.validateReferenceName(referenceName, collector);
161161
checkIfDirectoryOrFileIdentifierExists(collector);
162162
ValidationResult validationResult = new ValidationResult();
@@ -171,14 +171,13 @@ public ValidationResult validate(FailureCollector collector) {
171171
propertiesAreValid = validateServiceAccount(collector);
172172
break;
173173
default:
174-
collector.addFailure(String.format("'%s' is not processed value.", authType.toString()), null)
174+
collector.addFailure(String.format("'%s' is not processed value.", authType), null)
175175
.withConfigProperty(AUTH_TYPE);
176176
return validationResult;
177177
}
178178
if (propertiesAreValid) {
179179
try {
180-
GoogleDriveClient client = new GoogleDriveClient(this);
181-
180+
GoogleDriveClient<GoogleAuthBaseConfig> client = new GoogleDriveClient<>(this);
182181
// check directory or file access
183182
if (isDirectoryOrFileAccessible(collector, client)) {
184183
validationResult.setDirectoryOrFileAccessible(true);
@@ -240,10 +239,11 @@ private boolean validateServiceAccount(FailureCollector collector) {
240239
.withConfigProperty(NAME_SERVICE_ACCOUNT_JSON);
241240
}
242241
}
243-
return collector.getValidationFailures().size() == 0;
242+
return collector.getValidationFailures().isEmpty();
244243
}
245244

246-
private boolean isDirectoryOrFileAccessible(FailureCollector collector, GoogleDriveClient driveClient)
245+
private boolean isDirectoryOrFileAccessible(FailureCollector collector,
246+
GoogleDriveClient<GoogleAuthBaseConfig> driveClient)
247247
throws IOException {
248248
if (containsMacro(FILE_IDENTIFIER) || containsMacro(DIRECTORY_IDENTIFIER)) {
249249
return false;

src/main/java/io/cdap/plugin/google/common/GoogleDriveClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public GoogleDriveClient(C config) throws IOException {
6868
* @return {@link Drive} client.
6969
* @throws IOException on issues with service account file reading.
7070
*/
71-
protected Drive getDriveClient() throws IOException {
71+
public Drive getDriveClient() throws IOException {
7272
Drive drive;
7373
AuthType authType = config.getAuthType();
7474
switch (authType) {

src/main/java/io/cdap/plugin/google/common/GoogleFilteringSourceConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ public GoogleFilteringSourceConfig() {
8282
* @param collector the failure collector is provided
8383
* @return The ValidationResult
8484
*/
85-
public ValidationResult validate(FailureCollector collector) {
86-
ValidationResult validationResult = super.validate(collector);
85+
public ValidationResult getValidationResult(FailureCollector collector) {
86+
ValidationResult validationResult = super.getValidationResult(collector);
8787
if (validateModificationDateRange(collector)
8888
&& getModificationDateRangeType().equals(ModifiedDateRangeType.CUSTOM)) {
8989
if (checkPropertyIsSet(collector, startDate, START_DATE, START_DATE_LABEL)) {

src/main/java/io/cdap/plugin/google/drive/sink/GoogleDriveSinkConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public class GoogleDriveSinkConfig extends GoogleInputSchemaFieldsUsageConfig {
6060
* @param schema the schema to check compatibility
6161
*/
6262
public void validate(FailureCollector collector, Schema schema) {
63-
super.validate(collector);
63+
super.getValidationResult(collector);
6464

6565
// validate body field is in schema and has valid format
6666
validateSchemaField(collector, schema, SCHEMA_BODY_FIELD_NAME, schemaBodyFieldName,
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.google.drive.source;
18+
19+
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
20+
import io.cdap.plugin.format.plugin.AbstractFileSource;
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
24+
/**
25+
* This class gives ability to read files from Google Drive with a structured schema.
26+
*/
27+
public class GoogleDriveFileSource extends AbstractFileSource<GoogleDriveSourceConfig> {
28+
public static final String NAME = "GoogleDrive";
29+
private final GoogleDriveSourceConfig config;
30+
31+
public GoogleDriveFileSource(GoogleDriveSourceConfig config) {
32+
super(config);
33+
this.config = config;
34+
}
35+
36+
@Override
37+
protected Map<String, String> getFileSystemProperties(BatchSourceContext context) {
38+
return new HashMap<>(config.getFileSystemProperties());
39+
}
40+
41+
}

src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveInputFormat.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import com.github.rholder.retry.RetryException;
2020
import com.google.api.services.drive.model.File;
21+
import io.cdap.cdap.api.data.format.StructuredRecord;
2122
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.io.NullWritable;
2224
import org.apache.hadoop.mapreduce.InputFormat;
2325
import org.apache.hadoop.mapreduce.InputSplit;
2426
import org.apache.hadoop.mapreduce.JobContext;
@@ -33,7 +35,7 @@
3335
/**
3436
* Input format class which generates splits for each query.
3537
*/
36-
public class GoogleDriveInputFormat extends InputFormat {
38+
public class GoogleDriveInputFormat extends InputFormat<NullWritable, StructuredRecord> {
3739

3840
@Override
3941
public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
@@ -75,7 +77,8 @@ private GoogleDriveSplit getSplitWithUnlimitedPartitionSize(String fileId) {
7577
}
7678

7779
@Override
78-
public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
80+
public RecordReader<NullWritable, StructuredRecord> createRecordReader(InputSplit inputSplit,
81+
TaskAttemptContext taskAttemptContext) {
7982
return new GoogleDriveRecordReader();
8083
}
8184
}

src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveRecordReader.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.cdap.plugin.google.drive.source;
1818

1919
import com.github.rholder.retry.RetryException;
20+
import io.cdap.cdap.api.data.format.StructuredRecord;
2021
import io.cdap.plugin.google.drive.common.FileFromFolder;
2122
import org.apache.hadoop.conf.Configuration;
2223
import org.apache.hadoop.io.NullWritable;
@@ -28,12 +29,13 @@
2829
import java.util.concurrent.ExecutionException;
2930

3031
/**
31-
* RecordReader implementation, which reads {@link FileFromFolder} wrappers from Google Drive using
32+
* RecordReader implementation, which reads {@link StructuredRecord} wrappers from Google Drive using
3233
* Google Drive API.
3334
*/
34-
public class GoogleDriveRecordReader extends RecordReader<NullWritable, FileFromFolder> {
35+
public class GoogleDriveRecordReader extends RecordReader<NullWritable, StructuredRecord> {
3536

3637
private GoogleDriveSourceClient googleDriveSourceClient;
38+
private GoogleDriveSourceConfig googleDriveSourceConfig;
3739
private String fileId;
3840
private long bytesFrom;
3941
private long bytesTo;
@@ -43,8 +45,7 @@ public class GoogleDriveRecordReader extends RecordReader<NullWritable, FileFrom
4345
@Override
4446
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
4547
Configuration conf = taskAttemptContext.getConfiguration();
46-
GoogleDriveSourceConfig googleDriveSourceConfig =
47-
GoogleDriveInputFormatProvider.extractPropertiesFromConfig(conf);
48+
googleDriveSourceConfig = GoogleDriveInputFormatProvider.extractPropertiesFromConfig(conf);
4849
googleDriveSourceClient = new GoogleDriveSourceClient(googleDriveSourceConfig);
4950

5051
GoogleDriveSplit split = (GoogleDriveSplit) inputSplit;
@@ -62,18 +63,20 @@ public boolean nextKeyValue() {
6263

6364
@Override
6465
public NullWritable getCurrentKey() {
65-
return null;
66+
return NullWritable.get();
6667
}
6768

6869
@Override
69-
public FileFromFolder getCurrentValue() throws IOException {
70+
public StructuredRecord getCurrentValue() throws IOException {
7071
// read file and content
7172
isFileProcessed = true;
7273
try {
7374
if (isPartitioned) {
74-
return googleDriveSourceClient.getFilePartition(fileId, bytesFrom, bytesTo);
75+
return FilesFromFolderTransformer.transform(
76+
googleDriveSourceClient.getFilePartition(fileId, bytesFrom, bytesTo), googleDriveSourceConfig.getSchema());
7577
} else {
76-
return googleDriveSourceClient.getFile(fileId);
78+
return FilesFromFolderTransformer.transform(googleDriveSourceClient.getFile(fileId),
79+
googleDriveSourceConfig.getSchema());
7780
}
7881
} catch (ExecutionException | RetryException e) {
7982
throw new RuntimeException("Exception during file or file part reading.", e);
@@ -88,6 +91,6 @@ public float getProgress() {
8891

8992
@Override
9093
public void close() {
91-
94+
// no-op
9295
}
9396
}

src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSource.java

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,11 @@
2323
import io.cdap.cdap.api.data.batch.Input;
2424
import io.cdap.cdap.api.data.format.StructuredRecord;
2525
import io.cdap.cdap.api.data.schema.Schema;
26-
import io.cdap.cdap.api.dataset.lib.KeyValue;
27-
import io.cdap.cdap.etl.api.Emitter;
2826
import io.cdap.cdap.etl.api.FailureCollector;
2927
import io.cdap.cdap.etl.api.PipelineConfigurer;
3028
import io.cdap.cdap.etl.api.batch.BatchSource;
3129
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
3230
import io.cdap.plugin.common.LineageRecorder;
33-
import io.cdap.plugin.google.drive.common.FileFromFolder;
3431
import org.apache.hadoop.io.NullWritable;
3532

3633
import java.util.stream.Collectors;
@@ -41,42 +38,47 @@
4138
@Plugin(type = BatchSource.PLUGIN_TYPE)
4239
@Name(GoogleDriveSource.NAME)
4340
@Description("Reads fileset from specified Google Drive directory.")
44-
public class GoogleDriveSource extends BatchSource<NullWritable, FileFromFolder, StructuredRecord> {
41+
public class GoogleDriveSource extends BatchSource<NullWritable, StructuredRecord, StructuredRecord> {
4542
public static final String NAME = "GoogleDrive";
4643

4744
private final GoogleDriveSourceConfig config;
45+
private final GoogleDriveFileSource delegate;
4846

4947
public GoogleDriveSource(GoogleDriveSourceConfig config) {
5048
this.config = config;
49+
this.delegate = new GoogleDriveFileSource(config);
5150
}
5251

5352
@Override
5453
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
5554
FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
56-
config.validate(failureCollector);
55+
config.getValidationResult(failureCollector);
5756
failureCollector.getOrThrowException();
5857

59-
pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
58+
if (!config.isStructuredSchemaRequired()) {
59+
pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
60+
return;
61+
}
62+
delegate.configurePipeline(pipelineConfigurer);
6063
}
6164

6265
@Override
63-
public void prepareRun(BatchSourceContext context) {
66+
public void prepareRun(BatchSourceContext context) throws Exception {
6467
FailureCollector failureCollector = context.getFailureCollector();
65-
config.validate(failureCollector);
68+
config.getValidationResult(failureCollector);
6669
failureCollector.getOrThrowException();
6770

68-
LineageRecorder lineageRecorder = new LineageRecorder(context, config.getReferenceName());
69-
lineageRecorder.createExternalDataset(config.getSchema());
70-
lineageRecorder.recordRead("Read", "Reading Google Drive files",
71-
Preconditions.checkNotNull(config.getSchema().getFields()).stream()
72-
.map(Schema.Field::getName)
73-
.collect(Collectors.toList()));
71+
if (!config.isStructuredSchemaRequired()) {
72+
LineageRecorder lineageRecorder = new LineageRecorder(context, config.getReferenceName());
73+
lineageRecorder.createExternalDataset(config.getSchema());
74+
lineageRecorder.recordRead("Read", "Reading Google Drive files",
75+
Preconditions.checkNotNull(config.getSchema().getFields()).stream().map(Schema.Field::getName)
76+
.collect(Collectors.toList()));
7477

75-
context.setInput(Input.of(config.getReferenceName(), new GoogleDriveInputFormatProvider(config)));
78+
context.setInput(Input.of(config.getReferenceName(), new GoogleDriveInputFormatProvider(config)));
79+
return;
80+
}
81+
delegate.prepareRun(context);
7682
}
7783

78-
@Override
79-
public void transform(KeyValue<NullWritable, FileFromFolder> input, Emitter<StructuredRecord> emitter) {
80-
emitter.emit(FilesFromFolderTransformer.transform(input.getValue(), config.getSchema()));
81-
}
8284
}

0 commit comments

Comments
 (0)