 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import io.cdap.cdap.api.common.Bytes;
 import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.api.dataset.table.Table;
 import io.cdap.cdap.etl.api.batch.BatchSink;
 import io.cdap.cdap.etl.mock.batch.MockSource;
 import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.test.ApplicationManager;
 import io.cdap.cdap.test.DataSetManager;
 import io.cdap.plugin.common.Constants;
 import io.cdap.plugin.db.batch.sink.AbstractDBSink;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.math.BigDecimal;
  * Test for ETL using databases.
  */
 public class NetezzaSinkTestRun extends NetezzaPluginTestBase {
+  private static final Schema SCHEMA = Schema.recordOf(
+    "dbRecord",
+    Schema.Field.of("INTEGER_COL", Schema.of(Schema.Type.INT)),
+    Schema.Field.of("BYTEINT_COL", Schema.of(Schema.Type.INT)),
+    Schema.Field.of("SMALLINT_COL", Schema.of(Schema.Type.INT)),
+    Schema.Field.of("BIGINT_COL", Schema.of(Schema.Type.LONG)),
+    Schema.Field.of("REAL_COL", Schema.of(Schema.Type.FLOAT)),
+    Schema.Field.of("REAL_FLOAT_COL", Schema.of(Schema.Type.FLOAT)),
+    Schema.Field.of("DOUBLE_FLOAT_COL", Schema.of(Schema.Type.DOUBLE)),
+    Schema.Field.of("DOUBLE_PRECISION_COL", Schema.of(Schema.Type.DOUBLE)),
+    Schema.Field.of("NUMERIC_COL", Schema.decimalOf(PRECISION, SCALE)),
+    Schema.Field.of("DECIMAL_COL", Schema.decimalOf(PRECISION, SCALE)),
+    Schema.Field.of("CHAR_COL", Schema.of(Schema.Type.STRING)),
+    Schema.Field.of("VARCHAR_COL", Schema.of(Schema.Type.STRING)),
+    Schema.Field.of("NCHAR_COL", Schema.of(Schema.Type.STRING)),
+    Schema.Field.of("NVARCHAR_COL", Schema.of(Schema.Type.STRING)),
+    Schema.Field.of("VARBINARY_COL", Schema.of(Schema.Type.BYTES)),
+    Schema.Field.of("ST_GEOMETRY_COL", Schema.of(Schema.Type.BYTES)),
+    Schema.Field.of("DATE_COL", Schema.of(Schema.LogicalType.DATE)),
+    Schema.Field.of("TIME_COL", Schema.of(Schema.LogicalType.TIME_MICROS)),
+    Schema.Field.of("TIMETZ_COL", Schema.of(Schema.Type.STRING)),
+    Schema.Field.of("TIMESTAMP_COL", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)),
+    Schema.Field.of("INTERVAL_COL", Schema.of(Schema.Type.STRING)),
+    Schema.Field.of("BOOLEAN_COL", Schema.of(Schema.Type.BOOLEAN))
+  );
+
+  private static final BigDecimal NUMERIC_VALUE = new BigDecimal(3.458d, new MathContext(PRECISION)).setScale(SCALE);
+
+  @Before
+  public void setup() throws Exception {
+    try (Statement stmt = createConnection().createStatement()) {
+      stmt.execute("DELETE FROM MY_DEST_TABLE");
+    }
+  }
 
   @Test
