Skip to content

Commit bac13f5

Browse files
authored
Merge pull request #501 from data-integrations/cherrypick-plugin-474-015
[PLUGIN-474] Fixed BigQuery Plugins Not Deleting Temporary Files
2 parents 728f283 + 54fb84c commit bac13f5

File tree

4 files changed

+34
-25
lines changed

4 files changed

+34
-25
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public abstract class AbstractBigQuerySink extends BatchSink<StructuredRecord, A
7373

7474
private static final Logger LOG = LoggerFactory.getLogger(AbstractBigQuerySink.class);
7575

76-
private static final String gcsPathFormat = "gs://%s";
76+
private static final String gcsPathFormat = "gs://%s/%s";
7777
private static final String temporaryBucketFormat = gcsPathFormat + "/input/%s-%s";
7878
public static final String RECORDS_UPDATED_METRIC = "records.updated";
7979

@@ -115,17 +115,21 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
115115

116116
@Override
117117
public void onRunFinish(boolean succeeded, BatchSinkContext context) {
118-
if (getConfig().getBucket() == null) {
119-
Path gcsPath = new Path(String.format(gcsPathFormat, uuid.toString()));
120-
try {
121-
FileSystem fs = gcsPath.getFileSystem(baseConfiguration);
122-
if (fs.exists(gcsPath)) {
123-
fs.delete(gcsPath, true);
124-
LOG.debug("Deleted temporary bucket '{}'", gcsPath);
125-
}
126-
} catch (IOException e) {
127-
LOG.warn("Failed to delete bucket '{}': {}", gcsPath, e.getMessage());
118+
Path gcsPath;
119+
String bucket = getConfig().getBucket();
120+
if (bucket == null) {
121+
gcsPath = new Path(String.format(gcsPathFormat, uuid.toString(), uuid.toString()));
122+
} else {
123+
gcsPath = new Path(String.format(gcsPathFormat, bucket, uuid.toString()));
124+
}
125+
try {
126+
FileSystem fs = gcsPath.getFileSystem(baseConfiguration);
127+
if (fs.exists(gcsPath)) {
128+
fs.delete(gcsPath, true);
129+
LOG.debug("Deleted temporary directory '{}'", gcsPath);
128130
}
131+
} catch (IOException e) {
132+
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
129133
}
130134
}
131135

@@ -240,7 +244,7 @@ private Configuration getBaseConfiguration(@Nullable String cmekKey) throws IOEx
240244
* @return full path to temporary bucket
241245
*/
242246
private String getTemporaryGcsPath(String bucket, String tableName) {
243-
return String.format(temporaryBucketFormat, bucket, tableName, uuid);
247+
return String.format(temporaryBucketFormat, bucket, uuid, tableName, uuid);
244248
}
245249

246250
/**
@@ -257,7 +261,7 @@ private String configureBucket() {
257261
// So enable it only when bucket name is not provided.
258262
baseConfiguration.setBoolean("fs.gs.bucket.delete.enable", true);
259263
}
260-
baseConfiguration.set("fs.gs.system.bucket", bucket);
264+
baseConfiguration.set("fs.default.name", String.format(gcsPathFormat, bucket, uuid));
261265
baseConfiguration.setBoolean("fs.gs.impl.disable.cache", true);
262266
baseConfiguration.setBoolean("fs.gs.metadata.cache.enable", false);
263267
return bucket;

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public void onRunFinish(boolean succeeded, BatchSinkContext context) {
131131
}
132132
}
133133

134-
private void recordMetric(boolean succeeded, BatchSinkContext context) {
134+
void recordMetric(boolean succeeded, BatchSinkContext context) {
135135
if (!succeeded) {
136136
return;
137137
}

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.avro.generic.GenericData;
5050
import org.apache.hadoop.conf.Configuration;
5151
import org.apache.hadoop.fs.FileSystem;
52+
import org.apache.hadoop.fs.Path;
5253
import org.apache.hadoop.io.LongWritable;
5354
import org.apache.hadoop.io.Text;
5455
import org.apache.hadoop.mapreduce.Job;
@@ -147,7 +148,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
147148
cmekKey);
148149
}
149150

150-
configuration.set("fs.gs.system.bucket", bucket);
151+
configuration.set("fs.default.name", String.format("gs://%s/%s/", bucket, uuid));
151152
configuration.setBoolean("fs.gs.impl.disable.cache", true);
152153
configuration.setBoolean("fs.gs.metadata.cache.enable", false);
153154

@@ -171,7 +172,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
171172
configuration.set(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_DATASET, config.getViewMaterializationDataset());
172173
}
173174

174-
String temporaryGcsPath = String.format("gs://%s/hadoop/input/%s", bucket, uuid);
175+
String temporaryGcsPath = String.format("gs://%s/%s/hadoop/input/%s", bucket, uuid, uuid);
175176
PartitionedBigQueryInputFormat.setTemporaryCloudStorageDirectory(configuration, temporaryGcsPath);
176177
BigQueryConfiguration.configureBigQueryInput(configuration, config.getDatasetProject(),
177178
config.getDataset(), config.getTable());
@@ -210,16 +211,21 @@ public void transform(KeyValue<LongWritable, GenericData.Record> input, Emitter<
210211

211212
@Override
212213
public void onRunFinish(boolean succeeded, BatchSourceContext context) {
213-
org.apache.hadoop.fs.Path gcsPath = new org.apache.hadoop.fs.Path(String.format("gs://%s", uuid.toString()));
214+
org.apache.hadoop.fs.Path gcsPath = null;
215+
String bucket = config.getBucket();
216+
if (bucket == null) {
217+
gcsPath = new Path(String.format("gs://%s/%s", uuid, uuid));
218+
} else {
219+
gcsPath = new Path(String.format("gs://%s/%s", bucket, uuid));
220+
}
214221
try {
215-
if (config.getBucket() == null) {
216-
FileSystem fs = gcsPath.getFileSystem(configuration);
217-
if (fs.exists(gcsPath)) {
218-
fs.delete(gcsPath, true);
219-
}
222+
FileSystem fs = gcsPath.getFileSystem(configuration);
223+
if (fs.exists(gcsPath)) {
224+
fs.delete(gcsPath, true);
225+
LOG.debug("Deleted temporary directory '{}'", gcsPath);
220226
}
221227
} catch (IOException e) {
222-
LOG.warn("Failed to delete bucket " + gcsPath.toUri().getPath() + ", " + e.getMessage());
228+
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
223229
}
224230
}
225231

src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.google.cloud.bigquery.TableDefinition;
2929
import com.google.cloud.bigquery.TableId;
3030
import io.cdap.cdap.api.data.schema.Schema;
31-
import io.cdap.cdap.etl.api.FailureCollector;
3231
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
3332
import io.cdap.cdap.etl.api.validation.ValidationException;
3433
import io.cdap.cdap.etl.api.validation.ValidationFailure;
@@ -120,7 +119,7 @@ private void testMetric(Job mockJob, long expectedCount, int invocations)
120119
BigQuerySink sink = getSinkToTest(mockJob);
121120
MockStageMetrics mockStageMetrics = Mockito.spy(new MockStageMetrics("test"));
122121
BatchSinkContext context = getContextWithMetrics(mockStageMetrics);
123-
sink.onRunFinish(true, context);
122+
sink.recordMetric(true, context);
124123
if (expectedCount > -1) {
125124
Assert.assertEquals(expectedCount, mockStageMetrics.getCount(AbstractBigQuerySink.RECORDS_UPDATED_METRIC));
126125
}

0 commit comments

Comments
 (0)