Skip to content

Commit 1013bcd

Browse files
authored
Merge pull request #72 from cloudsufi/feat/AbstractFileBatchSource
[PLUGIN-1906] GDrive abstract file batch source plugin extension
2 parents 490078a + 4dc5202 commit 1013bcd

File tree

9 files changed

+623
-16
lines changed

9 files changed

+623
-16
lines changed

src/main/java/io/cdap/plugin/google/common/GoogleDriveClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public GoogleDriveClient(C config) throws IOException {
6868
* @return {@link Drive} client.
6969
* @throws IOException on issues with service account file reading.
7070
*/
71-
protected Drive getDriveClient() throws IOException {
71+
public Drive getDriveClient() throws IOException {
7272
Drive drive;
7373
AuthType authType = config.getAuthType();
7474
switch (authType) {
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.google.drive.source;
18+
19+
import io.cdap.cdap.etl.api.FailureCollector;
20+
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
21+
import io.cdap.plugin.format.plugin.AbstractFileSource;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
/**
26+
* This class gives ability to read files from Google Drive with a structured schema.
27+
*/
28+
public class GoogleDriveFileSource extends AbstractFileSource<GoogleDriveSourceConfig> {
29+
public static final String NAME = "GoogleDrive";
30+
private final GoogleDriveSourceConfig config;
31+
32+
public GoogleDriveFileSource(GoogleDriveSourceConfig config) {
33+
super(config);
34+
this.config = config;
35+
}
36+
37+
@Override
38+
protected Map<String, String> getFileSystemProperties(BatchSourceContext context) {
39+
FailureCollector collector = context == null ? null : context.getFailureCollector();
40+
return new HashMap<>(config.getFileSystemProperties(collector));
41+
}
42+
43+
@Override
44+
protected boolean shouldGetSchema() {
45+
return !config.containsMacro(GoogleDriveSourceConfig.NAME_FORMAT)
46+
&& !config.containsMacro(GoogleDriveSourceConfig.NAME_DELIMITER)
47+
&& !config.containsMacro(GoogleDriveSourceConfig.NAME_FILE_SYSTEM_PROPERTIES)
48+
&& !config.containsMacro(GoogleDriveSourceConfig.NAME_FILE_ENCODING);
49+
}
50+
}

src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSource.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,11 @@
2323
import io.cdap.cdap.api.data.batch.Input;
2424
import io.cdap.cdap.api.data.format.StructuredRecord;
2525
import io.cdap.cdap.api.data.schema.Schema;
26-
import io.cdap.cdap.api.dataset.lib.KeyValue;
27-
import io.cdap.cdap.etl.api.Emitter;
2826
import io.cdap.cdap.etl.api.FailureCollector;
2927
import io.cdap.cdap.etl.api.PipelineConfigurer;
3028
import io.cdap.cdap.etl.api.batch.BatchSource;
3129
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
3230
import io.cdap.plugin.common.LineageRecorder;
33-
import io.cdap.plugin.google.drive.common.FileFromFolder;
3431
import org.apache.hadoop.io.NullWritable;
3532

3633
import java.util.stream.Collectors;
@@ -45,9 +42,11 @@ public class GoogleDriveSource extends BatchSource<NullWritable, StructuredRecor
4542
public static final String NAME = "GoogleDrive";
4643

4744
private final GoogleDriveSourceConfig config;
45+
private final GoogleDriveFileSource delegate;
4846

4947
public GoogleDriveSource(GoogleDriveSourceConfig config) {
5048
this.config = config;
49+
this.delegate = new GoogleDriveFileSource(config);
5150
}
5251

5352
@Override
@@ -56,22 +55,29 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
5655
config.getValidationResult(failureCollector);
5756
failureCollector.getOrThrowException();
5857

59-
pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
58+
if (!config.isStructuredSchemaRequired()) {
59+
pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
60+
return;
61+
}
62+
delegate.configurePipeline(pipelineConfigurer);
6063
}
6164

6265
@Override
63-
public void prepareRun(BatchSourceContext context) {
66+
public void prepareRun(BatchSourceContext context) throws Exception {
6467
FailureCollector failureCollector = context.getFailureCollector();
6568
config.getValidationResult(failureCollector);
6669
failureCollector.getOrThrowException();
6770

68-
LineageRecorder lineageRecorder = new LineageRecorder(context, config.getReferenceName());
69-
lineageRecorder.createExternalDataset(config.getSchema());
70-
lineageRecorder.recordRead("Read", "Reading Google Drive files",
71-
Preconditions.checkNotNull(config.getSchema().getFields()).stream()
72-
.map(Schema.Field::getName)
73-
.collect(Collectors.toList()));
71+
if (!config.isStructuredSchemaRequired()) {
72+
LineageRecorder lineageRecorder = new LineageRecorder(context, config.getReferenceName());
73+
lineageRecorder.createExternalDataset(config.getSchema());
74+
lineageRecorder.recordRead("Read", "Reading Google Drive files",
75+
Preconditions.checkNotNull(config.getSchema().getFields()).stream().map(Schema.Field::getName)
76+
.collect(Collectors.toList()));
7477

75-
context.setInput(Input.of(config.getReferenceName(), new GoogleDriveInputFormatProvider(config)));
78+
context.setInput(Input.of(config.getReferenceName(), new GoogleDriveInputFormatProvider(config)));
79+
return;
80+
}
81+
delegate.prepareRun(context);
7682
}
7783
}

src/main/java/io/cdap/plugin/google/drive/source/GoogleDriveSourceConfig.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
package io.cdap.plugin.google.drive.source;
1818

1919
import com.google.common.base.Strings;
20+
import com.google.gson.Gson;
21+
import com.google.gson.GsonBuilder;
2022
import com.google.gson.JsonObject;
23+
import com.google.gson.reflect.TypeToken;
2124
import io.cdap.cdap.api.annotation.Description;
2225
import io.cdap.cdap.api.annotation.Macro;
2326
import io.cdap.cdap.api.annotation.Name;
@@ -31,12 +34,16 @@
3134
import io.cdap.plugin.google.common.ValidationResult;
3235
import io.cdap.plugin.google.common.exceptions.InvalidPropertyTypeException;
3336
import io.cdap.plugin.google.common.utils.ExportedType;
37+
import io.cdap.plugin.google.drive.source.fs.GoogleDriveFileSystem;
3438
import io.cdap.plugin.google.drive.source.utils.BodyFormat;
3539

3640
import java.io.IOException;
41+
import java.lang.reflect.Type;
3742
import java.util.Arrays;
3843
import java.util.Collections;
44+
import java.util.HashMap;
3945
import java.util.List;
46+
import java.util.Map;
4047
import java.util.regex.Pattern;
4148
import java.util.stream.Collectors;
4249
import javax.annotation.Nullable;
@@ -57,6 +64,9 @@ public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig impleme
5764
public static final String IS_STRUCTURED_SCHEMA_REQUIRED = "structuredSchemaRequired";
5865
public static final String NAME_SCHEMA = "schema";
5966
public static final String NAME_FORMAT = "format";
67+
public static final String NAME_DELIMITER = "delimiter";
68+
public static final String NAME_FILE_SYSTEM_PROPERTIES = "filesystemproperties";
69+
public static final String NAME_FILE_ENCODING = "fileEncoding";
6070

6171
public static final String DEFAULT_BODY_FORMAT = "bytes";
6272
public static final long DEFAULT_MAX_PARTITION_SIZE = 0;
@@ -75,6 +85,12 @@ public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig impleme
7585
public static final String GOOGLE_DRIVE_FOLDER_PATH_PREFIX = "/drive/folders";
7686
public static final String GOOGLE_DRIVE_DEFAULT_FILENAME = "default.txt";
7787

88+
public static final String GOOGLE_DRIVE_FILESYSTEM_IMPL = String.format("fs.%s.impl", GOOGLE_DRIVE_SCHEMA);
89+
public static final String GOOGLE_DRIVE_FILESYSTEM_DISABLE_CACHE_KEY = String.format("fs.%s.impl.disable.cache",
90+
GOOGLE_DRIVE_SCHEMA);
91+
public static final Gson GSON = new GsonBuilder().create();
92+
private static final Type MAP_STRING_STRING_TYPE = new TypeToken<Map<String, String>>() { }.getType();
93+
7894
@Nullable
7995
@Name(FILE_METADATA_PROPERTIES)
8096
@Description("Properties that represent metadata of files. \n" +
@@ -168,6 +184,7 @@ public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig impleme
168184
"skipped.")
169185
private String override;
170186

187+
@Name(NAME_DELIMITER)
171188
@Macro
172189
@Nullable
173190
@Description("The delimiter to use if the format is 'delimited'. The delimiter will be ignored if the format "
@@ -186,12 +203,14 @@ public class GoogleDriveSourceConfig extends GoogleFilteringSourceConfig impleme
186203
"is 'csv', 'tsv' or 'delimited'. The default value is false.")
187204
protected Boolean enableQuotedValues;
188205

206+
@Name(NAME_FILE_SYSTEM_PROPERTIES)
189207
@Macro
190208
@Nullable
191209
@Description("Any additional properties to use when reading from the filesystem. "
192210
+ "This is an advanced feature that requires knowledge of the properties supported by the underlying filesystem.")
193211
private String fileSystemProperties;
194212

213+
@Name(NAME_FILE_ENCODING)
195214
@Macro
196215
@Nullable
197216
@Description("File encoding for the source files. The default encoding is 'UTF-8'")
@@ -588,4 +607,31 @@ public static GoogleDriveSourceConfig of(JsonObject properties) throws IOExcepti
588607
}
589608
return googleDriveSourceConfig;
590609
}
610+
611+
public Map<String, String> getFileSystemProperties(@Nullable FailureCollector collector) {
612+
Map<String, String> properties = new HashMap<>();
613+
properties.put(GOOGLE_DRIVE_FILESYSTEM_IMPL, GoogleDriveFileSystem.class.getName());
614+
properties.put(GoogleDriveInputFormatProvider.PROPERTY_CONFIG_JSON, GSON.toJson(getProperties()));
615+
properties.put(GOOGLE_DRIVE_FILESYSTEM_DISABLE_CACHE_KEY, Boolean.TRUE.toString());
616+
617+
if (fileSystemProperties != null) {
618+
try {
619+
Map<String, String> userProperties = GSON.fromJson(fileSystemProperties, MAP_STRING_STRING_TYPE);
620+
for (Map.Entry<String, String> entry : userProperties.entrySet()) {
621+
if (entry.getKey() != null && !entry.getKey().isEmpty()) {
622+
// let user override the default properties
623+
properties.put(entry.getKey(), entry.getValue());
624+
}
625+
}
626+
} catch (Exception e) {
627+
String errorMessage = String.format("Unable to parse filesystem properties, %s: %s", e.getClass().getName(),
628+
e.getMessage());
629+
if (collector == null) {
630+
throw new IllegalArgumentException(errorMessage, e);
631+
}
632+
collector.addFailure(errorMessage, "Ensure the properties are in a valid JSON format.");
633+
}
634+
}
635+
return properties;
636+
}
591637
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.google.drive.source.fs;
18+
19+
import com.google.api.services.drive.Drive;
20+
import com.google.common.base.Strings;
21+
import io.cdap.plugin.google.common.GoogleDriveClient;
22+
import io.cdap.plugin.google.drive.source.GoogleDriveInputFormatProvider;
23+
import io.cdap.plugin.google.drive.source.GoogleDriveSourceConfig;
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.fs.FSDataInputStream;
26+
import org.apache.hadoop.fs.FSDataOutputStream;
27+
import org.apache.hadoop.fs.FileStatus;
28+
import org.apache.hadoop.fs.FileSystem;
29+
import org.apache.hadoop.fs.Path;
30+
import org.apache.hadoop.fs.permission.FsPermission;
31+
import org.apache.hadoop.util.Progressable;
32+
33+
import java.io.FileNotFoundException;
34+
import java.io.IOException;
35+
import java.net.URI;
36+
37+
/**
38+
* A custom Hadoop FileSystem implementation for Google Drive.
39+
* This class provides methods to interact with Google Drive files and directories.
40+
*/
41+
public class GoogleDriveFileSystem extends FileSystem {
42+
private URI uri;
43+
private Path workingDir;
44+
private Drive driveService;
45+
private String filter;
46+
47+
@Override
48+
public void initialize(URI name, Configuration conf) throws IOException {
49+
super.initialize(name, conf);
50+
setConf(conf);
51+
this.uri = URI.create(name.getScheme() + "://" + name.getAuthority());
52+
this.workingDir = new Path("/");
53+
54+
GoogleDriveSourceConfig googleDriveSourceConfig = GoogleDriveInputFormatProvider.extractPropertiesFromConfig(conf);
55+
GoogleDriveClient<GoogleDriveSourceConfig> client = new GoogleDriveClient<>(googleDriveSourceConfig);
56+
57+
// Initialize Google Drive service (using OAuth2 or service account)
58+
this.driveService = client.getDriveClient();
59+
60+
// Initialize filter to be passed down to listStatus
61+
if (!Strings.isNullOrEmpty(googleDriveSourceConfig.getFilter())) {
62+
this.filter = googleDriveSourceConfig.getFilter();
63+
}
64+
}
65+
@Override
66+
public URI getUri() {
67+
return this.uri;
68+
}
69+
70+
@Override
71+
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
72+
return new GoogleDriveInputStream(new GoogleDriveInputStreamWrapper(driveService, f));
73+
}
74+
75+
@Override
76+
public FSDataOutputStream create(Path f,
77+
FsPermission permission,
78+
boolean overwrite,
79+
int bufferSize,
80+
short replication,
81+
long blockSize,
82+
Progressable progress) throws IOException {
83+
throw new UnsupportedOperationException(
84+
"GDrive does not support: FSDataOutputStream create(Path, FsPermission, boolean, int, " +
85+
"short, long, Progressable)");
86+
}
87+
88+
@Override
89+
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
90+
throw new UnsupportedOperationException(
91+
"GDrive does not support: FSDataOutputStream append(Path, int, Progressable)");
92+
}
93+
94+
@Override
95+
public boolean rename(Path src, Path dst) throws IOException {
96+
throw new UnsupportedOperationException("GDrive does not support: boolean rename(Path, Path)");
97+
}
98+
99+
@Override
100+
public boolean delete(Path f, boolean recursive) throws IOException {
101+
throw new UnsupportedOperationException("GDrive does not support: boolean delete(Path, boolean)");
102+
}
103+
104+
@Override
105+
public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
106+
if (isPathDirectory(f)) {
107+
return GoogleDriveUtils.listStatus(driveService, f, filter);
108+
}
109+
return new FileStatus[]{GoogleDriveUtils.listObjectStatus(driveService, f, false, GoogleDriveUtils.getFileId(f))};
110+
}
111+
112+
@Override
113+
public void setWorkingDirectory(Path newDir) {
114+
throw new UnsupportedOperationException("GDrive does not support: void setWorkingDirectory(Path)");
115+
}
116+
117+
@Override
118+
public Path getWorkingDirectory() {
119+
return workingDir;
120+
}
121+
122+
@Override
123+
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
124+
throw new UnsupportedOperationException(
125+
"GDrive does not support: boolean mkdirs(Path, FsPermission)");
126+
}
127+
128+
@Override
129+
public FileStatus getFileStatus(Path f) throws IOException {
130+
if (isPathDirectory(f)) {
131+
return GoogleDriveUtils.listObjectStatus(driveService, f, true, f.getName());
132+
}
133+
return GoogleDriveUtils.listObjectStatus(driveService, f, false, GoogleDriveUtils.getFileId(f));
134+
}
135+
136+
private boolean isPathDirectory(Path path) {
137+
return path != null && path.toString().startsWith(String.format("%s://%s%s/",
138+
GoogleDriveSourceConfig.GOOGLE_DRIVE_SCHEMA,
139+
GoogleDriveSourceConfig.GOOGLE_DRIVE_AUTHORITY,
140+
GoogleDriveSourceConfig.GOOGLE_DRIVE_FOLDER_PATH_PREFIX));
141+
}
142+
143+
}

0 commit comments

Comments
 (0)