Skip to content

Commit c317668

Browse files
authored
Merge pull request #43 from data-integrations/bugfix_release/fix-gcs-sink
Fix path macro in gcs sink
2 parents 737ef07 + 4553eae commit c317668

File tree

3 files changed

+30
-6
lines changed

3 files changed

+30
-6
lines changed

src/main/java/co/cask/gcp/bigquery/BigQuerySink.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
import co.cask.cdap.etl.api.batch.BatchSink;
3131
import co.cask.cdap.etl.api.batch.BatchSinkContext;
3232
import co.cask.hydrator.common.LineageRecorder;
33+
import com.google.cloud.bigquery.BigQuery;
34+
import com.google.cloud.bigquery.BigQueryException;
35+
import com.google.cloud.bigquery.DatasetInfo;
3336
import com.google.cloud.bigquery.Field;
3437
import com.google.cloud.bigquery.FieldList;
3538
import com.google.cloud.bigquery.LegacySQLTypeName;
@@ -92,6 +95,16 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
9295

9396
@Override
9497
public void prepareRun(BatchSinkContext context) throws Exception {
98+
BigQuery bigquery = BigQueryUtils.getBigQuery(config.getServiceAccountFilePath(), config.getProject());
99+
// create dataset if dataset does not exist
100+
if (bigquery.getDataset(config.dataset) == null) {
101+
try {
102+
bigquery.create(DatasetInfo.newBuilder(config.dataset).build());
103+
} catch (BigQueryException e) {
104+
throw new RuntimeException("Exception occured while creating dataset " + config.dataset + ".", e);
105+
}
106+
}
107+
95108
validateSchema();
96109

97110
uuid = UUID.randomUUID();

src/main/java/co/cask/gcp/bigquery/BigQueryUtils.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,17 +116,25 @@ static Configuration getBigQueryConfig(@Nullable String serviceAccountFilePath,
116116
@Nullable
117117
static Table getBigQueryTable(@Nullable String serviceAccountFilePath, String project,
118118
String dataset, String table) throws IOException {
119-
BigQuery bigquery;
119+
BigQuery bigquery = getBigQuery(serviceAccountFilePath, project);
120+
121+
TableId id = TableId.of(project, dataset, table);
122+
return bigquery.getTable(id);
123+
}
124+
125+
/**
126+
* Get BigQuery service
127+
* @param serviceAccountFilePath service account file path
128+
* @param project BigQuery project ID
129+
*/
130+
static BigQuery getBigQuery(@Nullable String serviceAccountFilePath, String project) throws IOException {
120131
BigQueryOptions.Builder bigqueryBuilder = BigQueryOptions.newBuilder();
121132
if (serviceAccountFilePath != null) {
122133
bigqueryBuilder.setCredentials(loadServiceAccountCredentials(serviceAccountFilePath));
123134
}
124135

125136
bigqueryBuilder.setProjectId(project);
126-
bigquery = bigqueryBuilder.build().getService();
127-
128-
TableId id = TableId.of(project, dataset, table);
129-
return bigquery.getTable(id);
137+
return bigqueryBuilder.build().getService();
130138
}
131139

132140
/**

src/main/java/co/cask/gcp/gcs/sink/GCSBatchSink.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,10 @@ public static class GCSBatchSinkConfig extends GCPReferenceSinkConfig implements
116116
@Override
117117
public void validate() {
118118
super.validate();
119-
GCSConfigHelper.getPath(path);
119+
// validate that path is valid
120+
if (!containsMacro("path")) {
121+
GCSConfigHelper.getPath(path);
122+
}
120123
if (suffix != null && !containsMacro("suffix")) {
121124
new SimpleDateFormat(suffix);
122125
}

0 commit comments

Comments
 (0)