Skip to content

Commit 721e068

Browse files
fixing ups.
1 parent 078a486 commit 721e068

File tree

13 files changed

+85
-163
lines changed

13 files changed

+85
-163
lines changed

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java

Lines changed: 4 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -77,67 +77,17 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
7777
}
7878
return;
7979
}
80-
if (!sourceConfig.containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
81-
82-
String importQueryTypeValue = sourceConfig.getImportQueryType();
83-
84-
boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.name().equalsIgnoreCase(importQueryTypeValue);
85-
86-
if (isImportQuerySelected && !sourceConfig.containsMacro(IMPORT_QUERY) &&
87-
Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
88-
collector.addFailure("Import Query cannot be empty", null)
89-
.withConfigProperty(IMPORT_QUERY);
90-
91-
} else if (!isImportQuerySelected && !sourceConfig.containsMacro(TABLE_NAME) &&
92-
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
93-
collector.addFailure("Table Name cannot be empty", null)
94-
.withConfigProperty(TABLE_NAME);
95-
}
96-
} else {
97-
if (!sourceConfig.containsMacro(IMPORT_QUERY) &&
98-
Strings.isNullOrEmpty(sourceConfig.getImportQuery()) &&
99-
!sourceConfig.containsMacro(TABLE_NAME) &&
100-
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
101-
collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.", null)
102-
.withConfigProperty(IMPORT_QUERY)
103-
.withConfigProperty(TABLE_NAME);
104-
}
105-
}
106-
collector.getOrThrowException();
80+
validateTableNameAndImportQuery(collector);
10781
super.configurePipeline(pipelineConfigurer);
10882
}
10983

11084
@Override
11185
public void prepareRun(BatchSourceContext context) throws Exception {
11286
FailureCollector collector = context.getFailureCollector();
113-
114-
if (!sourceConfig.containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
115-
String importQueryTypeValue = sourceConfig.getImportQueryType();
116-
117-
boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.name().equalsIgnoreCase(importQueryTypeValue);
118-
119-
if (isImportQuerySelected && !sourceConfig.containsMacro(IMPORT_QUERY) &&
120-
Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
121-
collector.addFailure("Import Query is empty.", "Specify the Import Query.")
122-
.withConfigProperty(IMPORT_QUERY);
123-
124-
} else if (!isImportQuerySelected && !sourceConfig.containsMacro(TABLE_NAME) &&
125-
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
126-
collector.addFailure("Table Name cannot be empty", "Specify the Table Name.")
127-
.withConfigProperty(TABLE_NAME);
128-
}
129-
} else {
130-
if (!sourceConfig.containsMacro(IMPORT_QUERY) &&
131-
Strings.isNullOrEmpty(sourceConfig.getImportQuery()) &&
132-
!sourceConfig.containsMacro(TABLE_NAME) &&
133-
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
134-
collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.",
135-
"Please provide Either 'Import Query' or 'Table Name'")
136-
.withConfigProperty(IMPORT_QUERY)
137-
.withConfigProperty(TABLE_NAME);
138-
}
87+
if (sourceConfig.containsMacro(TABLE_NAME) || sourceConfig.containsMacro(IMPORT_QUERY)) {
88+
return;
13989
}
140-
collector.getOrThrowException();
90+
validateTableNameAndImportQuery(collector);
14191
super.prepareRun(context);
14292
}
14393

