2020import com .google .common .base .Throwables ;
2121import com .google .common .collect .ImmutableMap ;
2222import com .google .common .collect .Sets ;
23-
2423import io .cdap .cdap .api .artifact .ArtifactSummary ;
2524import io .cdap .cdap .api .plugin .PluginClass ;
2625import io .cdap .cdap .datapipeline .DataPipelineApp ;
3231import io .cdap .plugin .db .batch .DatabasePluginTestBase ;
3332import io .cdap .plugin .db .batch .sink .ETLDBOutputFormat ;
3433import io .cdap .plugin .db .batch .source .DataDrivenETLDBInputFormat ;
35-
3634import org .junit .AfterClass ;
3735import org .junit .BeforeClass ;
3836import org .junit .ClassRule ;
4442import java .sql .Driver ;
4543import java .sql .DriverManager ;
4644import java .sql .PreparedStatement ;
47- import java .sql .ResultSet ;
48- import java .sql .ResultSetMetaData ;
4945import java .sql .SQLException ;
5046import java .sql .Statement ;
5147import java .sql .Time ;
5248import java .sql .Timestamp ;
53- import java .util .Arrays ;
5449import java .util .Calendar ;
5550import java .util .Collections ;
5651import java .util .Map ;
@@ -97,9 +92,9 @@ public static void setupTest() throws Exception {
9792 setupBatchArtifacts (DATAPIPELINE_ARTIFACT_ID , DataPipelineApp .class );
9893
9994 addPluginArtifact (NamespaceId .DEFAULT .artifact (JDBC_DRIVER_NAME , "1.0.0" ),
100- DATAPIPELINE_ARTIFACT_ID ,
101- NetezzaSource .class , NetezzaSink .class , DBRecord .class , ETLDBOutputFormat .class ,
102- DataDrivenETLDBInputFormat .class , DBRecord .class , NetezzaPostAction .class , NetezzaAction .class );
95+ DATAPIPELINE_ARTIFACT_ID ,
96+ NetezzaSource .class , NetezzaSink .class , DBRecord .class , ETLDBOutputFormat .class ,
97+ DataDrivenETLDBInputFormat .class , DBRecord .class , NetezzaPostAction .class , NetezzaAction .class );
10398
10499 connectionUrl = "jdbc:netezza://" + BASE_PROPS .get (ConnectionConfig .HOST ) + ":" +
105100 BASE_PROPS .get (ConnectionConfig .PORT ) + "/" + BASE_PROPS .get (ConnectionConfig .DATABASE );
@@ -108,11 +103,11 @@ public static void setupTest() throws Exception {
108103
109104 // add netezza 3rd party plugin
110105 PluginClass netezzaDriver = new PluginClass (ConnectionConfig .JDBC_PLUGIN_TYPE , JDBC_DRIVER_NAME ,
111- "netezza driver class" , aClass .getCanonicalName (),
112- null , Collections .emptyMap ());
106+ "netezza driver class" , aClass .getCanonicalName (),
107+ null , Collections .emptyMap ());
113108 addPluginArtifact (NamespaceId .DEFAULT .artifact ("netezza-jdbc-connector" , "1.0.0" ),
114- DATAPIPELINE_ARTIFACT_ID ,
115- Sets .newHashSet (netezzaDriver ), aClass );
109+ DATAPIPELINE_ARTIFACT_ID ,
110+ Sets .newHashSet (netezzaDriver ), aClass );
116111
117112 TimeZone .setDefault (TimeZone .getTimeZone ("UTC" ));
118113
@@ -128,64 +123,85 @@ protected static void createTestTables(Connection conn) throws SQLException {
128123 // create a table that the action will truncate at the end of the run
129124 stmt .execute ("CREATE TABLE post_action_test (x int, day varchar(10))" );
130125
131- stmt .execute ("create table MY_TABLE (INTEGER_COL INTEGER, BYTEINT_COL BYTEINT, SMALLINT_COL SMALLINT, " +
132- "BIGINT_COL BIGINT, REAL_COL REAL, REAL_FLOAT_COL FLOAT(1), DOUBLE_FLOAT_COL FLOAT(7), " +
133- "DOUBLE_PRECISION_COL DOUBLE PRECISION, NUMERIC_COL NUMERIC(" + PRECISION + "," + SCALE + "), " +
134- "DECIMAL_COL DECIMAL(" + PRECISION + "," + SCALE + "), CHAR_COL CHAR(40), VARCHAR_COL VARCHAR(40), " +
135- "NCHAR_COL NCHAR(40), NVARCHAR_COL NVARCHAR(40), VARBINARY_COL BINARY VARYING(10), " +
136- "ST_GEOMETRY_COL ST_GEOMETRY(10), DATE_COL DATE, TIME_COL TIME, TIMETZ_COL TIMETZ, TIMESTAMP_COL TIMESTAMP, " +
137- "INTERVAL_COL INTERVAL, BOOLEAN_COL BOOLEAN)" );
126+ stmt .execute ("create table MY_TABLE (" +
127+ "INTEGER_COL INTEGER, " +
128+ "BYTEINT_COL BYTEINT, " +
129+ "SMALLINT_COL SMALLINT, " +
130+ "BIGINT_COL BIGINT, " +
131+ "REAL_COL REAL, " +
132+ "REAL_FLOAT_COL FLOAT(1), " +
133+ "DOUBLE_FLOAT_COL FLOAT(7), " +
134+ "DOUBLE_PRECISION_COL DOUBLE PRECISION, " +
135+ "NUMERIC_COL NUMERIC(" + PRECISION + "," + SCALE + "), " +
136+ "DECIMAL_COL DECIMAL(" + PRECISION + "," + SCALE + "), " +
137+ "CHAR_COL CHAR(40), " +
138+ "VARCHAR_COL VARCHAR(40), " +
139+ "NCHAR_COL NCHAR(40), " +
140+ "NVARCHAR_COL NVARCHAR(40), " +
141+ "VARBINARY_COL BINARY VARYING(10), " +
142+ "ST_GEOMETRY_COL ST_GEOMETRY(10), " +
143+ "DATE_COL DATE, " +
144+ "TIME_COL TIME, " +
145+ "TIMETZ_COL TIMETZ, " +
146+ "TIMESTAMP_COL TIMESTAMP, " +
147+ "INTERVAL_COL INTERVAL, " +
148+ "BOOLEAN_COL BOOLEAN)" );
138149
139150 stmt .execute ("CREATE TABLE MY_DEST_TABLE AS SELECT * FROM my_table" );
140151 stmt .execute ("CREATE TABLE YOUR_TABLE AS SELECT * FROM my_table" );
141152 }
142153 }
143154
144155 protected static void prepareTestData (Connection conn ) throws SQLException {
145- try (Statement stmt = conn .createStatement ()) {
156+ try (
157+ Statement stmt = conn .createStatement ();
158+ PreparedStatement pStmt1 =
159+ conn .prepareStatement ("INSERT INTO my_table " +
160+ "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?," +
161+ " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?," +
162+ " ?, ?)" );
163+ PreparedStatement pStmt2 =
164+ conn .prepareStatement ("INSERT INTO your_table " +
165+ "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?," +
166+ " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?," +
167+ " ?, ?)" )) {
168+
146169 stmt .execute ("insert into db_action_test values (1, '1970-01-01')" );
147170 stmt .execute ("insert into post_action_test values (1, '1970-01-01')" );
148171
149- for (String tableName : Arrays .asList ("MY_TABLE" , "YOUR_TABLE" )) {
150- for (int i = 1 ; i <= 5 ; i ++) {
151- String name = "user" + i ;
152- try (PreparedStatement pStmt = conn .prepareStatement ("insert into " + tableName + " values(" +
153- "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" )) {
154- pStmt .setString (11 , name );
155- pStmt .setString (12 , name );
156- pStmt .setString (13 , name );
157- pStmt .setString (14 , name );
158- pStmt .setBytes (15 , name .getBytes (Charsets .UTF_8 ));
159- pStmt .setBytes (16 , name .getBytes (Charsets .UTF_8 ));
160-
161- pStmt .setInt (1 , i );
162- pStmt .setInt (2 , i );
163- pStmt .setInt (3 , i );
164- pStmt .setLong (4 , i );
165-
166- pStmt .setFloat (5 , 123.45f + i );
167- pStmt .setFloat (6 , 123.45f + i );
168- pStmt .setDouble (7 , 123.45 + i );
169- pStmt .setDouble (8 , 123.45 + i );
170- pStmt .setBigDecimal (
171- 9 ,
172- new BigDecimal (123.45 , MathContext .DECIMAL64 ).add (new BigDecimal (i , MathContext .DECIMAL64 ))
173- );
174- pStmt .setBigDecimal (
175- 10 ,
176- new BigDecimal (123.45 , MathContext .DECIMAL64 ).add (new BigDecimal (i , MathContext .DECIMAL64 ))
177- );
178- pStmt .setDate (17 , new Date (CURRENT_TS ));
179- pStmt .setTime (18 , new Time (CURRENT_TS ));
180- pStmt .setString (19 , "13:24:16+03" );
181- pStmt .setTimestamp (20 , new Timestamp (CURRENT_TS ));
182- pStmt .setString (21 , "2 year 3 month " + i + " day" );
183- // other
184- pStmt .setBoolean (22 , (i % 2 == 0 ));
185-
186- pStmt .execute ();
187- }
188- }
172+ populateData (pStmt1 , pStmt2 );
173+ }
174+ }
175+
176+ private static void populateData (PreparedStatement ... stmts ) throws SQLException {
177+ // insert the same data into both tables: my_table and your_table
178+ for (PreparedStatement pStmt : stmts ) {
179+ for (int i = 1 ; i <= 5 ; i ++) {
180+ String name = "user" + i ;
181+ pStmt .setInt (1 , i );
182+ pStmt .setInt (2 , i );
183+ pStmt .setInt (3 , i );
184+ pStmt .setLong (4 , i );
185+ pStmt .setFloat (5 , 123.45f + i );
186+ pStmt .setFloat (6 , 123.45f + i );
187+ pStmt .setDouble (7 , 123.45 + i );
188+ pStmt .setDouble (8 , 123.45 + i );
189+ pStmt .setBigDecimal (9 , new BigDecimal (123.45 , MathContext .DECIMAL64 ).add (new BigDecimal (i , MathContext .DECIMAL64 )));
190+ pStmt .setBigDecimal (10 , new BigDecimal (123.45 , MathContext .DECIMAL64 ).add (new BigDecimal (i , MathContext .DECIMAL64 )));
191+ pStmt .setString (11 , name );
192+ pStmt .setString (12 , name );
193+ pStmt .setString (13 , name );
194+ pStmt .setString (14 , name );
195+ pStmt .setBytes (15 , name .getBytes (Charsets .UTF_8 ));
196+ pStmt .setBytes (16 , name .getBytes (Charsets .UTF_8 ));
197+ pStmt .setDate (17 , new Date (CURRENT_TS ));
198+ pStmt .setTime (18 , new Time (CURRENT_TS ));
199+ pStmt .setString (19 , "13:24:16+03" );
200+ pStmt .setTimestamp (20 , new Timestamp (CURRENT_TS ));
201+ pStmt .setString (21 , "2 year 3 month " + i + " day" );
202+ pStmt .setBoolean (22 , (i % 2 == 0 ));
203+
204+ pStmt .executeUpdate ();
189205 }
190206 }
191207 }
@@ -194,7 +210,7 @@ public static Connection createConnection() {
194210 try {
195211 Class .forName (Driver .class .getCanonicalName ());
196212 return DriverManager .getConnection (connectionUrl , BASE_PROPS .get (ConnectionConfig .USER ),
197- BASE_PROPS .get (ConnectionConfig .PASSWORD ));
213+ BASE_PROPS .get (ConnectionConfig .PASSWORD ));
198214 } catch (Exception e ) {
199215 throw Throwables .propagate (e );
200216 }
0 commit comments