Skip to content

Commit 0a94f5e

Browse files
authored
Merge pull request #1146 from data-integrations/feat/add-fqn-to-gcs
CDAP-19623: add fqn support to gcs plugins
2 parents 102723f + 8ce36aa commit 0a94f5e

File tree

2 files changed

+61
-9
lines changed

2 files changed

+61
-9
lines changed

src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,12 @@
4040
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
4141
import io.cdap.cdap.etl.api.connector.Connector;
4242
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
43+
import io.cdap.plugin.common.Asset;
4344
import io.cdap.plugin.common.ConfigUtil;
4445
import io.cdap.plugin.common.Constants;
4546
import io.cdap.plugin.common.IdUtils;
4647
import io.cdap.plugin.common.LineageRecorder;
48+
import io.cdap.plugin.common.ReferenceNames;
4749
import io.cdap.plugin.format.FileFormat;
4850
import io.cdap.plugin.format.plugin.AbstractFileSink;
4951
import io.cdap.plugin.format.plugin.FileSinkProperties;
@@ -84,6 +86,7 @@ public class GCSBatchSink extends AbstractFileSink<GCSBatchSink.GCSBatchSinkConf
8486

8587
private final GCSBatchSinkConfig config;
8688
private String outputPath;
89+
private Asset asset;
8790

8891
public GCSBatchSink(GCSBatchSinkConfig config) {
8992
super(config);
@@ -109,7 +112,6 @@ public ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context) th
109112

110113
@Override
111114
public void prepareRun(BatchSinkContext context) throws Exception {
112-
super.prepareRun(context);
113115
FailureCollector collector = context.getFailureCollector();
114116
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
115117
collector.getOrThrowException();
@@ -125,17 +127,30 @@ public void prepareRun(BatchSinkContext context) throws Exception {
125127
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath);
126128
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
127129
Bucket bucket;
130+
String location;
128131
try {
129132
bucket = storage.get(config.getBucket());
130133
} catch (StorageException e) {
131134
throw new RuntimeException(
132135
String.format("Unable to access or create bucket %s. ", config.getBucket())
133136
+ "Ensure you entered the correct bucket path and have permissions for it.", e);
134137
}
135-
if (bucket == null) {
138+
if (bucket != null) {
139+
location = bucket.getLocation();
140+
} else {
136141
GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName);
142+
location = config.getLocation();
137143
}
138144
this.outputPath = getOutputDir(context);
145+
// create asset for lineage
146+
String referenceName = Strings.isNullOrEmpty(config.getReferenceName())
147+
? ReferenceNames.normalizeFqn(config.getPath())
148+
: config.getReferenceName();
149+
asset = Asset.builder(referenceName)
150+
.setFqn(config.getPath()).setLocation(location).build();
151+
152+
// super is called down here to avoid instantiating the lineage recorder with a null asset
153+
super.prepareRun(context);
139154
}
140155

141156
@Override
@@ -154,6 +169,11 @@ protected Map<String, String> getFileSystemProperties(BatchSinkContext context)
154169
return properties;
155170
}
156171