amazon-redshift-plugin/widgets/Redshift-batchsource.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,14 +114,14 @@
114114
"name": "importQueryType",
115115
"widget-attributes": {
116116
"layout": "inline",
117-
"default": "importQuery",
117+
"default": "nativeQuery",
118118
"options": [
119119
{
120-
"id": "importQuery",
120+
"id": "nativeQuery",
121121
"label": "Native Query"
122122
},
123123
{
124-
"id": "tableName",
124+
"id": "namedTable",
125125
"label": "Named Table"
126126
}
127127
]

cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,14 +133,14 @@
133133
"name": "importQueryType",
134134
"widget-attributes": {
135135
"layout": "inline",
136-
"default": "importQuery",
136+
"default": "nativeQuery",
137137
"options": [
138138
{
139-
"id": "importQuery",
139+
"id": "nativeQuery",
140140
"label": "Native Query"
141141
},
142142
{
143-
"id": "tableName",
143+
"id": "namedTable",
144144
"label": "Named Table"
145145
}
146146
]

cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,14 +133,14 @@
133133
"name": "importQueryType",
134134
"widget-attributes": {
135135
"layout": "inline",
136-
"default": "importQuery",
136+
"default": "nativeQuery",
137137
"options": [
138138
{
139-
"id": "importQuery",
139+
"id": "nativeQuery",
140140
"label": "Native Query"
141141
},
142142
{
143-
"id": "tableName",
143+
"id": "namedTable",
144144
"label": "Named Table"
145145
}
146146
]

database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -148,30 +148,19 @@ public void validate(FailureCollector collector) {
148148
TransactionIsolationLevel.validate(getTransactionIsolationLevel(), collector);
149149
}
150150

151-
if (!containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
152-
ImportQueryType importQueryType = ImportQueryType.fromString(getImportQueryType());
153-
154-
boolean isImportQuerySelected = importQueryType == ImportQueryType.IMPORT_QUERY;
155-
if (isImportQuerySelected && !containsMacro(IMPORT_QUERY) &&
156-
Strings.isNullOrEmpty(importQuery)) {
157-
collector.addFailure("Import Query is empty.", "Specify the Import Query.")
158-
.withConfigProperty(IMPORT_QUERY);
159-
160-
} else if (!isImportQuerySelected && !containsMacro(TABLE_NAME) &&
161-
Strings.isNullOrEmpty(tableName)) {
162-
collector.addFailure("Import Query is empty.", "Specify the Import Query.")
163-
.withConfigProperty(TABLE_NAME);
151+
if (!containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
152+
ImportQueryType importQueryType = ImportQueryType.fromString(getImportQueryType());
153+
boolean isImportQuery = importQueryType == ImportQueryType.IMPORT_QUERY;
154+
155+
if ((isImportQuery && !containsMacro(IMPORT_QUERY) && Strings.isNullOrEmpty(importQuery)) ||
156+
(!isImportQuery && !containsMacro(TABLE_NAME) && Strings.isNullOrEmpty(tableName))) {
157+
collector.addFailure("Import Query cannot be null.", "Please specify the Import Query.")
158+
.withConfigProperty(isImportQuery ? IMPORT_QUERY : TABLE_NAME);
159+
}
164160
}
165-
} else {
166-
if (!containsMacro(IMPORT_QUERY) && Strings.isNullOrEmpty(importQuery) &&
167-
!containsMacro(TABLE_NAME) && Strings.isNullOrEmpty(tableName)) {
168-
collector.addFailure("Import Query is empty.", "Specify the Import Query.")
169-
.withConfigProperty(IMPORT_QUERY)
170-
.withConfigProperty(TABLE_NAME);
171-
}
172-
}
173161

174-
if (!hasOneSplit && !containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS")) {
162+
163+
if (!hasOneSplit && !containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS")) {
175164
collector.addFailure(String.format(
176165
"Import Query %s must contain the string '$CONDITIONS'. if Number of Splits is not set to 1.", importQuery),
177166
"Include '$CONDITIONS' in the Import Query")

database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,17 @@
6767
import java.sql.ResultSet;
6868
import java.sql.SQLException;
6969
import java.sql.Statement;
70-
import java.sql.Types;
7170
import java.util.List;
7271
import java.util.Properties;
7372
import java.util.regex.Pattern;
7473
import java.util.stream.Collectors;
7574
import javax.annotation.Nullable;
7675

76+
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.IMPORT_QUERY;
77+
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.PROPERTY_IMPORT_QUERY_TYPE;
78+
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.TABLE_NAME;
79+
80+
7781
/**
7882
* Batch source to read from a DB table
7983
* @param <T> the DB Source config
@@ -431,6 +435,42 @@ private String getJDBCPluginId() {
431435

432436
protected abstract String createConnectionString();
433437

438+
/**
439+
* Validates that either an import query or a table name is provided, according to the selected import query type.
440+
* If the {@code importQueryType} property does not contain a macro, this method checks:
441+
* If {@code importQueryType} is {@code IMPORT_QUERY}, ensures that the {@code importQuery} property is not empty.
442+
* If {@code importQueryType} is {@code TABLE_NAME}, ensures that the {@code tableName} property is not empty.
443+
* If the {@code importQueryType} property contains a macro, this method checks that at least one of
444+
* {@code importQuery} or {@code tableName} is provided.
445+
* Any validation failures are added to the provided {@link FailureCollector}. If any failures are present,
446+
* this method will throw an exception at the end of validation.
447+
* @param collector the {@link FailureCollector} used to collect validation failures
448+
*/
449+
public void validateTableNameAndImportQuery(FailureCollector collector) {
450+
if (!sourceConfig.containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
451+
ImportQueryType type = ImportQueryType.fromString(sourceConfig.getImportQueryType());
452+
boolean isImportQuery = type == ImportQueryType.IMPORT_QUERY;
453+
454+
if (isImportQuery && Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
455+
collector.addFailure("Import Query cannot be empty", "Please specify Import Query!")
456+
.withConfigProperty(IMPORT_QUERY);
457+
} else if (!isImportQuery && Strings.isNullOrEmpty(sourceConfig.getTableName())) {
458+
collector.addFailure("Table Name cannot be empty", "Please specify Table Name!")
459+
.withConfigProperty(TABLE_NAME);
460+
}
461+
} else {
462+
boolean importQueryEmpty = Strings.isNullOrEmpty(sourceConfig.getImportQuery());
463+
boolean tableNameEmpty = Strings.isNullOrEmpty(sourceConfig.getTableName());
464+
if (importQueryEmpty && tableNameEmpty) {
465+
collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.",
466+
"Please specify Either 'ImportQuery' or 'Table Name.")
467+
.withConfigProperty(IMPORT_QUERY)
468+
.withConfigProperty(TABLE_NAME);
469+
}
470+
}
471+
collector.getOrThrowException();
472+
}
473+
434474
/**
435475
* {@link PluginConfig} for {@link AbstractDBSource}
436476
*/

database-commons/src/main/java/io/cdap/plugin/util/ImportQueryType.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
* Enum to specify the import query type used.
2121
*/
2222
public enum ImportQueryType {
23-
IMPORT_QUERY("importQuery"),
24-
TABLE_NAME("tableName");
23+
IMPORT_QUERY("nativeQuery"),
24+
TABLE_NAME("namedTable");
2525

2626
private String value;
2727

mssql-plugin/widgets/SqlServer-batchsource.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,14 +146,14 @@
146146
"name": "importQueryType",
147147
"widget-attributes": {
148148
"layout": "inline",
149-
"default": "importQuery",
149+
"default": "nativeQuery",
150150
"options": [
151151
{
152-
"id": "importQuery",
152+
"id": "nativeQuery",
153153
"label": "Native Query"
154154
},
155155
{
156-
"id": "tableName",
156+
"id": "namedTable",
157157
"label": "Named Table"
158158
}
159159
]

mysql-plugin/widgets/Mysql-batchsource.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,14 +127,14 @@
127127
"name": "importQueryType",
128128
"widget-attributes": {
129129
"layout": "inline",
130-
"default": "importQuery",
130+
"default": "nativeQuery",
131131
"options": [
132132
{
133-
"id": "importQuery",
133+
"id": "nativeQuery",
134134
"label": "Native Query"
135135
},
136136
{
137-
"id": "tableName",
137+
"id": "namedTable",
138138
"label": "Named Table"
139139
}
140140
]

oracle-plugin/widgets/Oracle-batchsource.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -230,14 +230,14 @@
230230
"name": "importQueryType",
231231
"widget-attributes": {
232232
"layout": "inline",
233-
"default": "importQuery",
233+
"default": "nativeQuery",
234234
"options": [
235235
{
236-
"id": "importQuery",
236+
"id": "NativeQuery",
237237
"label": "Native Query"
238238
},
239239
{
240-
"id": "tableName",
240+
"id": "namedTable",
241241
"label": "Named Table"
242242
}
243243
]

0 commit comments

Comments
 (0)