Skip to content

Commit 0de7704

Browse files
psainicsvikasrathee-cs
authored andcommitted
Added BQ Execute Action job label support
1 parent 77d1435 commit 0de7704

File tree

8 files changed

+350
-289
lines changed

8 files changed

+350
-289
lines changed

docs/BigQueryExecute-action.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ write BigQuery data to this project.
3030

3131
**SQL**: SQL command to execute.
3232

33+
**BQ Job Labels:** Key value pairs to be added as labels to the BigQuery job. Keys must be unique. (Macro Enabled)
34+
35+
`job_source` and `type` are system-defined labels used by CDAP for internal purposes and cannot be used as label keys.
36+
Macro format is supported. Example: `key1:val1,key2:val2`
37+
38+
Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
39+
For more information about labels, see [Docs](https://cloud.google.com/bigquery/docs/labels-intro#requirements).
40+
3341
**Dialect**: Dialect of the SQL command. The value must be 'legacy' or 'standard'. If set to 'standard',
3442
the query will use BigQuery's standard SQL: https://cloud.google.com/bigquery/sql-reference/.
3543
If set to 'legacy', BigQuery's legacy SQL dialect will be used for this query.

src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void run(ActionContext context) throws Exception {
125125
}
126126

127127
// Add labels for the BigQuery Execute job.
128-
builder.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_EXECUTE_TAG));
128+
builder.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_EXECUTE_TAG, config.getJobLabelKeyValue()));
129129

130130
QueryJobConfiguration queryConfig = builder.build();
131131

