Commit 490078a

Merge pull request #75 from cloudsufi/feat/AbstractFileBatchSource-ui
[PLUGIN-1906] Add AbstractFileBatchSource GDrive [UI + Docs]
2 parents 4bc3b37 + f8359cc commit 490078a

4 files changed: +946 -108 lines changed
docs/GoogleDrive-batchsource.md

Lines changed: 55 additions & 0 deletions
@@ -33,6 +33,57 @@ Then the File Identifier would be `17W3vOhBwe0i24OdVNsbz8rAMClzUitKeAbumTqWFrkow
 They will be a part of the output structured record. Descriptions for properties can be viewed at
 [Drive API file reference](https://developers.google.com/drive/api/v3/reference/files).
 
+**Structured Schema Required:** Set this property to `true` if the output should be structured records.
+Using structured records allows you to read files with formats like avro, csv, json, text, xls, etc.
+
+**Format:** Format of the data to read.
+The format must be one of 'avro', 'blob', 'csv', 'delimited', 'json', 'parquet', 'text', 'tsv', 'xls', or the
+name of any format plugin that you have deployed to your environment.
+If the format is a macro, only the pre-packaged formats can be used.
+If the format is 'blob', every input file will be read into a separate record.
+The 'blob' format also requires a schema that contains a field named 'body' of type 'bytes'.
+If the format is 'text', the schema must contain a field named 'body' of type 'string'.
+
+**Get Schema:** Auto-detects schema from file. Supported formats are: avro, parquet, csv, delimited, tsv, blob, xls
+and text.
+
+Blob - is set by default as field named 'body' of type bytes.
+
+Text - is set by default as two fields: 'body' of type bytes and 'offset' of type 'long'.
+
+JSON - is not supported, user has to manually provide the output schema.
+
+Parquet - If the path is a directory, the plugin will look for files ending in '.parquet' to read the schema from.
+If no such file can be found, an error will be returned.
+
+Avro - If the path is a directory, the plugin will look for files ending in '.avro' to read the schema from.
+If no such file can be found, an error will be returned.
+
+**Sample Size:** The maximum number of rows that will be examined for automatic data type detection.
+The default value is 1000. This is used when the format is `xls`, `csv`, `tsv`, `delimited`.
+
+**Override:** A list of columns with the corresponding data types for which automatic data type detection is
+skipped. This is used when the format is `xls`, `csv`, `tsv`, `delimited`.
+
+**Delimiter:** Delimiter to use when the format is 'delimited'. This will be ignored for other formats.
+
+**Enable Quoted Values** Whether to treat content between quotes as a value. This value will only be used if the format
+is 'csv', 'tsv' or 'delimited'. For example, if this is set to true, a line that looks like `1, "a, b, c"` will output two fields.
+The first field will have `1` as its value and the second will have `a, b, c` as its value. The quote characters will be trimmed.
+The newline delimiter cannot be within quotes.
+
+It also assumes the quotes are well enclosed. The left quote will match the first following quote right before the delimiter. If there is an
+unenclosed quote, an error will occur.
+
+**Use First Row as Header:** Whether to use the first line of each file as the column headers. Supported formats are 'text', 'csv', 'tsv', 'xls', 'delimited'.
+
+**Terminate Reading After Empty Row:** Specify whether to stop reading after encountering the first empty row. Defaults to false. When false the reader will read all rows in the sheet. This is only used when the format is 'xls'.
+
+**Select Sheet Using:** Select the sheet by name or number. Default is 'Sheet Number'. This is only used when the format is 'xls'.
+
+**Sheet Value:** The name/number of the sheet to read from. If not specified, the first sheet will be read.
+Sheet Numbers are 0 based, i.e. the first sheet is 0. This is only used when the format is 'xls'.
+
 ### Filtering
 
 **Filter:** Filter that can be applied to the files in the selected directory.
@@ -107,6 +158,10 @@ Default 0 value means unlimited. Is not applicable for files in Google formats.
 
 **Body Output Format** Output format for body of file. "Bytes" and "String" values are available.
 
+**File System Properties:** Additional properties to use with the InputFormat when reading the data.
+
+**File Encoding:** The character encoding for the file(s) to be read. The default encoding is UTF-8.
+
 ### Exporting
 
 **Google Documents Export Format:** MIME type which is used for Google Documents when converted to structured records.
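
Review note on the **Enable Quoted Values** text added above: the documented behaviour (a line such as `1, "a, b, c"` yields two fields, quote characters are dropped, and an unenclosed quote is an error) can be illustrated with a small standalone sketch. This is not the plugin's or the format library's actual parser; the class `QuotedSplitSketch` and its `split` method are hypothetical names, and trimming whitespace around every field is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; the real parsing lives in the format plugins.
final class QuotedSplitSketch {

  // Splits one line on `delimiter`, treating text between double quotes as part of
  // a single value. Quote characters are dropped. Assumption: leading and trailing
  // whitespace of each field is trimmed, which is a simplification.
  public static List<String> split(String line, char delimiter) {
    List<String> fields = new ArrayList<>();
    StringBuilder current = new StringBuilder();
    boolean inQuotes = false;
    for (int i = 0; i < line.length(); i++) {
      char c = line.charAt(i);
      if (c == '"') {
        inQuotes = !inQuotes; // toggle quoted state and drop the quote itself
      } else if (c == delimiter && !inQuotes) {
        fields.add(current.toString().trim());
        current.setLength(0);
      } else {
        current.append(c); // delimiters inside quotes are kept as data
      }
    }
    if (inQuotes) {
      // Mirrors the documented rule that an unenclosed quote is an error.
      throw new IllegalArgumentException("Unenclosed quote in line: " + line);
    }
    fields.add(current.toString().trim());
    return fields;
  }

  public static void main(String[] args) {
    System.out.println(split("1, \"a, b, c\"", ','));
  }
}
```

With quoting disabled, the same line would instead be split at every comma, which is why the property matters for 'csv', 'tsv' and 'delimited' inputs.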

pom.xml

Lines changed: 5 additions & 0 deletions
@@ -63,6 +63,11 @@
       <artifactId>hydrator-common</artifactId>
       <version>${cdap.plugin.version}</version>
     </dependency>
+    <dependency>
+      <groupId>io.cdap.plugin</groupId>
+      <artifactId>format-common</artifactId>
+      <version>${cdap.plugin.version}</version>
+    </dependency>
     <dependency>
       <groupId>io.cdap.cdap</groupId>
       <artifactId>cdap-data-pipeline2_2.11</artifactId>

src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSourceConfig.java

Lines changed: 202 additions & 9 deletions
@@ -23,7 +23,11 @@
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.StageContext;
+import io.cdap.plugin.format.FileFormat;
+import io.cdap.plugin.format.plugin.FileSourceProperties;
 import io.cdap.plugin.google.common.GoogleFilteringSourceConfig;
+import io.cdap.plugin.google.common.IdentifierType;
 import io.cdap.plugin.google.common.ValidationResult;
 import io.cdap.plugin.google.common.exceptions.InvalidPropertyTypeException;
 import io.cdap.plugin.google.common.utils.ExportedType;
@@ -33,13 +37,14 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
 /**
  * Configurations for Google Drive Batch Source plugin.
  */
-public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig {
+public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig implements FileSourceProperties {
   public static final String FILE_METADATA_PROPERTIES = "fileMetadataProperties";
   public static final String CONFIGURATION_PARSE_PROPERTY_NAME = "properties";
   public static final String FILE_TYPES_TO_PULL = "fileTypesToPull";
@@ -49,6 +54,9 @@ public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig {
   public static final String SHEETS_EXPORTING_FORMAT = "sheetsExportingFormat";
   public static final String DRAWINGS_EXPORTING_FORMAT = "drawingsExportingFormat";
   public static final String PRESENTATIONS_EXPORTING_FORMAT = "presentationsExportingFormat";
+  public static final String IS_STRUCTURED_SCHEMA_REQUIRED = "structuredSchemaRequired";
+  public static final String NAME_SCHEMA = "schema";
+  public static final String NAME_FORMAT = "format";
 
   public static final String DEFAULT_BODY_FORMAT = "bytes";
   public static final long DEFAULT_MAX_PARTITION_SIZE = 0;
@@ -61,6 +69,12 @@ public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig {
   public static final String FILE_TYPES_TO_PULL_LABEL = "File types to pull";
   public static final String BODY_FORMAT_LABEL = "Body output format";
 
+  public static final String GOOGLE_DRIVE_SCHEMA = "drive";
+  public static final String GOOGLE_DRIVE_AUTHORITY = "drive.google.com";
+  public static final String GOOGLE_DRIVE_FILE_PATH_PREFIX = "/drive/file/d";
+  public static final String GOOGLE_DRIVE_FOLDER_PATH_PREFIX = "/drive/folders";
+  public static final String GOOGLE_DRIVE_DEFAULT_FILENAME = "default.txt";
+
   @Nullable
   @Name(FILE_METADATA_PROPERTIES)
   @Description("Properties that represent metadata of files. \n" +
@@ -113,7 +127,91 @@ public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig {
   @Nullable
   @Macro
   protected String presentationsExportingFormat;
-  private transient Schema schema = null;
+
+  @Macro
+  @Nullable
+  @Description("Output schema for the source. Formats like 'avro' and 'parquet' require a schema in order to "
+    + "read the data.")
+  private String schema;
+
+  @Name(IS_STRUCTURED_SCHEMA_REQUIRED)
+  @Description("Whether to fetch schema or not")
+  @Nullable
+  protected Boolean isStructuredSchemaRequired;
+
+  @Name(NAME_FORMAT)
+  @Macro
+  @Description("Format of the data to read. Supported formats are 'csv'....")
+  @Nullable
+  private String format;
+
+  @Macro
+  @Nullable
+  @Description("Whether to recursively read directories within the input directory. The default is false.")
+  private Boolean recursive;
+
+  @Macro
+  @Nullable
+  @Description("Whether to allow an input that does not exist. When false, the source will fail the run if the input "
+    + "does not exist. When true, the run will not fail and the source will not generate any output. "
+    + "The default value is false.")
+  private Boolean ignoreNonExistingFolders;
+
+  @Macro
+  @Nullable
+  @Description("The maximum number of rows that will get investigated for automatic data type detection.")
+  private Long sampleSize;
+
+  @Macro
+  @Nullable
+  @Description("A list of columns with the corresponding data types for whom the automatic data type detection gets " +
+    "skipped.")
+  private String override;
+
+  @Macro
+  @Nullable
+  @Description("The delimiter to use if the format is 'delimited'. The delimiter will be ignored if the format "
+    + "is anything other than 'delimited'.")
+  private String delimiter;
+
+  @Macro
+  @Nullable
+  @Description("Whether to use first row as header. Supported formats are 'text', 'csv', 'tsv', " +
+    "'delimited'. Default value is false.")
+  private Boolean skipHeader;
+
+  @Macro
+  @Nullable
+  @Description("Whether to treat content between quotes as a value. This value will only be used if the format " +
+    "is 'csv', 'tsv' or 'delimited'. The default value is false.")
+  protected Boolean enableQuotedValues;
+
+  @Macro
+  @Nullable
+  @Description("Any additional properties to use when reading from the filesystem. "
+    + "This is an advanced feature that requires knowledge of the properties supported by the underlying filesystem.")
+  private String fileSystemProperties;
+
+  @Macro
+  @Nullable
+  @Description("File encoding for the source files. The default encoding is 'UTF-8'")
+  private String fileEncoding;
+
+  @Macro
+  @Nullable
+  @Description("Select the sheet by name or number. Default is 'Sheet Number'.")
+  private String sheet;
+
+  @Macro
+  @Nullable
+  @Description("The name/number of the sheet to read from. If not specified, the first sheet will be read. " +
+    "Sheet Numbers are 0 based, i.e. first sheet is 0.")
+  private String sheetValue;
+
+  @Macro
+  @Nullable
+  @Description("Specify whether to stop reading after encountering the first empty row. Defaults to false.")
+  private String terminateIfEmptyRow;
 
   public GoogleDriveSourceConfig(String referenceName, @Nullable String fileMetadataProperties, String fileTypesToPull,
                                  String maxPartitionSize, String bodyFormat, String sheetsExportingFormat,
@@ -134,15 +232,103 @@ public GoogleDriveSourceConfig(String referenceName, @Nullable String fileMetada
     this.endDate = endDate;
   }
 
+  @Override
+  public void validate(FailureCollector collector) {
+    getSchema();
+    // Extra validation when structure schema is required
+  }
+
+  @Override
+  public String getPath() {
+    IdentifierType idType = getIdentifierType();
+    if (idType == IdentifierType.FILE_IDENTIFIER) {
+      return String.format("%s://%s%s/%s/%s", GOOGLE_DRIVE_SCHEMA, GOOGLE_DRIVE_AUTHORITY,
+                           GOOGLE_DRIVE_FILE_PATH_PREFIX, getFileIdentifier(), GOOGLE_DRIVE_DEFAULT_FILENAME);
+    } else if (idType == IdentifierType.DIRECTORY_IDENTIFIER) {
+      return String.format("%s://%s%s/%s/", GOOGLE_DRIVE_SCHEMA, GOOGLE_DRIVE_AUTHORITY,
+                           GOOGLE_DRIVE_FOLDER_PATH_PREFIX, getDirectoryIdentifier());
+    }
+    throw new IllegalArgumentException(String.format("Invalid identifier type '%s'. Expected one of: %s or %s.", idType,
+                                                     IdentifierType.FILE_IDENTIFIER, IdentifierType.DIRECTORY_IDENTIFIER));
+  }
+
+  @Override
+  public String getPath(StageContext context) {
+    return getPath();
+  }
+
+  @Override
+  public String getFormatName() {
+    // need to do this for backwards compatibility, where the pre-packaged format names were case insensitive.
+    try {
+      FileFormat fileFormat = FileFormat.from(format, x -> true);
+      return fileFormat.name().toLowerCase();
+    } catch (IllegalArgumentException e) {
+      // ignore
+    }
+    return format;
+  }
+
+  @Nullable
+  @Override
+  public FileFormat getFormat() {
+    throw new UnsupportedOperationException("GDrive does not support: FileFormat getFormat() method");
+  }
+
+  @Nullable
+  @Override
+  public Pattern getFilePattern() {
+    return null;
+  }
+
+  @Override
+  public long getMaxSplitSize() {
+    return Long.MAX_VALUE;
+  }
+
+  @Override
+  public boolean shouldAllowEmptyInput() {
+    return ignoreNonExistingFolders != null && ignoreNonExistingFolders;
+  }
+
+  @Override
+  public boolean shouldReadRecursively() {
+    return recursive != null && recursive;
+  }
+
+  @Nullable
+  @Override
+  public String getPathField() {
+    return null;
+  }
+
+  @Override
+  public boolean useFilenameAsPath() {
+    throw new UnsupportedOperationException("GDrive does not support: boolean useFilenameAsPath() method");
+  }
+
+  @Override
+  public boolean skipHeader() {
+    throw new UnsupportedOperationException("GDrive does not support: boolean skipHeader() method");
+  }
+
   /**
+   * throw new UnsupportedOperationException("GDrive does not support: /**() method;
    * Returns the instance of Schema.
    * @return The instance of Schema
    */
   public Schema getSchema() {
-    if (schema == null) {
-      schema = SchemaBuilder.buildSchema(getFileMetadataProperties(), getBodyFormat());
+    if (!isStructuredSchemaRequired() && Strings.isNullOrEmpty(schema)) {
+      schema = SchemaBuilder.buildSchema(getFileMetadataProperties(), getBodyFormat()).toString();
+    }
+    if (Strings.isNullOrEmpty(schema)) {
+      return null;
+    }
+    try {
+      return Schema.parseJson(schema);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
     }
-    return schema;
   }
 
   /**
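
Review note on `getPath()` in the hunk above: the method builds a pseudo-URI from the identifier type and the configured ID. The following standalone sketch reproduces only that string construction so the resulting paths are easy to see. The class `DrivePathSketch`, the nested `IdentifierType` stand-in enum, and the ID `abc123` are hypothetical; the constant values are copied from the diff.

```java
// Hypothetical illustration only; mirrors the String.format calls in getPath().
final class DrivePathSketch {

  // Stand-in for io.cdap.plugin.google.common.IdentifierType (assumption: only
  // the two constants referenced in the diff are modelled here).
  public enum IdentifierType { FILE_IDENTIFIER, DIRECTORY_IDENTIFIER }

  static final String SCHEME = "drive";
  static final String AUTHORITY = "drive.google.com";
  static final String FILE_PATH_PREFIX = "/drive/file/d";
  static final String FOLDER_PATH_PREFIX = "/drive/folders";
  static final String DEFAULT_FILENAME = "default.txt";

  public static String buildPath(IdentifierType type, String id) {
    if (type == IdentifierType.FILE_IDENTIFIER) {
      // A single file gets a placeholder file name appended after its ID.
      return String.format("%s://%s%s/%s/%s", SCHEME, AUTHORITY, FILE_PATH_PREFIX, id, DEFAULT_FILENAME);
    } else if (type == IdentifierType.DIRECTORY_IDENTIFIER) {
      // A folder path ends with a trailing slash.
      return String.format("%s://%s%s/%s/", SCHEME, AUTHORITY, FOLDER_PATH_PREFIX, id);
    }
    throw new IllegalArgumentException("Invalid identifier type '" + type + "'");
  }

  public static void main(String[] args) {
    System.out.println(buildPath(IdentifierType.FILE_IDENTIFIER, "abc123"));
    System.out.println(buildPath(IdentifierType.DIRECTORY_IDENTIFIER, "abc123"));
  }
}
```

Under these assumptions a file ID `abc123` maps to `drive://drive.google.com/drive/file/d/abc123/default.txt` and a folder ID to `drive://drive.google.com/drive/folders/abc123/`.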
@@ -236,8 +422,15 @@ public String getDrawingsExportingFormat() {
   }
 
   public String getPresentationsExportingFormat() {
-    return Strings.isNullOrEmpty(presentationsExportingFormat) ? DEFAULT_PRESENTATIONS_EXPORTING_FORMAT
-      : presentationsExportingFormat;
+    return Strings.isNullOrEmpty(presentationsExportingFormat) ?
+      DEFAULT_PRESENTATIONS_EXPORTING_FORMAT : presentationsExportingFormat;
+  }
+
+  public boolean isStructuredSchemaRequired() {
+    if (isStructuredSchemaRequired == null) {
+      return false; // for backward compatibility, default to false
+    }
+    return isStructuredSchemaRequired;
   }
 
   public GoogleDriveSourceConfig(String referenceName) {
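
Review note on `getFormatName()` from the earlier hunk in this file: it lower-cases the name when it matches a pre-packaged format and otherwise returns the configured string unchanged, so a custom format plugin name passes through as written. A minimal sketch of that fallback pattern, assuming a local stand-in enum instead of the real `io.cdap.plugin.format.FileFormat` (whose lookup rules are not shown in this commit), might look like this. `FormatNameSketch`, `BuiltInFormat` and `normalize` are hypothetical names, the enum members are taken from the format list in the docs change, and returning `null` for a `null` input is an assumption.

```java
import java.util.Locale;

// Hypothetical illustration only; the plugin delegates to FileFormat.from(...).
final class FormatNameSketch {

  // Stand-in for the pre-packaged formats listed in the documentation change.
  enum BuiltInFormat { AVRO, BLOB, CSV, DELIMITED, JSON, PARQUET, TEXT, TSV, XLS }

  public static String normalize(String format) {
    if (format == null) {
      return null; // assumption: nothing configured, nothing to normalize
    }
    try {
      // Case-insensitive match against the built-in names, returned in lower case.
      return BuiltInFormat.valueOf(format.toUpperCase(Locale.ROOT)).name().toLowerCase(Locale.ROOT);
    } catch (IllegalArgumentException e) {
      // Not a built-in format: treat it as the name of a deployed format plugin.
      return format;
    }
  }

  public static void main(String[] args) {
    System.out.println(normalize("CSV"));
    System.out.println(normalize("MyCustomFormat"));
  }
}
```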
@@ -284,8 +477,8 @@ public void setFilter(String filter) {
     this.filter = filter;
   }
 
-  public void setSchema(String schema) throws IOException {
-    this.schema = Schema.parseJson(schema);
+  public void setSchema(String schema) {
+    this.schema = schema;
   }
 
   public void setModificationDateRange(String modificationDateRange) {
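
Review note on the schema handling changed in this file: the schema is now stored as a JSON string and parsed on each `getSchema()` call, and the metadata-based default is only filled in when a structured schema is not required. The sketch below models just that decision logic with plain strings so it runs without CDAP on the classpath. `SchemaResolutionSketch`, `resolveSchemaJson` and the `defaultSchemaJson` parameter are hypothetical; in the plugin the default comes from `SchemaBuilder.buildSchema(...)` and the result is passed to `Schema.parseJson(...)`.

```java
// Hypothetical illustration only; models the branch structure of getSchema().
final class SchemaResolutionSketch {

  private final Boolean structuredSchemaRequired; // nullable, like the plugin property
  private String schema;                          // schema kept as JSON text

  public SchemaResolutionSketch(Boolean structuredSchemaRequired, String schema) {
    this.structuredSchemaRequired = structuredSchemaRequired;
    this.schema = schema;
  }

  public boolean isStructuredSchemaRequired() {
    // A missing value defaults to false for backward compatibility.
    return structuredSchemaRequired != null && structuredSchemaRequired;
  }

  // Returns the JSON text that would be handed to a schema parser,
  // or null when no schema is available.
  public String resolveSchemaJson(String defaultSchemaJson) {
    if (!isStructuredSchemaRequired() && isNullOrEmpty(schema)) {
      schema = defaultSchemaJson; // fall back to the metadata/body schema
    }
    return isNullOrEmpty(schema) ? null : schema;
  }

  private static boolean isNullOrEmpty(String s) {
    return s == null || s.isEmpty();
  }

  public static void main(String[] args) {
    String fallback = "{\"type\":\"record\"}";
    System.out.println(new SchemaResolutionSketch(null, null).resolveSchemaJson(fallback));
    System.out.println(new SchemaResolutionSketch(true, null).resolveSchemaJson(fallback));
  }
}
```

One consequence worth noting for reviewers: with a structured schema required and no schema configured, the result is `null`, so callers of `getSchema()` need to handle that case.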