
Commit 9e6d1de

Merge pull request #49 from data-integrations/bugfix_release/CDAP-14490-spanner-sink
CDAP-14490 auto create spanner table and database if does not exist
2 parents 01c7424 + bc0135d commit 9e6d1de

Showing 14 changed files with 450 additions and 50 deletions.


docs/Spanner-batchsink.md

Lines changed: 5 additions & 1 deletion
@@ -30,11 +30,15 @@ It can be found on the Dashboard in the Google Cloud Platform Console.
 Instance is an allocation of resources that is used by Cloud Spanner databases created in that instance.
 
 **Database Name**: Database the Spanner table belongs to.
-Spanner database is contained within a specific Spanner instance.
+Spanner database is contained within a specific Spanner instance. If the database does not exist, it will get created.
 
 **Table Name**: Table to write to. A table contains individual records organized in rows.
 Each record is composed of columns (also called fields).
 Every table is defined by a schema that describes the column names, data types, and other information.
+If the table does not exist, it will get created.
+
+**Primary Key**: If the table does not exist, a primary key must be provided in order to auto-create the table.
+The key can be a composite key of multiple fields in the schema. This is not required if the table already exists.
 
 **Service Account File Path**: Path on the local file system of the service account key used for
 authorization. Can be set to 'auto-detect' when running on a Dataproc cluster.
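The Spanner sink implementation itself is among this commit's 14 changed files but is not shown in this excerpt. As a rough sketch of the auto-create behavior the doc now describes, using the google-cloud-spanner admin client (the identifiers, DDL handling, and existence checks here are illustrative assumptions, not the plugin's actual code):

import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerOptions;
import java.util.Collections;

public class SpannerAutoCreateSketch {
  // Creates the database (and table) if missing. A real implementation would
  // derive the CREATE TABLE DDL from the output schema and the configured
  // Primary Key, and would skip the DDL update if the table already exists.
  public static void ensureDatabaseAndTable(String instanceId, String databaseId,
                                            String createTableDdl) throws Exception {
    Spanner spanner = SpannerOptions.newBuilder().build().getService();
    try {
      DatabaseAdminClient admin = spanner.getDatabaseAdminClient();
      try {
        admin.getDatabase(instanceId, databaseId);  // throws NOT_FOUND if absent
        // Database exists: add the table with a DDL update.
        admin.updateDatabaseDdl(instanceId, databaseId,
                                Collections.singletonList(createTableDdl), null).get();
      } catch (SpannerException e) {
        if (e.getErrorCode() != ErrorCode.NOT_FOUND) {
          throw e;
        }
        // Database missing: create database and table in a single operation.
        admin.createDatabase(instanceId, databaseId,
                             Collections.singletonList(createTableDdl)).get();
      }
    } finally {
      spanner.close();
    }
  }
}

For a schema with fields id and name and Primary Key id, the generated DDL would look something like CREATE TABLE users (id INT64, name STRING(MAX)) PRIMARY KEY (id).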

src/main/java/co/cask/gcp/bigquery/BigQuerySink.java

Lines changed: 16 additions & 14 deletions
@@ -96,12 +96,12 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
   @Override
   public void prepareRun(BatchSinkContext context) throws Exception {
     BigQuery bigquery = BigQueryUtils.getBigQuery(config.getServiceAccountFilePath(), config.getProject());
-    // create dataset if dataset does not exist
-    if (bigquery.getDataset(config.dataset) == null) {
+    // create dataset if it does not exist
+    if (bigquery.getDataset(config.getDataset()) == null) {
       try {
-        bigquery.create(DatasetInfo.newBuilder(config.dataset).build());
+        bigquery.create(DatasetInfo.newBuilder(config.getDataset()).build());
       } catch (BigQueryException e) {
-        throw new RuntimeException("Exception occured while creating dataset " + config.dataset + ".", e);
+        throw new RuntimeException("Exception occurred while creating dataset " + config.getDataset() + ".", e);
       }
     }
 
@@ -120,8 +120,8 @@ public void prepareRun(BatchSinkContext context) throws Exception {
       fields.add(tableFieldSchema);
     }
 
-    String bucket = config.bucket;
-    if (config.bucket == null) {
+    String bucket = config.getBucket();
+    if (config.getBucket() == null) {
       bucket = uuid.toString();
       // By default, this option is false, meaning the job can not delete the bucket. So enable it only when bucket name
       // is not provided.
@@ -135,7 +135,7 @@ public void prepareRun(BatchSinkContext context) throws Exception {
 
     BigQueryOutputConfiguration.configure(
       configuration,
-      String.format("%s.%s", config.dataset, config.table),
+      String.format("%s.%s", config.getDataset(), config.getTable()),
       new BigQueryTableSchema().setFields(fields),
       temporaryGcsPath,
       BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
@@ -165,7 +165,7 @@ public void transform(StructuredRecord input, Emitter<KeyValue<JsonObject, NullW
 
   @Override
   public void onRunFinish(boolean succeeded, BatchSinkContext context) {
-    if (config.bucket == null) {
+    if (config.getBucket() == null) {
       Path gcsPath = new Path(String.format("gs://%s", uuid.toString()));
       try {
         FileSystem fs = gcsPath.getFileSystem(configuration);
@@ -216,7 +216,7 @@ private LegacySQLTypeName getTableDataType(Schema schema) {
   }
 
   private void setOutputFormat(BatchSinkContext context) {
-    context.addOutput(Output.of(config.referenceName, new OutputFormatProvider() {
+    context.addOutput(Output.of(config.getReferenceName(), new OutputFormatProvider() {
       @Override
       public String getOutputFormatClassName() {
         return IndirectBigQueryOutputFormat.class.getName();
@@ -234,7 +234,7 @@ public Map<String, String> getOutputFormatConfiguration() {
   }
 
   private void emitLineage(BatchSinkContext context, List<BigQueryTableFieldSchema> fields) {
-    LineageRecorder lineageRecorder = new LineageRecorder(context, config.referenceName);
+    LineageRecorder lineageRecorder = new LineageRecorder(context, config.getReferenceName());
     lineageRecorder.createExternalDataset(config.getSchema());
 
     if (!fields.isEmpty()) {
@@ -302,7 +302,7 @@ private static void decodeSimpleTypes(JsonObject json, String name, StructuredRe
    */
   private void validateSchema() throws IOException {
     Table table = BigQueryUtils.getBigQueryTable(config.getServiceAccountFilePath(), config.getProject(),
-                                                 config.dataset, config.table);
+                                                 config.getDataset(), config.getTable());
     if (table == null) {
       // Table does not exist, so no further validation is required.
       return;
@@ -322,7 +322,8 @@ private void validateSchema() throws IOException {
     if (!diff.isEmpty()) {
       throw new IllegalArgumentException(
         String.format("The output schema does not match the BigQuery table schema for '%s.%s' table. " +
-                        "The table does not contain the '%s' column(s).", config.dataset, config.table, diff));
+                        "The table does not contain the '%s' column(s).",
+                      config.getDataset(), config.getTable(), diff));
     }
 
     // validate the missing columns in output schema are nullable fields in bigquery
@@ -332,14 +333,15 @@ private void validateSchema() throws IOException {
         throw new IllegalArgumentException(
           String.format("The output schema does not match the BigQuery table schema for '%s.%s'. " +
                           "The table requires column '%s', which is not in the output schema.",
-                        config.dataset, config.table, field));
+                        config.getDataset(), config.getTable(), field));
       }
     }
 
     // Match output schema field type with bigquery column type
     for (Schema.Field field : config.getSchema().getFields()) {
       validateSimpleTypes(field);
-      BigQueryUtils.validateFieldSchemaMatches(bqFields.get(field.getName()), field, config.dataset, config.table);
+      BigQueryUtils.validateFieldSchemaMatches(bqFields.get(field.getName()),
+                                               field, config.getDataset(), config.getTable());
     }
   }
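For context, the dataset auto-creation above relies on the google-cloud-bigquery client returning null for a missing dataset rather than throwing. A self-contained sketch of the same check-then-create pattern, with an illustrative dataset name:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetInfo;

public class EnsureDatasetSketch {
  public static void ensureDataset(String datasetName) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    // getDataset returns null when the dataset does not exist
    if (bigquery.getDataset(datasetName) == null) {
      try {
        bigquery.create(DatasetInfo.newBuilder(datasetName).build());
      } catch (BigQueryException e) {
        throw new RuntimeException("Exception occurred while creating dataset " + datasetName + ".", e);
      }
    }
  }

  public static void main(String[] args) {
    ensureDataset("my_dataset");  // illustrative name
  }
}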

src/main/java/co/cask/gcp/bigquery/BigQuerySinkConfig.java

Lines changed: 26 additions & 5 deletions
@@ -18,7 +18,6 @@
 
 import co.cask.cdap.api.annotation.Description;
 import co.cask.cdap.api.annotation.Macro;
-import co.cask.cdap.api.annotation.Name;
 import co.cask.cdap.api.data.schema.Schema;
 import co.cask.gcp.common.GCPReferenceSinkConfig;
 
@@ -33,25 +32,47 @@ public final class BigQuerySinkConfig extends GCPReferenceSinkConfig {
   @Macro
   @Description("The dataset to write to. A dataset is contained within a specific project. "
     + "Datasets are top-level containers that are used to organize and control access to tables and views.")
-  public String dataset;
+  private String dataset;
 
   @Macro
   @Description("The table to write to. A table contains individual records organized in rows. "
     + "Each record is composed of columns (also called fields). "
     + "Every table is defined by a schema that describes the column names, data types, and other information.")
-  public String table;
+  private String table;
 
   @Macro
   @Nullable
   @Description("The Google Cloud Storage bucket to store temporary data in. "
     + "It will be automatically created if it does not exist, but will not be automatically deleted. "
     + "Cloud Storage data will be deleted after it is loaded into BigQuery. " +
     "If it is not provided, a unique bucket will be created and then deleted after the run finishes.")
-  public String bucket;
+  private String bucket;
 
   @Macro
   @Description("The schema of the data to write. Must be compatible with the table schema.")
-  public String schema;
+  private String schema;
+
+  public BigQuerySinkConfig(String referenceName, String dataset, String table,
+                            @Nullable String bucket, String schema) {
+    this.referenceName = referenceName;
+    this.dataset = dataset;
+    this.table = table;
+    this.bucket = bucket;
+    this.schema = schema;
+  }
+
+  public String getDataset() {
+    return dataset;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  @Nullable
+  public String getBucket() {
+    return bucket;
+  }
 
   /**
    * @return the schema of the dataset
src/main/java/co/cask/gcp/common/GCPReferenceSinkConfig.java

Lines changed: 5 additions & 1 deletion
@@ -28,12 +28,16 @@ public class GCPReferenceSinkConfig extends GCPConfig {
   @Name("referenceName")
   @Description("This will be used to uniquely identify this sink for lineage, annotating metadata, etc.")
   @Macro
-  public String referenceName;
+  protected String referenceName;
 
   /**
    * Validates the given referenceName to consists of characters allowed to represent a dataset.
    */
   public void validate() {
     IdUtils.validateId(referenceName);
   }
+
+  public String getReferenceName() {
+    return referenceName;
+  }
 }

src/main/java/co/cask/gcp/gcs/sink/GCSBatchSink.java

Lines changed: 10 additions & 5 deletions
@@ -113,6 +113,16 @@ public static class GCSBatchSinkConfig extends GCPReferenceSinkConfig implements
     @Nullable
     private String schema;
 
+    public GCSBatchSinkConfig(String referenceName, String path, @Nullable String suffix, String format,
+                              @Nullable String delimiter, @Nullable String schema) {
+      this.referenceName = referenceName;
+      this.path = path;
+      this.suffix = suffix;
+      this.format = format;
+      this.delimiter = delimiter;
+      this.schema = schema;
+    }
+
     @Override
     public void validate() {
       super.validate();
@@ -129,11 +139,6 @@ public void validate() {
       getSchema();
     }
 
-    @Override
-    public String getReferenceName() {
-      return referenceName;
-    }
-
     @Override
     public String getPath() {
       return GCSConfigHelper.getPath(path).toString();

src/main/java/co/cask/gcp/publisher/GooglePublisher.java

Lines changed: 31 additions & 9 deletions
@@ -101,12 +101,12 @@ public void prepareRun(BatchSinkContext context) throws IOException {
     }
 
     Schema inputSchema = context.getInputSchema();
-    LineageRecorder lineageRecorder = new LineageRecorder(context, config.referenceName);
+    LineageRecorder lineageRecorder = new LineageRecorder(context, config.getReferenceName());
     lineageRecorder.createExternalDataset(inputSchema);
 
     Configuration configuration = new Configuration();
     PubSubOutputFormat.configure(configuration, config);
-    context.addOutput(Output.of(config.referenceName,
+    context.addOutput(Output.of(config.getReferenceName(),
                                 new SinkOutputFormatProvider(PubSubOutputFormat.class, configuration)));
 
     // record field level lineage information
@@ -129,38 +129,51 @@ public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, Tex
   public static class Config extends GCPReferenceSinkConfig {
     @Description("Cloud Pub/Sub topic to publish records to")
     @Macro
-    public String topic;
+    private String topic;
 
     // batching options
     @Description("Maximum count of messages in a batch. The default value is 100.")
     @Macro
     @Nullable
-    public Long messageCountBatchSize;
+    private Long messageCountBatchSize;
 
     @Description("Maximum size of a batch in kilo bytes. The default value is 1KB.")
     @Macro
     @Nullable
-    public Long requestThresholdKB;
+    private Long requestThresholdKB;
 
     @Description("Maximum delay in milli-seconds for publishing the batched messages. The default value is 1 ms.")
     @Macro
     @Nullable
-    public Long publishDelayThresholdMillis;
+    private Long publishDelayThresholdMillis;
 
     @Description("Maximum number of message publishing failures to tolerate per partition " +
       "before the pipeline will be failed. The default value is 0.")
     @Macro
     @Nullable
-    public Long errorThreshold;
+    private Long errorThreshold;
 
     @Description("Maximum amount of time in seconds to spend retrying publishing failures. " +
       "The default value is 30 seconds.")
     @Macro
     @Nullable
-    public Integer retryTimeoutSeconds;
-
+    private Integer retryTimeoutSeconds;
+
+
+    public Config(String referenceName, String topic, @Nullable Long messageCountBatchSize,
+                  @Nullable Long requestThresholdKB, @Nullable Long publishDelayThresholdMillis,
+                  @Nullable Long errorThreshold, @Nullable Integer retryTimeoutSeconds) {
+      this.referenceName = referenceName;
+      this.topic = topic;
+      this.messageCountBatchSize = messageCountBatchSize;
+      this.requestThresholdKB = requestThresholdKB;
+      this.publishDelayThresholdMillis = publishDelayThresholdMillis;
+      this.errorThreshold = errorThreshold;
+      this.retryTimeoutSeconds = retryTimeoutSeconds;
+    }
 
     public void validate() {
+      super.validate();
       if (!containsMacro("messageCountBatchSize") && messageCountBatchSize != null && messageCountBatchSize < 1) {
         throw new IllegalArgumentException("Maximum count of messages in a batch should be positive for Pub/Sub");
       }
@@ -199,5 +212,14 @@ public long getErrorThreshold() {
     public int getRetryTimeoutSeconds() {
       return retryTimeoutSeconds == null ? 30 : retryTimeoutSeconds;
     }
+
+    public String getTopic() {
+      return topic;
+    }
+
+    @Nullable
+    public Long getRequestThresholdKB() {
+      return requestThresholdKB;
+    }
   }
 }
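Note the added super.validate() call: previously Config.validate() shadowed GCPReferenceSinkConfig.validate() without invoking it, so the referenceName check was skipped for the Pub/Sub sink. A hypothetical sketch of what the fix enables; whether IdUtils.validateId rejects this particular string is an assumption:

import co.cask.gcp.publisher.GooglePublisher;

public class PublisherConfigValidateExample {
  public static void main(String[] args) {
    // referenceName with characters assumed to be invalid for an identifier
    GooglePublisher.Config config = new GooglePublisher.Config(
      "bad name!", "my-topic", null, null, null, null, null);
    config.validate();  // now also runs IdUtils.validateId(referenceName) via super.validate()
  }
}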

src/main/java/co/cask/gcp/publisher/PubSubOutputFormat.java

Lines changed: 1 addition & 1 deletion
@@ -67,7 +67,7 @@ public static void configure(Configuration configuration, GooglePublisher.Config
     }
     String projectId = config.getProject();
     configuration.set(PROJECT, projectId);
-    configuration.set(TOPIC, config.topic);
+    configuration.set(TOPIC, config.getTopic());
     configuration.set(COUNT_BATCH_SIZE, String.valueOf(config.getMessageCountBatchSize()));
     configuration.set(REQUEST_BYTES_THRESHOLD, String.valueOf(config.getRequestBytesThreshold()));
     configuration.set(DELAY_THRESHOLD, String.valueOf(config.getPublishDelayThresholdMillis()));