@@ -205,6 +205,7 @@ public static final class Config extends AbstractBigQueryActionConfig {
205205
private static final String DATASET = "dataset";
206206
private static final String TABLE = "table";
207207
private static final String NAME_LOCATION = "location";
208+
public static final String NAME_BQ_JOB_LABELS = "jobLabels";
208209
private static final int ERROR_CODE_NOT_FOUND = 404;
209210
private static final String STORE_RESULTS = "storeResults";
210211

@@ -272,10 +273,17 @@ public static final class Config extends AbstractBigQueryActionConfig {
272273
@Description("Whether to store results in a BigQuery Table.")
273274
private Boolean storeResults;
274275

276+
@Name(NAME_BQ_JOB_LABELS)
277+
@Macro
278+
@Nullable
279+
@Description("Key value pairs to be added as labels to the BigQuery job. Keys must be unique. [job_source, type] " +
280+
"are reserved keys and cannot be used as label keys.")
281+
protected String jobLabelKeyValue;
282+
275283
private Config(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath,
276284
@Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table,
277285
@Nullable String location, @Nullable String cmekKey, @Nullable String dialect, @Nullable String sql,
278-
@Nullable String mode, @Nullable Boolean storeResults) {
286+
@Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue) {
279287
this.project = project;
280288
this.serviceAccountType = serviceAccountType;
281289
this.serviceFilePath = serviceFilePath;
@@ -288,6 +296,7 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N
288296
this.sql = sql;
289297
this.mode = mode;
290298
this.storeResults = storeResults;
299+
this.jobLabelKeyValue = jobLabelKeyValue;
291300
}
292301

293302
public boolean isLegacySQL() {
@@ -328,6 +337,11 @@ public String getTable() {
328337
return table;
329338
}
330339

340+
@Nullable
341+
public String getJobLabelKeyValue() {
342+
return jobLabelKeyValue;
343+
}
344+
331345
@Override
332346
public void validate(FailureCollector failureCollector) {
333347
validate(failureCollector, Collections.emptyMap());
@@ -376,9 +390,17 @@ public void validate(FailureCollector failureCollector, Map<String, String> argu
376390
validateCmekKey(failureCollector, arguments);
377391
}
378392

393+
if (!containsMacro(NAME_BQ_JOB_LABELS)) {
394+
validateJobLabelKeyValue(failureCollector);
395+
}
396+
379397
failureCollector.getOrThrowException();
380398
}
381399

400+
void validateJobLabelKeyValue(FailureCollector failureCollector) {
401+
BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS);
402+
}
403+
382404
void validateCmekKey(FailureCollector failureCollector, Map<String, String> arguments) {
383405
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(cmekKey, arguments, failureCollector);
384406
//these fields are needed to check if bucket exists or not and for location validation
@@ -449,6 +471,7 @@ public static class Builder {
449471
private String sql;
450472
private String mode;
451473
private Boolean storeResults;
474+
private String jobLabelKeyValue;
452475

453476
public Builder setProject(@Nullable String project) {
454477
this.project = project;
@@ -505,6 +528,11 @@ public Builder setSql(@Nullable String sql) {
505528
return this;
506529
}
507530

531+
public Builder setJobLabelKeyValue(@Nullable String jobLabelKeyValue) {
532+
this.jobLabelKeyValue = jobLabelKeyValue;
533+
return this;
534+
}
535+
508536
public Config build() {
509537
return new Config(
510538
project,
@@ -518,7 +546,8 @@ public Config build() {
518546
dialect,
519547
sql,
520548
mode,
521-
storeResults
549+
storeResults,
550+
jobLabelKeyValue
522551
);
523552
}
524553

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java

Lines changed: 1 addition & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -187,106 +187,8 @@ void validateCmekKey(FailureCollector failureCollector, Map<String, String> argu
187187
validateCmekKeyLocation(cmekKeyName, null, location, failureCollector);
188188
}
189189

190-
/**
191-
* Validates job label key value pairs, as per the following rules:
192-
* Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
193-
* Defined in the following link:
194-
* <a href="https://cloud.google.com/bigquery/docs/labels-intro#requirements">Docs</a>
195-
* @param failureCollector failure collector
196-
*/
197190
void validateJobLabelKeyValue(FailureCollector failureCollector) {
198-
Set<String> reservedKeys = BigQueryUtil.BQ_JOB_LABEL_SYSTEM_KEYS;
199-
int maxLabels = 64 - reservedKeys.size();
200-
int maxKeyLength = 63;
201-
int maxValueLength = 63;
202-
203-
String validLabelKeyRegex = "^[\\p{L}][a-z0-9-_\\p{L}]+$";
204-
String validLabelValueRegex = "^[a-z0-9-_\\p{L}]+$";
205-
String capitalLetterRegex = ".*[A-Z].*";
206-
String labelKeyValue = getJobLabelKeyValue();
207-
208-
if (Strings.isNullOrEmpty(labelKeyValue)) {
209-
return;
210-
}
211-
212-
String[] keyValuePairs = labelKeyValue.split(",");
213-
Set<String> uniqueKeys = new HashSet<>();
214-
215-
for (String keyValuePair : keyValuePairs) {
216-
217-
// Adding a label without a value is valid behavior
218-
// Read more here: https://cloud.google.com/bigquery/docs/adding-labels#adding_a_label_without_a_value
219-
String[] keyValue = keyValuePair.trim().split(":");
220-
boolean isKeyPresent = keyValue.length == 1 || keyValue.length == 2;
221-
boolean isValuePresent = keyValue.length == 2;
222-
223-
224-
if (!isKeyPresent) {
225-
failureCollector.addFailure(String.format("Invalid job label key value pair '%s'.", keyValuePair),
226-
"Job label key value pair should be in the format 'key:value'.")
227-
.withConfigProperty(NAME_BQ_JOB_LABELS);
228-
continue;
229-
}
230-
231-
// Check if key is reserved
232-
if (reservedKeys.contains(keyValue[0])) {
233-
failureCollector.addFailure(String.format("Invalid job label key '%s'.", keyValue[0]),
234-
"A system label already exists with same name.").withConfigProperty(NAME_BQ_JOB_LABELS);
235-
continue;
236-
}
237-
238-
String key = keyValue[0];
239-
String value = isValuePresent ? keyValue[1] : "";
240-
boolean isKeyValid = true;
241-
boolean isValueValid = true;
242-
243-
// Key cannot be empty
244-
if (Strings.isNullOrEmpty(key)) {
245-
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
246-
"Job label key cannot be empty.").withConfigProperty(NAME_BQ_JOB_LABELS);
247-
isKeyValid = false;
248-
}
249-
250-
// Key cannot be longer than 63 characters
251-
if (key.length() > maxKeyLength) {
252-
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
253-
"Job label key cannot be longer than 63 characters.").withConfigProperty(NAME_BQ_JOB_LABELS);
254-
isKeyValid = false;
255-
}
256-
257-
// Value cannot be longer than 63 characters
258-
if (value.length() > maxValueLength) {
259-
failureCollector.addFailure(String.format("Invalid job label value '%s'.", value),
260-
"Job label value cannot be longer than 63 characters.").withConfigProperty(NAME_BQ_JOB_LABELS);
261-
isValueValid = false;
262-
}
263-
264-
if (isKeyValid && (!key.matches(validLabelKeyRegex) || key.matches(capitalLetterRegex))) {
265-
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
266-
"Job label key can only contain lowercase letters, numeric characters, " +
267-
"underscores, and dashes. Check docs for more details.")
268-
.withConfigProperty(NAME_BQ_JOB_LABELS);
269-
isKeyValid = false;
270-
}
271-
272-
if (isValuePresent && isValueValid &&
273-
(!value.matches(validLabelValueRegex) || value.matches(capitalLetterRegex))) {
274-
failureCollector.addFailure(String.format("Invalid job label value '%s'.", value),
275-
"Job label value can only contain lowercase letters, numeric characters, " +
276-
"underscores, and dashes.").withConfigProperty(NAME_BQ_JOB_LABELS);
277-
}
278-
279-
if (isKeyValid && !uniqueKeys.add(key)) {
280-
failureCollector.addFailure(String.format("Duplicate job label key '%s'.", key),
281-
"Job label key should be unique.").withConfigProperty(NAME_BQ_JOB_LABELS);
282-
}
283-
}
284-
// Check if number of labels is greater than 64 - reserved keys
285-
if (uniqueKeys.size() > maxLabels) {
286-
failureCollector.addFailure("Number of job labels exceeds the limit.",
287-
String.format("Number of job labels cannot be greater than %d.", maxLabels))
288-
.withConfigProperty(NAME_BQ_JOB_LABELS);
289-
}
191+
BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS);
290192
}
291193

292194
public String getDatasetProject() {

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
4040
import io.cdap.cdap.etl.api.validation.InvalidStageException;
4141
import io.cdap.cdap.etl.api.validation.ValidationFailure;
42+
import io.cdap.plugin.gcp.bigquery.sink.AbstractBigQuerySinkConfig;
4243
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySink;
4344
import io.cdap.plugin.gcp.bigquery.source.BigQuerySource;
4445
import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceConfig;
@@ -60,6 +61,7 @@
6061
import java.io.IOException;
6162
import java.util.ArrayList;
6263
import java.util.HashMap;
64+
import java.util.HashSet;
6365
import java.util.List;
6466
import java.util.Map;
6567
import java.util.Set;
@@ -915,4 +917,106 @@ public static String getStagingBucketName(Map<String, String> arguments, @Nullab
915917
}
916918
return bucket;
917919
}
920+
921+
/**
922+
* Validates job label key value pairs, as per the following rules:
923+
* Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
924+
* Defined in the following link:
925+
* <a href="https://cloud.google.com/bigquery/docs/labels-intro#requirements">Docs</a>
926+
* @param failureCollector failure collector
927+
*/
928+
public static void validateJobLabelKeyValue(String labelKeyValue, FailureCollector failureCollector,
929+
String stageConfigProperty) {
930+
Set<String> reservedKeys = BQ_JOB_LABEL_SYSTEM_KEYS;
931+
int maxLabels = 64 - reservedKeys.size();
932+
int maxKeyLength = 63;
933+
int maxValueLength = 63;
934+
935+
String validLabelKeyRegex = "^[\\p{L}][a-z0-9-_\\p{L}]+$";
936+
String validLabelValueRegex = "^[a-z0-9-_\\p{L}]+$";
937+
String capitalLetterRegex = ".*[A-Z].*";
938+
939+
if (com.google.api.client.util.Strings.isNullOrEmpty(labelKeyValue)) {
940+
return;
941+
}
942+
943+
String[] keyValuePairs = labelKeyValue.split(",");
944+
Set<String> uniqueKeys = new HashSet<>();
945+
946+
for (String keyValuePair : keyValuePairs) {
947+
948+
// Adding a label without a value is valid behavior
949+
// Read more here: https://cloud.google.com/bigquery/docs/adding-labels#adding_a_label_without_a_value
950+
String[] keyValue = keyValuePair.trim().split(":");
951+
boolean isKeyPresent = keyValue.length == 1 || keyValue.length == 2;
952+
boolean isValuePresent = keyValue.length == 2;
953+
954+
955+
if (!isKeyPresent) {
956+
failureCollector.addFailure(String.format("Invalid job label key value pair '%s'.", keyValuePair),
957+
"Job label key value pair should be in the format 'key:value'.")
958+
.withConfigProperty(stageConfigProperty);
959+
continue;
960+
}
961+
962+
// Check if key is reserved
963+
if (reservedKeys.contains(keyValue[0])) {
964+
failureCollector.addFailure(String.format("Invalid job label key '%s'.", keyValue[0]),
965+
"A system label already exists with same name.").withConfigProperty(stageConfigProperty);
966+
continue;
967+
}
968+
969+
String key = keyValue[0];
970+
String value = isValuePresent ? keyValue[1] : "";
971+
boolean isKeyValid = true;
972+
boolean isValueValid = true;
973+
974+
// Key cannot be empty
975+
if (com.google.api.client.util.Strings.isNullOrEmpty(key)) {
976+
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
977+
"Job label key cannot be empty.").withConfigProperty(stageConfigProperty);
978+
isKeyValid = false;
979+
}
980+
981+
// Key cannot be longer than 63 characters
982+
if (key.length() > maxKeyLength) {
983+
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
984+
"Job label key cannot be longer than 63 characters.").withConfigProperty(stageConfigProperty);
985+
isKeyValid = false;
986+
}
987+
988+
// Value cannot be longer than 63 characters
989+
if (value.length() > maxValueLength) {
990+
failureCollector.addFailure(String.format("Invalid job label value '%s'.", value),
991+
"Job label value cannot be longer than 63 characters.").withConfigProperty(stageConfigProperty);
992+
isValueValid = false;
993+
}
994+
995+
if (isKeyValid && (!key.matches(validLabelKeyRegex) || key.matches(capitalLetterRegex))) {
996+
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
997+
"Job label key can only contain lowercase letters, numeric characters, " +
998+
"underscores, and dashes. Check docs for more details.")
999+
.withConfigProperty(stageConfigProperty);
1000+
isKeyValid = false;
1001+
}
1002+
1003+
if (isValuePresent && isValueValid &&
1004+
(!value.matches(validLabelValueRegex) || value.matches(capitalLetterRegex))) {
1005+
failureCollector.addFailure(String.format("Invalid job label value '%s'.", value),
1006+
"Job label value can only contain lowercase letters, numeric characters, " +
1007+
"underscores, and dashes.").withConfigProperty(stageConfigProperty);
1008+
}
1009+
1010+
if (isKeyValid && !uniqueKeys.add(key)) {
1011+
failureCollector.addFailure(String.format("Duplicate job label key '%s'.", key),
1012+
"Job label key should be unique.").withConfigProperty(stageConfigProperty);
1013+
}
1014+
}
1015+
// Check if number of labels is greater than 64 - reserved keys
1016+
if (uniqueKeys.size() > maxLabels) {
1017+
failureCollector.addFailure("Number of job labels exceeds the limit.",
1018+
String.format("Number of job labels cannot be greater than %d.", maxLabels))
1019+
.withConfigProperty(stageConfigProperty);
1020+
}
1021+
}
9181022
}

0 commit comments

Comments
 (0)