-  public void testDBSink() throws Exception {
-    String inputDatasetName = "input-dbsinktest";
+  public void testDBSinkWithExplicitInputSchema() throws Exception {
+    testDBSink("testDBSinkWithExplicitInputSchema", "input-dbsinktest-explicit", true);
+  }
+
+
+  @Test
+  public void testDBSinkWithInferredInputSchema() throws Exception {
+    testDBSink("testDBSinkWithInferredInputSchema", "input-dbsinktest-inferred", false);
+  }
 
-    ETLPlugin sourceConfig = MockSource.getPlugin(inputDatasetName);
+  public void testDBSink(String appName, String inputDatasetName, boolean setInputSchema) throws Exception {
+    ETLPlugin sourceConfig = (setInputSchema)
+      ? MockSource.getPlugin(inputDatasetName, SCHEMA)
+      : MockSource.getPlugin(inputDatasetName);
     ETLPlugin sinkConfig = new ETLPlugin(
       NetezzaConstants.PLUGIN_NAME,
       BatchSink.PLUGIN_TYPE,
@@ -67,76 +114,82 @@ public void testDBSink() throws Exception {
         .build(),
       null);
 
-    deployETL(sourceConfig, sinkConfig, DATAPIPELINE_ARTIFACT, "testDBSink");
+    ApplicationManager appManager = deployETL(sourceConfig, sinkConfig, DATAPIPELINE_ARTIFACT, appName);
     createInputData(inputDatasetName);
 
+    runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS)));
 
     try (Connection conn = createConnection();
          Statement stmt = conn.createStatement();
-         ResultSet resultSet = stmt.executeQuery("SELECT * FROM my_table ORDER BY ID")) {
+         ResultSet resultSet = stmt.executeQuery("SELECT * FROM MY_DEST_TABLE ORDER BY INTEGER_COL")) {
+
       Set<String> users = new HashSet<>();
       Assert.assertTrue(resultSet.next());
-      users.add(resultSet.getString("NAME"));
+      users.add(resultSet.getString("VARCHAR_COL"));
+
+      Assert.assertEquals(1, resultSet.getInt("INTEGER_COL"));
+      Assert.assertEquals(1, resultSet.getInt("BYTEINT_COL"));
+      Assert.assertEquals(1, resultSet.getInt("SMALLINT_COL"));
+      Assert.assertEquals(1, resultSet.getLong("BIGINT_COL"));
+
+      Assert.assertEquals(3.451f + 1, resultSet.getFloat("REAL_COL"), 0.00001f);
+      Assert.assertEquals(3.451f + 1, resultSet.getFloat("REAL_FLOAT_COL"), 0.00001f);
+      Assert.assertEquals(3.451 + 1, resultSet.getFloat("DOUBLE_FLOAT_COL"), 0.000001);
+      Assert.assertEquals(3.451 + 1, resultSet.getFloat("DOUBLE_PRECISION_COL"), 0.000001);
+      Assert.assertEquals(NUMERIC_VALUE, resultSet.getBigDecimal("NUMERIC_COL"));
+
+      Assert.assertEquals("user1", resultSet.getString("CHAR_COL").trim());
+      Assert.assertEquals("user1", resultSet.getString("VARCHAR_COL"));
+      Assert.assertEquals("user1", resultSet.getString("NCHAR_COL").trim());
+      Assert.assertEquals("user1", resultSet.getString("NVARCHAR_COL"));
+      Assert.assertEquals("user1", Bytes.toString(resultSet.getBytes("VARBINARY_COL")));
+      Assert.assertEquals("user1", Bytes.toString(resultSet.getBytes("ST_GEOMETRY_COL")));
+
       Assert.assertEquals(new Date(CURRENT_TS).toString(), resultSet.getDate("DATE_COL").toString());
       Assert.assertEquals(new Time(CURRENT_TS).toString(), resultSet.getTime("TIME_COL").toString());
-      Assert.assertEquals(new Timestamp(CURRENT_TS),
-                          resultSet.getTimestamp("TIMESTAMP_COL"));
+      Assert.assertEquals("13:24:16+03", resultSet.getString("TIMETZ_COL"));
+      Assert.assertEquals(new Timestamp(CURRENT_TS), resultSet.getTimestamp("TIMESTAMP_COL"));
+      Assert.assertEquals("2 years 3 mons 2 days", resultSet.getString("INTERVAL_COL"));
+
+      Assert.assertTrue(resultSet.getBoolean("BOOLEAN_COL"));
+
       Assert.assertTrue(resultSet.next());
-      Assert.assertEquals(new BigDecimal(3.458, new MathContext(PRECISION)).setScale(SCALE),
-                          resultSet.getBigDecimal("NUMERIC_COL"));
-      Assert.assertEquals(new BigDecimal(3.459, new MathContext(PRECISION)).setScale(SCALE),
-                          resultSet.getBigDecimal("DECIMAL_COL"));
-      users.add(resultSet.getString("NAME"));
-      Assert.assertEquals(ImmutableSet.of("user1", "user2"), users);
+      users.add(resultSet.getString("VARCHAR_COL"));
 
+      Assert.assertEquals(ImmutableSet.of("user1", "user2"), users);
     }
   }
 
   private void createInputData(String inputDatasetName) throws Exception {
     // add some data to the input table
     DataSetManager<Table> inputManager = getDataset(inputDatasetName);
-    Schema schema = Schema.recordOf(
-      "dbRecord",
-      Schema.Field.of("ID", Schema.of(Schema.Type.INT)),
-      Schema.Field.of("NAME", Schema.of(Schema.Type.STRING)),
-      Schema.Field.of("SCORE", Schema.of(Schema.Type.FLOAT)),
-      Schema.Field.of("GRADUATED", Schema.of(Schema.Type.BOOLEAN)),
-      Schema.Field.of("TINY", Schema.of(Schema.Type.INT)),
-      Schema.Field.of("SMALL", Schema.of(Schema.Type.INT)),
-      Schema.Field.of("BIG", Schema.of(Schema.Type.LONG)),
-      Schema.Field.of("FLOAT_COL", Schema.of(Schema.Type.FLOAT)),
-      Schema.Field.of("REAL_COL", Schema.of(Schema.Type.FLOAT)),
-      Schema.Field.of("NUMERIC_COL", Schema.decimalOf(PRECISION, SCALE)),
-      Schema.Field.of("DECIMAL_COL", Schema.decimalOf(PRECISION, SCALE)),
-      Schema.Field.of("BIT_COL", Schema.of(Schema.Type.BOOLEAN)),
-      Schema.Field.of("DATE_COL", Schema.of(Schema.LogicalType.DATE)),
-      Schema.Field.of("TIME_COL", Schema.of(Schema.LogicalType.TIME_MICROS)),
-      Schema.Field.of("TIMESTAMP_COL", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)),
-      Schema.Field.of("BINARY_COL", Schema.of(Schema.Type.BYTES)),
-      Schema.Field.of("BLOB_COL", Schema.of(Schema.Type.BYTES))
-    );
     List<StructuredRecord> inputRecords = new ArrayList<>();
     LocalDateTime localDateTime = new Timestamp(CURRENT_TS).toLocalDateTime();
     for (int i = 1; i <= 2; i++) {
       String name = "user" + i;
-      inputRecords.add(StructuredRecord.builder(schema)
-        .set("ID", i)
-        .set("NAME", name)
-        .set("SCORE", 3.451f)
-        .set("GRADUATED", (i % 2 == 0))
-        .set("TINY", i + 1)
-        .set("SMALL", i + 2)
-        .set("BIG", 3456987L)
-        .set("FLOAT_COL", 3.456f)
-        .set("REAL_COL", 3.457f)
-        .setDecimal("NUMERIC_COL", new BigDecimal(3.458d, new MathContext(PRECISION)).setScale(SCALE))
-        .setDecimal("DECIMAL_COL", new BigDecimal(3.459d, new MathContext(PRECISION)).setScale(SCALE))
-        .set("BIT_COL", (i % 2 == 1))
+      inputRecords.add(StructuredRecord.builder(SCHEMA)
+        .set("INTEGER_COL", i)
+        .set("BYTEINT_COL", i)
+        .set("SMALLINT_COL", i)
+        .set("BIGINT_COL", (long) i)
+        .set("REAL_COL", 3.451f + i)
+        .set("REAL_FLOAT_COL", 3.451f + i)
+        .set("DOUBLE_FLOAT_COL", 3.451 + i)
+        .set("DOUBLE_PRECISION_COL", 3.451 + i)
+        .setDecimal("NUMERIC_COL", NUMERIC_VALUE)
+        .setDecimal("DECIMAL_COL", NUMERIC_VALUE)
+        .set("CHAR_COL", name)
+        .set("VARCHAR_COL", name)
+        .set("NCHAR_COL", name)
+        .set("NVARCHAR_COL", name)
+        .set("VARBINARY_COL", name.getBytes(Charsets.UTF_8))
+        .set("ST_GEOMETRY_COL", name.getBytes(Charsets.UTF_8))
         .setDate("DATE_COL", localDateTime.toLocalDate())
         .setTime("TIME_COL", localDateTime.toLocalTime())
+        .set("TIMETZ_COL", "13:24:16+03")
         .setTimestamp("TIMESTAMP_COL", localDateTime.atZone(ZoneId.ofOffset("UTC", ZoneOffset.UTC)))
-        .set("BINARY_COL", name.getBytes(Charsets.UTF_8))
-        .set("BLOB_COL", name.getBytes(Charsets.UTF_8))
+        .set("INTERVAL_COL", "2 years 3 mons 2 days")
+        .set("BOOLEAN_COL", true)
         .build());
     }
     MockSource.writeInput(inputManager, inputRecords);