Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions docs/GoogleDrive-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,57 @@ Then the File Identifier would be `17W3vOhBwe0i24OdVNsbz8rAMClzUitKeAbumTqWFrkow
They will be a part of output structured record. Descriptions for properties can be view at
[Drive API file reference](https://developers.google.com/drive/api/v3/reference/files).

**Structured Schema Required :** Set this property to `true` if the output should be structured records.
Using structured records allows you to use read files with formats like avro, csv, json, text, xls, etc.

**Format:** Format of the data to read.
The format must be one of 'avro', 'blob', 'csv', 'delimited', 'json', 'parquet', 'text', 'tsv', 'xls', or the
name of any format plugin that you have deployed to your environment.
If the format is a macro, only the pre-packaged formats can be used.
If the format is 'blob', every input file will be read into a separate record.
The 'blob' format also requires a schema that contains a field named 'body' of type 'bytes'.
If the format is 'text', the schema must contain a field named 'body' of type 'string'.

**Get Schema:** Auto-detects schema from file. Supported formats are: avro, parquet, csv, delimited, tsv, blob, xls
and text.

Blob - is set by default as field named 'body' of type bytes.

Text - is set by default as two fields: 'body' of type bytes and 'offset' of type 'long'.

JSON - is not supported, user has to manually provide the output schema.

Parquet - If the path is a directory, the plugin will look for files ending in '.parquet' to read the schema from.
If no such file can be found, an error will be returned.

Avro - If the path is a directory, the plugin will look for files ending in '.avro' to read the schema from.
If no such file can be found, an error will be returned.

**Sample Size:** The maximum number of rows that will get investigated for automatic data type detection.
The default value is 1000. This is used when the format is `xls`, `csv`, `tsv`, `delimited`.

**Override:** A list of columns with the corresponding data types for whom the automatic data type detection gets
skipped. This is used when the format is `xls`, `csv`, `tsv`, `delimited`.

**Delimiter:** Delimiter to use when the format is 'delimited'. This will be ignored for other formats.

**Enable Quoted Values** Whether to treat content between quotes as a value. This value will only be used if the format
is 'csv', 'tsv' or 'delimited'. For example, if this is set to true, a line that looks like `1, "a, b, c"` will output two fields.
The first field will have `1` as its value and the second will have `a, b, c` as its value. The quote characters will be trimmed.
The newline delimiter cannot be within quotes.

It also assumes the quotes are well enclosed. The left quote will match the first following quote right before the delimiter. If there is an
unenclosed quote, an error will occur.

**Use First Row as Header:** Whether to use the first line of each file as the column headers. Supported formats are 'text', 'csv', 'tsv', 'xls', 'delimited'.

**Terminate Reading After Empty Row:** Specify whether to stop reading after encountering the first empty row. Defaults to false. When false the reader will read all rows in the sheet. This is only used when the format is 'xls'.

**Select Sheet Using:** Select the sheet by name or number. Default is 'Sheet Number'. This is only used when the format is 'xls'.

**Sheet Value:** The name/number of the sheet to read from. If not specified, the first sheet will be read.
Sheet Numbers are 0 based, ie first sheet is 0. This is only used when the format is 'xls'.

### Filtering

**Filter:** Filter that can be applied to the files in the selected directory.
Expand Down Expand Up @@ -107,6 +158,10 @@ Default 0 value means unlimited. Is not applicable for files in Google formats.

**Body Output Format** Output format for body of file. "Bytes" and "String" values are available.

**File System Properties:** Additional properties to use with the InputFormat when reading the data.

**File Encoding:** The character encoding for the file(s) to be read. The default encoding is UTF-8.

### Exporting

**Google Documents Export Format:** MIME type which is used for Google Documents when converted to structured records.
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
<artifactId>hydrator-common</artifactId>
<version>${cdap.plugin.version}</version>
</dependency>
<dependency>
<groupId>io.cdap.plugin</groupId>
<artifactId>format-common</artifactId>
<version>${cdap.plugin.version}</version>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-data-pipeline2_2.11</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public abstract class GoogleAuthBaseConfig extends PluginConfig {
* @param collector the failure collector is provided
* @return The ValidationResult
*/
public ValidationResult validate(FailureCollector collector) {
public ValidationResult getValidationResult(FailureCollector collector) {
IdUtils.validateReferenceName(referenceName, collector);
checkIfDirectoryOrFileIdentifierExists(collector);
ValidationResult validationResult = new ValidationResult();
Expand All @@ -171,13 +171,13 @@ public ValidationResult validate(FailureCollector collector) {
propertiesAreValid = validateServiceAccount(collector);
break;
default:
collector.addFailure(String.format("'%s' is not processed value.", authType.toString()), null)
collector.addFailure(String.format("'%s' is not processed value.", authType), null)
.withConfigProperty(AUTH_TYPE);
return validationResult;
}
if (propertiesAreValid) {
try {
GoogleDriveClient client = new GoogleDriveClient(this);
GoogleDriveClient<GoogleAuthBaseConfig> client = new GoogleDriveClient<>(this);

// check directory or file access
if (isDirectoryOrFileAccessible(collector, client)) {
Expand Down Expand Up @@ -240,10 +240,11 @@ private boolean validateServiceAccount(FailureCollector collector) {
.withConfigProperty(NAME_SERVICE_ACCOUNT_JSON);
}
}
return collector.getValidationFailures().size() == 0;
return collector.getValidationFailures().isEmpty();
}

private boolean isDirectoryOrFileAccessible(FailureCollector collector, GoogleDriveClient driveClient)
private boolean isDirectoryOrFileAccessible(FailureCollector collector,
GoogleDriveClient<GoogleAuthBaseConfig> driveClient)
throws IOException {
if (containsMacro(FILE_IDENTIFIER) || containsMacro(DIRECTORY_IDENTIFIER)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public GoogleDriveClient(C config) throws IOException {
* @return {@link Drive} client.
* @throws IOException on issues with service account file reading.
*/
protected Drive getDriveClient() throws IOException {
public Drive getDriveClient() throws IOException {
Drive drive;
AuthType authType = config.getAuthType();
switch (authType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public GoogleFilteringSourceConfig() {
* @param collector the failure collector is provided
* @return The ValidationResult
*/
public ValidationResult validate(FailureCollector collector) {
ValidationResult validationResult = super.validate(collector);
public ValidationResult getValidationResult(FailureCollector collector) {
ValidationResult validationResult = super.getValidationResult(collector);
if (validateModificationDateRange(collector)
&& getModificationDateRangeType().equals(ModifiedDateRangeType.CUSTOM)) {
if (checkPropertyIsSet(collector, startDate, START_DATE, START_DATE_LABEL)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public class GoogleDriveSinkConfig extends GoogleInputSchemaFieldsUsageConfig {
* @param schema the schema to check compatibility
*/
public void validate(FailureCollector collector, Schema schema) {
super.validate(collector);
super.getValidationResult(collector);
collector.getOrThrowException();

// validate body field is in schema and has valid format
validateSchemaField(collector, schema, SCHEMA_BODY_FIELD_NAME, schemaBodyFieldName,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.google.drive.source;

import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.format.plugin.AbstractFileSource;
import java.util.HashMap;
import java.util.Map;

/**
* This class gives ability to read files from Google Drive with a structured schema.
*/
public class GoogleDriveFileSource extends AbstractFileSource<GoogleDriveSourceConfig> {
public static final String NAME = "GoogleDrive";
private final GoogleDriveSourceConfig config;

public GoogleDriveFileSource(GoogleDriveSourceConfig config) {
super(config);
this.config = config;
}

@Override
protected Map<String, String> getFileSystemProperties(BatchSourceContext context) {
FailureCollector collector = context == null ? null : context.getFailureCollector();
return new HashMap<>(config.getFileSystemProperties(collector));
}

@Override
protected boolean shouldGetSchema() {
return !config.containsMacro(GoogleDriveSourceConfig.NAME_FORMAT)
&& !config.containsMacro(GoogleDriveSourceConfig.NAME_DELIMITER)
&& !config.containsMacro(GoogleDriveSourceConfig.NAME_FILE_SYSTEM_PROPERTIES)
&& !config.containsMacro(GoogleDriveSourceConfig.NAME_FILE_ENCODING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import com.github.rholder.retry.RetryException;
import com.google.api.services.drive.model.File;
import io.cdap.cdap.api.data.format.StructuredRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
Expand All @@ -33,7 +35,7 @@
/**
* Input format class which generates splits for each query.
*/
public class GoogleDriveInputFormat extends InputFormat {
public class GoogleDriveInputFormat extends InputFormat<NullWritable, StructuredRecord> {

@Override
public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
Expand Down Expand Up @@ -75,7 +77,8 @@ private GoogleDriveSplit getSplitWithUnlimitedPartitionSize(String fileId) {
}

@Override
public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
public RecordReader<NullWritable, StructuredRecord> createRecordReader(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) {
return new GoogleDriveRecordReader();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package io.cdap.plugin.google.drive.source;

import com.github.rholder.retry.RetryException;
import io.cdap.plugin.google.drive.common.FileFromFolder;
import io.cdap.cdap.api.data.format.StructuredRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
Expand All @@ -28,12 +28,13 @@
import java.util.concurrent.ExecutionException;

/**
* RecordReader implementation, which reads {@link FileFromFolder} wrappers from Google Drive using
* RecordReader implementation, which reads {@link StructuredRecord} wrappers from Google Drive using
* Google Drive API.
*/
public class GoogleDriveRecordReader extends RecordReader<NullWritable, FileFromFolder> {
public class GoogleDriveRecordReader extends RecordReader<NullWritable, StructuredRecord> {

private GoogleDriveSourceClient googleDriveSourceClient;
private GoogleDriveSourceConfig googleDriveSourceConfig;
private String fileId;
private long bytesFrom;
private long bytesTo;
Expand All @@ -43,6 +44,7 @@ public class GoogleDriveRecordReader extends RecordReader<NullWritable, FileFrom
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
Configuration conf = taskAttemptContext.getConfiguration();
googleDriveSourceConfig = GoogleDriveInputFormatProvider.extractPropertiesFromConfig(conf);
GoogleDriveSourceConfig googleDriveSourceConfig =
GoogleDriveInputFormatProvider.extractPropertiesFromConfig(conf);
googleDriveSourceClient = new GoogleDriveSourceClient(googleDriveSourceConfig);
Expand All @@ -62,18 +64,20 @@ public boolean nextKeyValue() {

@Override
public NullWritable getCurrentKey() {
return null;
return NullWritable.get();
}

@Override
public FileFromFolder getCurrentValue() throws IOException {
public StructuredRecord getCurrentValue() throws IOException {
// read file and content
isFileProcessed = true;
try {
if (isPartitioned) {
return googleDriveSourceClient.getFilePartition(fileId, bytesFrom, bytesTo);
return FilesFromFolderTransformer.transform(
googleDriveSourceClient.getFilePartition(fileId, bytesFrom, bytesTo), googleDriveSourceConfig.getSchema());
} else {
return googleDriveSourceClient.getFile(fileId);
return FilesFromFolderTransformer.transform(googleDriveSourceClient.getFile(fileId),
googleDriveSourceConfig.getSchema());
}
} catch (ExecutionException | RetryException e) {
throw new RuntimeException("Exception during file or file part reading.", e);
Expand All @@ -88,6 +92,6 @@ public float getProgress() {

@Override
public void close() {

// no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,11 @@
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.google.drive.common.FileFromFolder;
import org.apache.hadoop.io.NullWritable;

import java.util.stream.Collectors;
Expand All @@ -41,42 +38,46 @@
@Plugin(type = BatchSource.PLUGIN_TYPE)
@Name(GoogleDriveSource.NAME)
@Description("Reads fileset from specified Google Drive directory.")
public class GoogleDriveSource extends BatchSource<NullWritable, FileFromFolder, StructuredRecord> {
public class GoogleDriveSource extends BatchSource<NullWritable, StructuredRecord, StructuredRecord> {
public static final String NAME = "GoogleDrive";

private final GoogleDriveSourceConfig config;
private final GoogleDriveFileSource delegate;

public GoogleDriveSource(GoogleDriveSourceConfig config) {
this.config = config;
this.delegate = new GoogleDriveFileSource(config);
}

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
config.validate(failureCollector);
config.getValidationResult(failureCollector);
failureCollector.getOrThrowException();

pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
if (!config.isStructuredSchemaRequired()) {
pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
return;
}
delegate.configurePipeline(pipelineConfigurer);
}

@Override
public void prepareRun(BatchSourceContext context) {
public void prepareRun(BatchSourceContext context) throws Exception {
FailureCollector failureCollector = context.getFailureCollector();
config.validate(failureCollector);
config.getValidationResult(failureCollector);
failureCollector.getOrThrowException();

LineageRecorder lineageRecorder = new LineageRecorder(context, config.getReferenceName());
lineageRecorder.createExternalDataset(config.getSchema());
lineageRecorder.recordRead("Read", "Reading Google Drive files",
Preconditions.checkNotNull(config.getSchema().getFields()).stream()
.map(Schema.Field::getName)
.collect(Collectors.toList()));
if (!config.isStructuredSchemaRequired()) {
LineageRecorder lineageRecorder = new LineageRecorder(context, config.getReferenceName());
lineageRecorder.createExternalDataset(config.getSchema());
lineageRecorder.recordRead("Read", "Reading Google Drive files",
Preconditions.checkNotNull(config.getSchema().getFields()).stream().map(Schema.Field::getName)
.collect(Collectors.toList()));

context.setInput(Input.of(config.getReferenceName(), new GoogleDriveInputFormatProvider(config)));
}

@Override
public void transform(KeyValue<NullWritable, FileFromFolder> input, Emitter<StructuredRecord> emitter) {
emitter.emit(FilesFromFolderTransformer.transform(input.getValue(), config.getSchema()));
context.setInput(Input.of(config.getReferenceName(), new GoogleDriveInputFormatProvider(config)));
return;
}
delegate.prepareRun(context);
}
}
Loading