172+
@Override
173+
protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
174+
return new LineageRecorder(context, asset);
175+
}
176+
157177
@Override
158178
protected void recordLineage(LineageRecorder lineageRecorder, List<String> outputFields) {
159179
lineageRecorder.recordWrite("Write", "Wrote to Google Cloud Storage.", outputFields);

src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package io.cdap.plugin.gcp.gcs.source;
1818

19+
import com.google.auth.Credentials;
20+
import com.google.cloud.storage.Storage;
21+
import com.google.cloud.storage.StorageException;
1922
import com.google.common.base.Strings;
2023
import com.google.gson.Gson;
2124
import com.google.gson.reflect.TypeToken;
@@ -25,27 +28,22 @@
2528
import io.cdap.cdap.api.annotation.MetadataProperty;
2629
import io.cdap.cdap.api.annotation.Name;
2730
import io.cdap.cdap.api.annotation.Plugin;
28-
import io.cdap.cdap.api.data.schema.Schema;
29-
import io.cdap.cdap.api.plugin.PluginConfig;
3031
import io.cdap.cdap.etl.api.FailureCollector;
3132
import io.cdap.cdap.etl.api.PipelineConfigurer;
3233
import io.cdap.cdap.etl.api.batch.BatchSource;
3334
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
3435
import io.cdap.cdap.etl.api.connector.Connector;
36+
import io.cdap.plugin.common.Asset;
3537
import io.cdap.plugin.common.ConfigUtil;
36-
import io.cdap.plugin.common.Constants;
37-
import io.cdap.plugin.common.IdUtils;
3838
import io.cdap.plugin.common.LineageRecorder;
39-
import io.cdap.plugin.format.FileFormat;
40-
import io.cdap.plugin.format.charset.fixedlength.FixedLengthCharset;
39+
import io.cdap.plugin.common.ReferenceNames;
4140
import io.cdap.plugin.format.input.PathTrackingInputFormat;
4241
import io.cdap.plugin.format.plugin.AbstractFileSource;
4342
import io.cdap.plugin.format.plugin.AbstractFileSourceConfig;
4443
import io.cdap.plugin.format.plugin.FileSourceProperties;
4544
import io.cdap.plugin.gcp.common.GCPConnectorConfig;
4645
import io.cdap.plugin.gcp.common.GCPUtils;
4746
import io.cdap.plugin.gcp.crypto.EncryptedFileSystem;
48-
import io.cdap.plugin.gcp.gcs.Formats;
4947
import io.cdap.plugin.gcp.gcs.GCSPath;
5048
import io.cdap.plugin.gcp.gcs.connector.GCSConnector;
5149

@@ -67,6 +65,7 @@
6765
public class GCSSource extends AbstractFileSource<GCSSource.GCSSourceConfig> {
6866
public static final String NAME = "GCSFile";
6967
private final GCSSourceConfig config;
68+
private Asset asset;
7069

7170
public GCSSource(GCSSourceConfig config) {
7271
super(config);
@@ -78,6 +77,34 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
7877
super.configurePipeline(pipelineConfigurer);
7978
}
8079

80+
@Override
81+
public void prepareRun(BatchSourceContext context) throws Exception {
82+
// Get location of the source for lineage
83+
String location;
84+
String bucketName = GCSPath.from(config.getPath()).getBucket();
85+
Credentials credentials = config.connection.getServiceAccount() == null ?
86+
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
87+
config.connection.isServiceAccountFilePath());
88+
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
89+
try {
90+
location = storage.get(bucketName).getLocation();
91+
} catch (StorageException e) {
92+
throw new RuntimeException(
93+
String.format("Unable to access bucket %s. ", bucketName)
94+
+ "Ensure you entered the correct bucket path and have permissions for it.", e);
95+
}
96+
97+
// create asset for lineage
98+
String referenceName = Strings.isNullOrEmpty(config.getReferenceName())
99+
? ReferenceNames.normalizeFqn(config.getPath())
100+
: config.getReferenceName();
101+
asset = Asset.builder(referenceName)
102+
.setFqn(config.getPath()).setLocation(location).build();
103+
104+
// super is called down here to avoid instantiating the lineage recorder with a null asset
105+
super.prepareRun(context);
106+
}
107+
81108
@Override
82109
protected Map<String, String> getFileSystemProperties(BatchSourceContext context) {
83110
Map<String, String> properties = GCPUtils.getFileSystemProperties(config.connection, config.getPath(),
@@ -101,6 +128,11 @@ protected Map<String, String> getFileSystemProperties(BatchSourceContext context
101128
return properties;
102129
}
103130

131+
@Override
132+
protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
133+
return new LineageRecorder(context, asset);
134+
}
135+
104136
@Override
105137
protected void recordLineage(LineageRecorder lineageRecorder, List<String> outputFields) {
106138
lineageRecorder.recordRead("Read", String.format("Read%sfrom Google Cloud Storage.",

0 commit comments

Comments
 (0)