Skip to content

Commit f4d0f7c

Browse files
resolved Pr comments.
1 parent 53874e9 commit f4d0f7c

File tree

7 files changed

+137
-42
lines changed

7 files changed

+137
-42
lines changed

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,6 @@ public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLExcepti
9292
return schemaFields;
9393
}
9494

95-
// schema function updation
96-
9795
/**
9896
* Override: Fetches schema fields for a specific table using database metadata.
9997
*/
@@ -102,13 +100,11 @@ public List<Schema.Field> getSchemaFields(Connection connection, String tableNam
102100
DatabaseMetaData dbMetaData = connection.getMetaData();
103101
String schema = null;
104102
String table = tableName;
105-
106103
if (tableName.contains(".")) {
107104
String[] parts = tableName.split("\\.", 2);
108105
schema = parts[0];
109106
table = parts[1];
110107
}
111-
112108
try (ResultSet columns = dbMetaData.getColumns(null, schema, table, null)) {
113109
List<Schema.Field> schemaFields = Lists.newArrayList();
114110
while (columns.next()) {
@@ -118,7 +114,6 @@ public List<Schema.Field> getSchemaFields(Connection connection, String tableNam
118114
int precision = columns.getInt("COLUMN_SIZE");
119115
int scale = columns.getInt("DECIMAL_DIGITS");
120116
int nullable = columns.getInt("NULLABLE");
121-
122117
Schema columnSchema = getSchema(typeName, columnType, precision, scale, columnName);
123118
if (nullable == DatabaseMetaData.columnNullable) {
124119
columnSchema = Schema.nullableOf(columnSchema);
@@ -131,8 +126,16 @@ public List<Schema.Field> getSchemaFields(Connection connection, String tableNam
131126
}
132127

133128
/**
134-
* Helper method for type mapping when fetching schema by table name.
129+
* Maps database column type information to a corresponding {@link Schema}.
130+
*
131+
* @param typeName the SQL type name
132+
* @param columnType the JDBC type code
133+
* @param precision the column precision
134+
* @param scale the column scale
135+
* @param columnName the column name
136+
* @return the mapped {@link Schema} type
135137
*/
138+
136139
public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName) {
137140
if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
138141
return Schema.of(Schema.Type.STRING);

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.cdap.plugin.amazon.redshift;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.base.Strings;
2021
import io.cdap.cdap.api.annotation.Description;
2122
import io.cdap.cdap.api.annotation.Macro;
2223
import io.cdap.cdap.api.annotation.Metadata;
@@ -67,10 +68,10 @@ public RedshiftSource(RedshiftSourceConfig redshiftSourceConfig) {
6768
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
6869
FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
6970
if (!sourceConfig.containsMacro("tableName") && !sourceConfig.containsMacro("importQuery")) {
70-
if ((sourceConfig.getTableName() == null || sourceConfig.getTableName().isEmpty())
71-
&& (sourceConfig.getImportQuery() == null || sourceConfig.getImportQuery().isEmpty())) {
71+
if ((Strings.isNullOrEmpty(sourceConfig.getTableName()))
72+
&& (Strings.isNullOrEmpty(sourceConfig.getImportQuery()))) {
7273
collector.addFailure(
73-
"Either 'tableName' or 'importQuery' must be specified for the PostgreSQL source.",
74+
"Either 'tableName' or 'importQuery' must be specified.",
7475
"Provide a value for either 'tableName' or 'importQuery' in the configuration."
7576
).withConfigProperty(sourceConfig.getTableName()).withConfigProperty(sourceConfig.getImportQuery());
7677
}
@@ -107,11 +108,6 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
107108
return new LineageRecorder(context, assetBuilder.build());
108109
}
109110

110-
public DatabaseMetaData getDatabaseMetadata(Connection connection) throws SQLException {
111-
return (DatabaseMetaData) connection.getMetaData().getColumns(null,
112-
null, redshiftSourceConfig.getTableName(), null);
113-
}
114-
115111
/**
116112
* Redshift source config.
117113
*/

amazon-redshift-plugin/widgets/Redshift-batchsource.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@
256256
{
257257
"name": "ImportQuery",
258258
"condition": {
259-
"expression": "importQueryType == 'importQuery'"
259+
"expression": "importQueryType != 'tableName'"
260260
},
261261
"show": [
262262
{

database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,13 @@ public List<Schema.Field> getSchemaFields(Connection connection, String tableNam
6969
DatabaseMetaData dbMetaData = connection.getMetaData();
7070
String schema = null;
7171
String table = tableName;
72-
7372
// Support schema-qualified table names like "schema.table"
7473
if (tableName != null && tableName.contains(".")) {
7574
String[] parts = tableName.split("\\.", 2);
7675
schema = parts[0];
7776
table = parts[1];
7877
}
79-
String resolvedTableName = resolveTableName(dbMetaData, schema, table);
80-
try (ResultSet columns = dbMetaData.getColumns(null, schema, resolvedTableName, null)) {
78+
try (ResultSet columns = dbMetaData.getColumns(null, schema, table, null)) {
8179
List<Schema.Field> schemaFields = Lists.newArrayList();
8280
while (columns.next()) {
8381
String columnName = columns.getString("COLUMN_NAME");
@@ -100,20 +98,4 @@ public List<Schema.Field> getSchemaFields(Connection connection, String tableNam
10098
return schemaFields;
10199
}
102100
}
103-
/**
104-
* Resolves the actual table name from the database in a case-insensitive way.
105-
*/
106-
private String resolveTableName(DatabaseMetaData dbMetaData, String schema, String userTableName)
107-
throws SQLException {
108-
try (ResultSet tables = dbMetaData.getTables(null, schema, null, new String[]{"TABLE"})) {
109-
while (tables.next()) {
110-
String actualTableName = tables.getString("TABLE_NAME");
111-
if (actualTableName != null && actualTableName.equalsIgnoreCase(userTableName)) {
112-
return actualTableName;
113-
}
114-
}
115-
}
116-
throw new SQLException("Table '" + userTableName + "' not found in schema '" +
117-
(schema == null ? "default" : schema) + "'");
118-
}
119101
}

database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,9 @@ public Schema getSchema() throws SQLException {
164164
try (Connection connection = getConnection()) {
165165
executeInitQueries(connection, sourceConfig.getInitQueries());
166166
String query = sourceConfig.getImportQuery();
167-
if (query != null && !query.isEmpty()) {
167+
if (!Strings.isNullOrEmpty(query)) {
168168
return loadSchemaFromDB(connection, query);
169-
} else if (sourceConfig.getTableName() != null && !sourceConfig.getTableName().isEmpty()) {
169+
} else if (!Strings.isNullOrEmpty(sourceConfig.getTableName())) {
170170
List<Schema.Field> fields = getSchemaReader().getSchemaFields(connection, sourceConfig.getTableName());
171171
return Schema.recordOf("schema", fields);
172172
} else {
@@ -182,14 +182,9 @@ private Schema loadSchemaFromDB(Connection connection, String query) throws SQLE
182182
query = removeConditionsClause(query);
183183
}
184184
ResultSet resultSet = statement.executeQuery(query);
185-
DatabaseMetaData databaseMetaData = getDatabaseMetadata(connection);
186185
return Schema.recordOf("outputSchema", getSchemaReader().getSchemaFields(resultSet));
187186
}
188187

189-
public DatabaseMetaData getDatabaseMetadata(Connection connection) throws SQLException {
190-
return null;
191-
}
192-
193188
@VisibleForTesting
194189
static String removeConditionsClause(String importQueryString) {
195190
String query = importQueryString;

database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,14 @@
2525
import org.mockito.Mock;
2626
import org.mockito.junit.MockitoJUnitRunner;
2727

28+
import java.sql.Connection;
29+
import java.sql.DatabaseMetaData;
30+
import java.sql.ResultSet;
2831
import java.sql.ResultSetMetaData;
2932
import java.sql.SQLException;
3033
import java.sql.Types;
3134

35+
import static org.mockito.ArgumentMatchers.any;
3236
import static org.mockito.ArgumentMatchers.eq;
3337
import static org.mockito.Mockito.when;
3438

@@ -40,11 +44,126 @@ public class CommonSchemaReaderTest {
4044
@Mock
4145
ResultSetMetaData metadata;
4246

47+
@Mock
48+
Connection mockConn;
49+
@Mock
50+
DatabaseMetaData mockDbMeta;
51+
@Mock
52+
ResultSet mockColumns;
53+
@Mock
54+
ResultSet mockTables;
55+
56+
4357
@Before
4458
public void before() {
4559
reader = new CommonSchemaReader();
4660
}
4761

62+
/**
63+
* Test: getSchemaFields(Connection, String) with a simple table name.
64+
* This covers the case where the table exists, and two columns are present:
65+
* one NOT NULL integer, one nullable string.
66+
*/
67+
@Test
68+
public void testGetSchemaFieldsWithConnection() throws Exception {
69+
// Setup mocks for DatabaseMetaData and columns ResultSet
70+
when(mockConn.getMetaData()).thenReturn(mockDbMeta);
71+
// Simulate resolveTableName: table exists with name "MYTABLE"
72+
when(mockDbMeta.getTables(any(), any(), any(), eq(new String[]{"TABLE"}))).thenReturn(mockTables);
73+
when(mockTables.next()).thenReturn(true, false);
74+
when(mockTables.getString("TABLE_NAME")).thenReturn("MYTABLE");
75+
76+
// Simulate columns: two columns, one nullable, one not
77+
when(mockDbMeta.getColumns(any(), any(), eq("MYTABLE"), any())).thenReturn(mockColumns);
78+
when(mockColumns.next()).thenReturn(true, true, false);
79+
// Column 1
80+
when(mockColumns.getString("COLUMN_NAME")).thenReturn("id", "name");
81+
when(mockColumns.getString("TYPE_NAME")).thenReturn("INTEGER", "VARCHAR");
82+
when(mockColumns.getInt("DATA_TYPE")).thenReturn(Types.INTEGER, Types.VARCHAR);
83+
when(mockColumns.getInt("COLUMN_SIZE")).thenReturn(10, 255);
84+
when(mockColumns.getInt("DECIMAL_DIGITS")).thenReturn(0, 0);
85+
when(mockColumns.getInt("NULLABLE")).thenReturn(DatabaseMetaData.columnNoNulls, DatabaseMetaData.columnNullable);
86+
87+
// NOTE: In a real test, you may need to mock DBUtils.getSchema if it is static.
88+
// For demonstration, we assume the mapping is correct.
89+
90+
// Run
91+
java.util.List<Schema.Field> fields = reader.getSchemaFields(mockConn, "MYTABLE");
92+
93+
// Verify
94+
Assert.assertEquals(2, fields.size());
95+
Assert.assertEquals("id", fields.get(0).getName());
96+
Assert.assertEquals(Schema.of(Schema.Type.INT), fields.get(0).getSchema());
97+
Assert.assertEquals("name", fields.get(1).getName());
98+
Assert.assertTrue(fields.get(1).getSchema().isNullable());
99+
Assert.assertEquals(Schema.of(Schema.Type.STRING), fields.get(1).getSchema().getNonNullable());
100+
}
101+
102+
/**
103+
* Test: getSchemaFields(Connection, String) with a schema-qualified table name.
104+
* This checks that "myschema.MYTABLE" is parsed and resolved correctly.
105+
*/
106+
@Test
107+
public void testGetSchemaFieldsWithSchemaQualifiedName() throws Exception {
108+
// Setup for schema-qualified table name "myschema.MYTABLE"
109+
when(mockConn.getMetaData()).thenReturn(mockDbMeta);
110+
when(mockDbMeta.getTables(any(), eq("myschema"), any(), eq(new String[]{"TABLE"}))).thenReturn(mockTables);
111+
when(mockTables.next()).thenReturn(true, false);
112+
when(mockTables.getString("TABLE_NAME")).thenReturn("MYTABLE");
113+
114+
when(mockDbMeta.getColumns(any(), eq("myschema"), eq("MYTABLE"), any())).thenReturn(mockColumns);
115+
when(mockColumns.next()).thenReturn(true, false);
116+
when(mockColumns.getString("COLUMN_NAME")).thenReturn("id");
117+
when(mockColumns.getString("TYPE_NAME")).thenReturn("INTEGER");
118+
when(mockColumns.getInt("DATA_TYPE")).thenReturn(Types.INTEGER);
119+
when(mockColumns.getInt("COLUMN_SIZE")).thenReturn(10);
120+
when(mockColumns.getInt("DECIMAL_DIGITS")).thenReturn(0);
121+
when(mockColumns.getInt("NULLABLE")).thenReturn(DatabaseMetaData.columnNoNulls);
122+
123+
java.util.List<Schema.Field> fields = reader.getSchemaFields(mockConn, "myschema.MYTABLE");
124+
Assert.assertEquals(1, fields.size());
125+
Assert.assertEquals("id", fields.get(0).getName());
126+
Assert.assertEquals(Schema.of(Schema.Type.INT), fields.get(0).getSchema());
127+
}
128+
129+
/**
130+
* Test: Nullability logic is correct for columns.
131+
*/
132+
@Test
133+
public void testGetSchemaFieldsHandlesNullability() throws Exception {
134+
when(mockConn.getMetaData()).thenReturn(mockDbMeta);
135+
when(mockDbMeta.getTables(any(), any(), any(), eq(new String[]{"TABLE"}))).thenReturn(mockTables);
136+
when(mockTables.next()).thenReturn(true, false);
137+
when(mockTables.getString("TABLE_NAME")).thenReturn("MYTABLE");
138+
139+
when(mockDbMeta.getColumns(any(), any(), eq("MYTABLE"), any())).thenReturn(mockColumns);
140+
when(mockColumns.next()).thenReturn(true, true, false);
141+
when(mockColumns.getString("COLUMN_NAME")).thenReturn("col1", "col2");
142+
when(mockColumns.getString("TYPE_NAME")).thenReturn("INTEGER", "VARCHAR");
143+
when(mockColumns.getInt("DATA_TYPE")).thenReturn(Types.INTEGER, Types.VARCHAR);
144+
when(mockColumns.getInt("COLUMN_SIZE")).thenReturn(10, 255);
145+
when(mockColumns.getInt("DECIMAL_DIGITS")).thenReturn(0, 0);
146+
when(mockColumns.getInt("NULLABLE")).thenReturn(DatabaseMetaData.columnNullable, DatabaseMetaData.columnNoNulls);
147+
148+
java.util.List<Schema.Field> fields = reader.getSchemaFields(mockConn, "MYTABLE");
149+
Assert.assertTrue(fields.get(0).getSchema().isNullable());
150+
Assert.assertFalse(fields.get(1).getSchema().isNullable());
151+
}
152+
153+
/**
154+
* Test: Exception is thrown when table is not found.
155+
*/
156+
@Test(expected = SQLException.class)
157+
public void testGetSchemaFieldsThrowsWhenTableNotFound() throws Exception {
158+
when(mockConn.getMetaData()).thenReturn(mockDbMeta);
159+
when(mockDbMeta.getTables(any(), any(), any(), eq(new String[]{"TABLE"}))).thenReturn(mockTables);
160+
when(mockTables.next()).thenReturn(false); // Table not found
161+
162+
reader.getSchemaFields(mockConn, "NOTABLE");
163+
}
164+
165+
166+
48167
@Test
49168
public void testGetSchemaHandlesNull() throws SQLException {
50169
when(metadata.getColumnType(eq(1))).thenReturn(Types.NULL);

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
6565
if ((sourceConfig.getTableName() == null || sourceConfig.getTableName().isEmpty())
6666
&& (sourceConfig.getImportQuery() == null || sourceConfig.getImportQuery().isEmpty())) {
6767
collector.addFailure(
68-
"Either 'tableName' or 'importQuery' must be specified for the PostgreSQL source.",
68+
"Either 'tableName' or 'importQuery' must be specified.",
6969
"Provide a value for either 'tableName' or 'importQuery' in the configuration."
7070
).withConfigProperty(sourceConfig.getTableName()).withConfigProperty(sourceConfig.getImportQuery());
7171
}

0 commit comments

Comments
 (0)