Skip to content

Commit c14e1b4

Browse files
fixing up.
1 parent 467a8cd commit c14e1b4

File tree

6 files changed

+55
-66
lines changed

6 files changed

+55
-66
lines changed

amazon-redshift-plugin/docs/Redshift-batchsource.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ contain the '$CONDITIONS' string. For example, 'SELECT * FROM table WHERE $CONDI
3131
The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query.
3232
The '$CONDITIONS' string is not required if numSplits is set to one.
3333

34-
**ImportQueryType** - determines how data is extracted—either by using a Table Name or a custom Import Query.
34+
**Import Query Type** - Determines how data is extracted—either by using a Table Name or a custom Import Query.
3535

36-
* **TableName**: Extracts data directly from a specified database table.
36+
**Table Name**: Extracts data directly from a specified database table.
3737

3838
**Bounding Query:** Bounding Query should return the min and max of the values of the 'splitBy' field.
3939
For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is set to one.

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,23 @@ public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLExcepti
9090
}
9191
return schemaFields;
9292
}
93+
9394
/**
94-
* Maps database column type information to a corresponding {@link Schema}.
95-
*
96-
* @param typeName the SQL type name
97-
* @param columnType the JDBC type code
98-
* @param precision the column precision
99-
* @param scale the column scale
100-
* @param columnName the column name
95+
* Returns the CDAP {@link Schema} for a database column based on JDBC metadata.
96+
* Handles Redshift-specific and common JDBC types:
97+
* Maps Redshift string types to {@link Schema.Type#STRING}
98+
* Maps "INT" to {@link Schema.Type#INT}
99+
* Maps "BIGINT" to {@link Schema.Type#LONG}.
100+
* Maps NUMERIC with zero precision to {@link Schema.Type#STRING} and logs a warning.
101+
* Maps "timestamp" to {@link Schema.LogicalType#DATETIME}.
102+
* Delegates to the parent plugin for all other types.
103+
* @param typeName SQL type name (e.g. "INT", "BIGINT", "timestamp")
104+
* @param columnType JDBC type code (see {@link java.sql.Types})
105+
* @param precision column precision (for numeric types)
106+
* @param scale column scale (for numeric types)
107+
* @param columnName column name
108+
* @param isSigned whether the column is signed
109+
* @param handleAsDecimal whether to handle as decimal
101110
* @return the mapped {@link Schema} type
102111
*/
103112
@Override

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -78,26 +78,26 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
7878
return;
7979
}
8080
if (!sourceConfig.containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
81-
boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.getValue().equals(sourceConfig.importQueryType);
82-
boolean isTableNameSelected = ImportQueryType.TABLE_NAME.getValue().equals(sourceConfig.importQueryType);
81+
82+
String importQueryTypeValue = sourceConfig.getImportQueryType();
83+
84+
boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.name().equalsIgnoreCase(importQueryTypeValue);
8385

8486
if (isImportQuerySelected && !sourceConfig.containsMacro(IMPORT_QUERY) &&
8587
Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
8688
collector.addFailure("Import Query cannot be empty", null)
8789
.withConfigProperty(IMPORT_QUERY);
8890

89-
} else if (isTableNameSelected && !sourceConfig.containsMacro(TABLE_NAME) &&
91+
} else if (!isImportQuerySelected && !sourceConfig.containsMacro(TABLE_NAME) &&
9092
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
9193
collector.addFailure("Table Name cannot be empty", null)
9294
.withConfigProperty(TABLE_NAME);
9395
}
9496
} else {
95-
boolean isImportQueryMissing = !sourceConfig.containsMacro(IMPORT_QUERY) &&
96-
Strings.isNullOrEmpty(sourceConfig.getImportQuery());
97-
boolean isTableNameMissing = !sourceConfig.containsMacro(TABLE_NAME) &&
98-
Strings.isNullOrEmpty(sourceConfig.getTableName());
99-
100-
if (isImportQueryMissing && isTableNameMissing) {
97+
if (!sourceConfig.containsMacro(IMPORT_QUERY) &&
98+
Strings.isNullOrEmpty(sourceConfig.getImportQuery()) &&
99+
!sourceConfig.containsMacro(TABLE_NAME) &&
100+
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
101101
collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.", null)
102102
.withConfigProperty(IMPORT_QUERY)
103103
.withConfigProperty(TABLE_NAME);
@@ -112,26 +112,25 @@ public void prepareRun(BatchSourceContext context) throws Exception {
112112
FailureCollector collector = context.getFailureCollector();
113113

114114
if (!sourceConfig.containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
115-
boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.getValue().equals(sourceConfig.importQueryType);
116-
boolean isTableNameSelected = ImportQueryType.TABLE_NAME.getValue().equals(sourceConfig.importQueryType);
115+
String importQueryTypeValue = sourceConfig.getImportQueryType();
116+
117+
boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.name().equalsIgnoreCase(importQueryTypeValue);
117118

118119
if (isImportQuerySelected && !sourceConfig.containsMacro(IMPORT_QUERY) &&
119120
Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
120121
collector.addFailure("Import Query cannot be empty", null)
121122
.withConfigProperty(IMPORT_QUERY);
122123

123-
} else if (isTableNameSelected && !sourceConfig.containsMacro(TABLE_NAME) &&
124+
} else if (!isImportQuerySelected && !sourceConfig.containsMacro(TABLE_NAME) &&
124125
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
125126
collector.addFailure("Table Name cannot be empty", null)
126127
.withConfigProperty(TABLE_NAME);
127128
}
128129
} else {
129-
boolean isImportQueryMissing = !sourceConfig.containsMacro(IMPORT_QUERY) &&
130-
Strings.isNullOrEmpty(sourceConfig.getImportQuery());
131-
boolean isTableNameMissing = !sourceConfig.containsMacro(TABLE_NAME) &&
132-
Strings.isNullOrEmpty(sourceConfig.getTableName());
133-
134-
if (isImportQueryMissing && isTableNameMissing) {
130+
if (!sourceConfig.containsMacro(IMPORT_QUERY) &&
131+
Strings.isNullOrEmpty(sourceConfig.getImportQuery()) &&
132+
!sourceConfig.containsMacro(TABLE_NAME) &&
133+
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
135134
collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.", null)
136135
.withConfigProperty(IMPORT_QUERY)
137136
.withConfigProperty(TABLE_NAME);

amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftFailedConnectionTest.java

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.junit.Test;
2121

2222
import java.io.IOException;
23-
import static org.junit.Assert.assertTrue;
2423

2524
public class RedshiftFailedConnectionTest extends DBSpecificFailedConnectionTest {
2625
private static final String JDBC_DRIVER_CLASS_NAME = "com.amazon.redshift.Driver";
@@ -29,23 +28,11 @@ public class RedshiftFailedConnectionTest extends DBSpecificFailedConnectionTest
2928
public void test() throws ClassNotFoundException, IOException {
3029

3130
RedshiftConnector connector = new RedshiftConnector(
32-
new RedshiftConnectorConfig("username", "password", "jdbc", "",
33-
"localhost", "db", 5432));
31+
new RedshiftConnectorConfig("username", "password", "jdbc", "", "localhost", "db", 5432));
3432

35-
String expectedPrefix = "Failed to create connection to database via connection string: " +
36-
"jdbc:redshift://localhost:5432/db and arguments: {user=username}. Error:";
37-
try {
38-
super.test(JDBC_DRIVER_CLASS_NAME, connector, expectedPrefix + " ConnectException: Connection " +
39-
"refused (Connection refused).");
40-
} catch (AssertionError e) {
41-
// Accept either ConnectException or SunCertPathBuilderException
42-
String message = e.getMessage();
43-
assertTrue(
44-
"Expected either ConnectException or SunCertPathBuilderException, but got: " + message,
45-
message.contains("ConnectException: Connection refused") ||
46-
message.contains("SunCertPathBuilderException: unable to find valid certification " +
47-
"path to requested target")
48-
);
49-
}
33+
super.test(JDBC_DRIVER_CLASS_NAME, connector, "Failed to create connection to database via connection string: " +
34+
"jdbc:redshift://localhost:5432/db and arguments: " +
35+
"{user=username}. Error: ConnectException: Connection refused " +
36+
"(Connection refused).");
37+
}
5038
}
51-
}

postgresql-plugin/docs/Postgres-batchsource.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ contain the '$CONDITIONS' string. For example, 'SELECT * FROM table WHERE $CONDI
3838
The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query.
3939
The '$CONDITIONS' string is not required if numSplits is set to one.
4040

41-
**ImportQueryType** - determines how data is extracted—either by using a Table Name or a custom Import Query.
41+
**Import Query Type** - Determines how data is extracted—either by using a Table Name or a custom Import Query.
4242

43-
* **TableName**: Extracts data directly from a specified database table.
43+
**Table Name**: Extracts data directly from a specified database table.
4444

4545
**Bounding Query:** Bounding Query should return the min and max of the values of the 'splitBy' field.
4646
For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is set to one.

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -75,26 +75,23 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
7575
return;
7676
}
7777
if (!sourceConfig.containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
78-
boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.getValue().equals(sourceConfig.importQueryType);
79-
boolean isTableNameSelected = ImportQueryType.TABLE_NAME.getValue().equals(sourceConfig.importQueryType);
78+
ImportQueryType importQueryType = ImportQueryType.fromString(sourceConfig.getImportQueryType());
8079

80+
boolean isImportQuerySelected = importQueryType == ImportQueryType.IMPORT_QUERY;
8181
if (isImportQuerySelected && !sourceConfig.containsMacro(IMPORT_QUERY) &&
8282
Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
8383
collector.addFailure("Import Query cannot be empty", null)
8484
.withConfigProperty(IMPORT_QUERY);
8585

86-
} else if (isTableNameSelected && !sourceConfig.containsMacro(TABLE_NAME) &&
86+
} else if (!isImportQuerySelected && !sourceConfig.containsMacro(TABLE_NAME) &&
8787
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
8888
collector.addFailure("Table Name cannot be empty", null)
8989
.withConfigProperty(TABLE_NAME);
9090
}
9191
} else {
92-
boolean isImportQueryMissing = !sourceConfig.containsMacro(IMPORT_QUERY) &&
93-
Strings.isNullOrEmpty(sourceConfig.getImportQuery());
94-
boolean isTableNameMissing = !sourceConfig.containsMacro(TABLE_NAME) &&
95-
Strings.isNullOrEmpty(sourceConfig.getTableName());
96-
97-
if (isImportQueryMissing && isTableNameMissing) {
92+
if (!sourceConfig.containsMacro(IMPORT_QUERY) &&
93+
Strings.isNullOrEmpty(sourceConfig.getImportQuery()) && !sourceConfig.containsMacro(TABLE_NAME) &&
94+
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
9895
collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.", null)
9996
.withConfigProperty(IMPORT_QUERY)
10097
.withConfigProperty(TABLE_NAME);
@@ -112,26 +109,23 @@ public void prepareRun(BatchSourceContext context) throws Exception {
112109
}
113110

114111
if (!sourceConfig.containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
115-
boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.getValue().equals(sourceConfig.importQueryType);
116-
boolean isTableNameSelected = ImportQueryType.TABLE_NAME.getValue().equals(sourceConfig.importQueryType);
112+
ImportQueryType importQueryType = ImportQueryType.fromString(sourceConfig.getImportQueryType());
117113

114+
boolean isImportQuerySelected = importQueryType == ImportQueryType.IMPORT_QUERY;
118115
if (isImportQuerySelected && !sourceConfig.containsMacro(IMPORT_QUERY) &&
119116
Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
120117
collector.addFailure("Import Query cannot be empty", null)
121118
.withConfigProperty(IMPORT_QUERY);
122119

123-
} else if (isTableNameSelected && !sourceConfig.containsMacro(TABLE_NAME) &&
120+
} else if (!isImportQuerySelected && !sourceConfig.containsMacro(TABLE_NAME) &&
124121
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
125122
collector.addFailure("Table Name cannot be empty", null)
126123
.withConfigProperty(TABLE_NAME);
127124
}
128125
} else {
129-
boolean isImportQueryMissing = !sourceConfig.containsMacro(IMPORT_QUERY) &&
130-
Strings.isNullOrEmpty(sourceConfig.getImportQuery());
131-
boolean isTableNameMissing = !sourceConfig.containsMacro(TABLE_NAME) &&
132-
Strings.isNullOrEmpty(sourceConfig.getTableName());
133-
134-
if (isImportQueryMissing && isTableNameMissing) {
126+
if (!sourceConfig.containsMacro(IMPORT_QUERY) &&
127+
Strings.isNullOrEmpty(sourceConfig.getImportQuery()) && !sourceConfig.containsMacro(TABLE_NAME) &&
128+
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
135129
collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.", null)
136130
.withConfigProperty(IMPORT_QUERY)
137131
.withConfigProperty(TABLE_NAME);

0 commit comments

Comments
 (0)