1818
1919import com .google .common .base .Charsets ;
2020import com .google .common .collect .ImmutableMap ;
21- import com .google .common .collect .ImmutableSet ;
22- import io .cdap .cdap .api .common .Bytes ;
2321import io .cdap .cdap .api .data .format .StructuredRecord ;
2422import io .cdap .cdap .api .data .schema .Schema ;
2523import io .cdap .cdap .api .dataset .table .Table ;
3129import io .cdap .plugin .common .Constants ;
3230import io .cdap .plugin .db .batch .sink .AbstractDBSink ;
3331import org .junit .Assert ;
32+ import org .junit .Before ;
3433import org .junit .Test ;
3534
3635import java .math .BigDecimal ;
3736import java .math .MathContext ;
3837import java .sql .Connection ;
39- import java .sql .Date ;
4038import java .sql .ResultSet ;
4139import java .sql .Statement ;
42- import java .sql .Time ;
4340import java .sql .Timestamp ;
4441import java .time .LocalDateTime ;
45- import java .time .ZoneId ;
46- import java .time .ZoneOffset ;
4742import java .util .ArrayList ;
48- import java .util .HashSet ;
43+ import java .util .Arrays ;
4944import java .util .List ;
50- import java .util .Set ;
5145
5246/**
5347 * Test for ETL using databases.
5448 */
5549public class MysqlSinkTestRun extends MysqlPluginTestBase {
5650
51+ private static final Schema SCHEMA = Schema .recordOf (
52+ "dbRecord" ,
53+ Schema .Field .of ("ID" , Schema .of (Schema .Type .INT )),
54+ Schema .Field .of ("NAME" , Schema .of (Schema .Type .STRING )),
55+ Schema .Field .of ("SCORE" , Schema .of (Schema .Type .DOUBLE )),
56+ Schema .Field .of ("GRADUATED" , Schema .of (Schema .Type .BOOLEAN )),
57+ Schema .Field .of ("TINY" , Schema .of (Schema .Type .INT )),
58+ Schema .Field .of ("SMALL" , Schema .of (Schema .Type .INT )),
59+ Schema .Field .of ("BIG" , Schema .of (Schema .Type .LONG )),
60+ Schema .Field .of ("MEDIUMINT_COL" , Schema .of (Schema .Type .INT )),
61+ Schema .Field .of ("FLOAT_COL" , Schema .of (Schema .Type .FLOAT )),
62+ Schema .Field .of ("REAL_COL" , Schema .of (Schema .Type .DOUBLE )),
63+ Schema .Field .of ("NUMERIC_COL" , Schema .decimalOf (PRECISION , SCALE )),
64+ Schema .Field .of ("DECIMAL_COL" , Schema .decimalOf (PRECISION , SCALE )),
65+ Schema .Field .of ("BIT_COL" , Schema .of (Schema .Type .BOOLEAN )),
66+ Schema .Field .of ("DATE_COL" , Schema .of (Schema .LogicalType .DATE )),
67+ Schema .Field .of ("TIME_COL" , Schema .of (Schema .LogicalType .TIME_MICROS )),
68+ Schema .Field .of ("TIMESTAMP_COL" , Schema .of (Schema .LogicalType .TIMESTAMP_MICROS )),
69+ Schema .Field .of ("DATETIME_COL" , Schema .of (Schema .LogicalType .TIMESTAMP_MICROS )),
70+ Schema .Field .of ("YEAR_COL" , Schema .of (Schema .LogicalType .DATE )),
71+ Schema .Field .of ("TEXT_COL" , Schema .of (Schema .Type .STRING )),
72+ Schema .Field .of ("TINYTEXT_COL" , Schema .of (Schema .Type .STRING )),
73+ Schema .Field .of ("MEDIUMTEXT_COL" , Schema .of (Schema .Type .STRING )),
74+ Schema .Field .of ("LONGTEXT_COL" , Schema .of (Schema .Type .STRING )),
75+ Schema .Field .of ("CHAR_COL" , Schema .of (Schema .Type .STRING )),
76+ Schema .Field .of ("BINARY_COL" , Schema .of (Schema .Type .BYTES )),
77+ Schema .Field .of ("VARBINARY_COL" , Schema .of (Schema .Type .BYTES )),
78+ Schema .Field .of ("TINYBLOB_COL" , Schema .of (Schema .Type .BYTES )),
79+ Schema .Field .of ("BLOB_COL" , Schema .of (Schema .Type .BYTES )),
80+ Schema .Field .of ("MEDIUMBLOB_COL" , Schema .of (Schema .Type .BYTES )),
81+ Schema .Field .of ("LONGBLOB_COL" , Schema .of (Schema .Type .BYTES )),
82+ Schema .Field .of ("ENUM_COL" , Schema .of (Schema .Type .STRING )),
83+ Schema .Field .of ("SET_COL" , Schema .of (Schema .Type .STRING ))
84+ );
85+
86+ @ Before
87+ public void setup () throws Exception {
88+ try (Statement stmt = createConnection ().createStatement ()) {
89+ stmt .execute ("TRUNCATE TABLE MY_DEST_TABLE" );
90+ }
91+ }
92+
93+ @ Test
94+ public void testDBSinkWithExplicitInputSchema () throws Exception {
95+ testDBSink ("testDBSinkWithExplicitInputSchema" , "input-dbsinktest-explicit" , SCHEMA );
96+ }
97+
5798 @ Test
58- public void testDBSink () throws Exception {
59- String inputDatasetName = "input-dbsinktest" ;
99+ public void testDBSinkWithInferredInputSchema () throws Exception {
100+ testDBSink ("testDBSinkWithInferredInputSchema" , "input-dbsinktest-inferred" , null );
101+ }
102+
103+ private void testDBSink (String appName , String inputDatasetName , Schema schema ) throws Exception {
104+ ETLPlugin sourceConfig = (schema != null )
105+ ? MockSource .getPlugin (inputDatasetName , schema )
106+ : MockSource .getPlugin (inputDatasetName );
60107
61- ETLPlugin sourceConfig = MockSource .getPlugin (inputDatasetName );
62108 ETLPlugin sinkConfig = new ETLPlugin (
63109 MysqlConstants .PLUGIN_NAME ,
64110 BatchSink .PLUGIN_TYPE ,
@@ -72,96 +118,137 @@ public void testDBSink() throws Exception {
72118 .build (),
73119 null );
74120
75- ApplicationManager appManager = deployETL (sourceConfig , sinkConfig , DATAPIPELINE_ARTIFACT , "testDBSink" );
76- createInputData (inputDatasetName );
121+ ApplicationManager appManager = deployETL (sourceConfig , sinkConfig , DATAPIPELINE_ARTIFACT , appName );
122+
123+ // Prepare test input data
124+ List <StructuredRecord > inputRecords = createInputData ();
125+ DataSetManager <Table > inputManager = getDataset (inputDatasetName );
126+ MockSource .writeInput (inputManager , inputRecords );
77127 runETLOnce (appManager , ImmutableMap .of ("logical.start.time" , String .valueOf (CURRENT_TS )));
78128
79129 try (Connection conn = createConnection ();
80130 Statement stmt = conn .createStatement ();
81- ResultSet resultSet = stmt .executeQuery ("SELECT * FROM MY_DEST_TABLE" )) {
82- Set <String > users = new HashSet <>();
83- Assert .assertTrue (resultSet .next ());
84- users .add (resultSet .getString ("NAME" ));
85- Assert .assertEquals (new Date (CURRENT_TS ).toString (), resultSet .getDate ("DATE_COL" ).toString ());
86- Assert .assertEquals (new Time (CURRENT_TS ).toString (), resultSet .getTime ("TIME_COL" ).toString ());
87- Assert .assertEquals (new Timestamp (CURRENT_TS ),
88- resultSet .getTimestamp ("TIMESTAMP_COL" ));
89- Assert .assertTrue (resultSet .next ());
90- Assert .assertEquals ("user2" , Bytes .toString (resultSet .getBytes ("BLOB_COL" ), 0 , 5 ));
91- Assert .assertEquals ("user2" , Bytes .toString (resultSet .getBytes ("TINYBLOB_COL" ), 0 , 5 ));
92- Assert .assertEquals ("user2" , Bytes .toString (resultSet .getBytes ("MEDIUMBLOB_COL" ), 0 , 5 ));
93- Assert .assertEquals ("user2" , Bytes .toString (resultSet .getBytes ("LONGBLOB_COL" ), 0 , 5 ));
94- Assert .assertEquals (new BigDecimal (3.458 , new MathContext (PRECISION )).setScale (SCALE ),
95- resultSet .getBigDecimal ("NUMERIC_COL" ));
96- Assert .assertEquals (new BigDecimal (3.459 , new MathContext (PRECISION )).setScale (SCALE ),
97- resultSet .getBigDecimal ("DECIMAL_COL" ));
98- users .add (resultSet .getString ("NAME" ));
99- Assert .assertEquals (ImmutableSet .of ("user1" , "user2" ), users );
131+ ResultSet actual = stmt .executeQuery ("SELECT * FROM MY_DEST_TABLE ORDER BY ID" )) {
132+
133+ for (StructuredRecord expected : inputRecords ) {
134+ Assert .assertTrue (actual .next ());
135+
136+ // Verify data
137+ assertObjectEquals (expected .get ("ID" ), actual .getInt ("ID" ));
138+ assertObjectEquals (expected .get ("NAME" ), actual .getString ("NAME" ));
139+ assertObjectEquals (expected .get ("TEXT_COL" ), actual .getString ("TEXT_COL" ));
140+ assertObjectEquals (expected .get ("TINYTEXT_COL" ), actual .getString ("TINYTEXT_COL" ));
141+ assertObjectEquals (expected .get ("MEDIUMTEXT_COL" ), actual .getString ("MEDIUMTEXT_COL" ));
142+ assertObjectEquals (expected .get ("LONGTEXT_COL" ), actual .getString ("LONGTEXT_COL" ));
143+ assertObjectEquals (expected .get ("CHAR_COL" ), actual .getString ("CHAR_COL" ).trim ());
144+ assertObjectEquals (expected .get ("GRADUATED" ), actual .getBoolean ("GRADUATED" ));
145+ Assert .assertNull (actual .getString ("NOT_IMPORTED" ));
146+ assertObjectEquals (expected .get ("ENUM_COL" ), actual .getString ("ENUM_COL" ));
147+ assertObjectEquals (expected .get ("SET_COL" ), actual .getString ("SET_COL" ));
148+ assertObjectEquals (expected .get ("TINY" ), actual .getInt ("TINY" ));
149+ assertObjectEquals (expected .get ("SMALL" ), actual .getInt ("SMALL" ));
150+ assertObjectEquals (expected .get ("BIG" ), actual .getLong ("BIG" ));
151+ assertObjectEquals (expected .get ("MEDIUMINT_COL" ), actual .getInt ("MEDIUMINT_COL" ));
152+ assertNumericEquals (expected .get ("SCORE" ), actual .getDouble ("SCORE" ));
153+ assertNumericEquals (expected .get ("FLOAT_COL" ), actual .getFloat ("FLOAT_COL" ));
154+ assertNumericEquals (expected .get ("REAL_COL" ), actual .getDouble ("REAL_COL" ));
155+ assertObjectEquals (expected .getDecimal ("NUMERIC_COL" ), actual .getBigDecimal ("NUMERIC_COL" ));
156+ assertObjectEquals (expected .getDecimal ("DECIMAL_COL" ), actual .getBigDecimal ("DECIMAL_COL" ));
157+ assertObjectEquals (expected .get ("BIT_COL" ), actual .getBoolean ("BIT_COL" ));
100158
159+ // Verify binary columns
160+ assertBytesEquals (expected .get ("BINARY_COL" ), actual .getBytes ("BINARY_COL" ));
161+ assertBytesEquals (expected .get ("VARBINARY_COL" ), actual .getBytes ("VARBINARY_COL" ));
162+ assertBytesEquals (expected .get ("BLOB_COL" ), actual .getBytes ("BLOB_COL" ));
163+ assertBytesEquals (expected .get ("MEDIUMBLOB_COL" ), actual .getBytes ("MEDIUMBLOB_COL" ));
164+ assertBytesEquals (expected .get ("TINYBLOB_COL" ), actual .getBytes ("TINYBLOB_COL" ));
165+ assertBytesEquals (expected .get ("LONGBLOB_COL" ), actual .getBytes ("LONGBLOB_COL" ));
166+
167+ // Verify time columns
168+ Assert .assertEquals (expected .getDate ("DATE_COL" ), actual .getDate ("DATE_COL" ).toLocalDate ());
169+
170+ // compare seconds, since mysql 'time' type does not store milliseconds but 'LocalTime' does
171+ Assert .assertEquals (expected .getTime ("TIME_COL" ).toSecondOfDay (),
172+ actual .getTime ("TIME_COL" ).toLocalTime ().toSecondOfDay ());
173+ Assert .assertEquals (expected .getDate ("YEAR_COL" ).getYear (), actual .getInt ("YEAR_COL" ));
174+ Assert .assertEquals (expected .getTimestamp ("DATETIME_COL" ),
175+ actual .getTimestamp ("DATETIME_COL" ).toInstant ().atZone (UTC_ZONE ));
176+ Assert .assertEquals (expected .getTimestamp ("TIMESTAMP_COL" ),
177+ actual .getTimestamp ("TIMESTAMP_COL" ).toInstant ().atZone (UTC_ZONE ));
178+ }
101179 }
102180 }
103181
104- private void createInputData ( String inputDatasetName ) throws Exception {
105- // add some data to the input table
106- DataSetManager < Table > inputManager = getDataset ( inputDatasetName );
107- Schema schema = Schema . recordOf (
108- "dbRecord" ,
109- Schema . Field . of ( "ID" , Schema . of ( Schema . Type . INT )),
110- Schema . Field . of ( "NAME" , Schema . of ( Schema . Type . STRING )),
111- Schema . Field . of ( "SCORE" , Schema . of ( Schema . Type . DOUBLE )),
112- Schema . Field . of ( "GRADUATED" , Schema . of ( Schema . Type . BOOLEAN )),
113- Schema . Field . of ( "TINY" , Schema . of ( Schema . Type . INT )),
114- Schema . Field . of ( "SMALL" , Schema . of ( Schema . Type . INT )),
115- Schema . Field . of ( "BIG" , Schema . of ( Schema . Type . LONG )),
116- Schema . Field . of ( "FLOAT_COL" , Schema . of ( Schema . Type . FLOAT )),
117- Schema . Field . of ( "REAL_COL" , Schema . of ( Schema . Type . DOUBLE )),
118- Schema . Field . of ( "NUMERIC_COL" , Schema . decimalOf ( PRECISION , SCALE )),
119- Schema . Field . of ( "DECIMAL_COL" , Schema . decimalOf ( PRECISION , SCALE )),
120- Schema . Field . of ( "BIT_COL" , Schema . of ( Schema . Type . BOOLEAN )),
121- Schema . Field . of ( "DATE_COL" , Schema . of ( Schema . LogicalType . DATE )),
122- Schema . Field . of ( "TIME_COL" , Schema . of ( Schema . LogicalType . TIME_MICROS )),
123- Schema . Field . of ( "TIMESTAMP_COL" , Schema . of ( Schema . LogicalType . TIMESTAMP_MICROS )),
124- Schema . Field . of ( "BINARY_COL" , Schema . of ( Schema . Type . BYTES )),
125- Schema . Field . of ( "BLOB_COL" , Schema . of ( Schema . Type . BYTES )),
126- Schema . Field . of ( "TINYBLOB_COL" , Schema . of ( Schema . Type . BYTES )),
127- Schema . Field . of ( "MEDIUMBLOB_COL" , Schema . of ( Schema . Type . BYTES )),
128- Schema . Field . of ( "LONGBLOB_COL" , Schema . of ( Schema . Type . BYTES )),
129- Schema . Field . of ( "TEXT_COL" , Schema . of ( Schema . Type . STRING )),
130- Schema . Field . of ( "TINYTEXT_COL" , Schema . of ( Schema . Type . STRING )),
131- Schema . Field . of ( "MEDIUMTEXT_COL" , Schema . of ( Schema . Type . STRING )),
132- Schema . Field . of ( "LONGTEXT_COL" , Schema . of ( Schema . Type . STRING ))
133- );
182+ /**
183+ * Added to prevent 'Ambiguous method call' issue
184+ */
185+ private void assertObjectEquals ( Object expected , Object actual ) {
186+ Assert . assertEquals ( expected , actual );
187+ }
188+
189+ /**
190+ * Added to trim arrays of bytes since it's common that actual arrays are larger.
191+ */
192+ private void assertBytesEquals ( byte [] expected , byte [] actual ) {
193+ Assert . assertTrue ( actual . length >= expected . length );
194+ Assert . assertArrayEquals ( expected , Arrays . copyOf ( actual , expected . length ));
195+ }
196+
197+ /**
198+ * Added to prevent repetitive casts to 'double' and specifying delta.
199+ */
200+ private void assertNumericEquals ( double expected , double actual ) {
201+ Assert . assertEquals ( expected , actual , 0.000001 );
202+ }
203+
204+ /**
205+ * Added to prevent repetitive casts to 'float' and specifying delta.
206+ */
207+ private void assertNumericEquals ( float expected , float actual ) {
208+ Assert . assertEquals ( expected , actual , 0.000001 );
209+ }
210+
211+ private List < StructuredRecord > createInputData () throws Exception {
134212 List <StructuredRecord > inputRecords = new ArrayList <>();
135213 LocalDateTime localDateTime = new Timestamp (CURRENT_TS ).toLocalDateTime ();
136214 for (int i = 1 ; i <= 2 ; i ++) {
137215 String name = "user" + i ;
138- inputRecords .add (StructuredRecord .builder (schema )
139- .set ("ID" , i )
140- .set ("NAME" , name )
141- .set ("SCORE" , 3.451 )
142- .set ("GRADUATED" , (i % 2 == 0 ))
143- .set ("TINY" , i + 1 )
144- .set ("SMALL" , i + 2 )
145- .set ("BIG" , 3456987L )
146- .set ("FLOAT_COL" , 3.456f )
147- .set ("REAL_COL" , 3.457 )
148- .setDecimal ("NUMERIC_COL" , new BigDecimal (3.458d , new MathContext (PRECISION )).setScale (SCALE ))
149- .setDecimal ("DECIMAL_COL" , new BigDecimal (3.459d , new MathContext (PRECISION )).setScale (SCALE ))
150- .set ("BIT_COL" , (i % 2 == 1 ))
151- .setDate ("DATE_COL" , localDateTime .toLocalDate ())
152- .setTime ("TIME_COL" , localDateTime .toLocalTime ())
153- .setTimestamp ("TIMESTAMP_COL" , localDateTime .atZone (ZoneId .ofOffset ("UTC" , ZoneOffset .UTC )))
154- .set ("BINARY_COL" , name .getBytes (Charsets .UTF_8 ))
155- .set ("BLOB_COL" , name .getBytes (Charsets .UTF_8 ))
156- .set ("TINYBLOB_COL" , name .getBytes (Charsets .UTF_8 ))
157- .set ("MEDIUMBLOB_COL" , name .getBytes (Charsets .UTF_8 ))
158- .set ("LONGBLOB_COL" , name .getBytes (Charsets .UTF_8 ))
159- .set ("TEXT_COL" , name )
160- .set ("TINYTEXT_COL" , name )
161- .set ("MEDIUMTEXT_COL" , name )
162- .set ("LONGTEXT_COL" , name )
163- .build ());
216+ StructuredRecord .Builder builder = StructuredRecord .builder (SCHEMA )
217+ .set ("ID" , i )
218+ .set ("NAME" , name )
219+ .set ("SCORE" , 3.451 )
220+ .set ("GRADUATED" , (i % 2 == 0 ))
221+ .set ("TINY" , i )
222+ .set ("SMALL" , i )
223+ .set ("BIG" , 3456987L )
224+ .set ("MEDIUMINT_COL" , 8388607 )
225+ .set ("FLOAT_COL" , 3.456f )
226+ .set ("REAL_COL" , 3.457 )
227+ .setDecimal ("NUMERIC_COL" , new BigDecimal (3.458d , new MathContext (PRECISION )).setScale (SCALE ))
228+ .setDecimal ("DECIMAL_COL" , new BigDecimal (3.459d , new MathContext (PRECISION )).setScale (SCALE ))
229+ .set ("BIT_COL" , (i % 2 == 1 ))
230+ .setDate ("DATE_COL" , localDateTime .toLocalDate ())
231+ .setTime ("TIME_COL" , localDateTime .toLocalTime ())
232+ .setTimestamp ("TIMESTAMP_COL" , localDateTime .atZone (UTC_ZONE ))
233+ .setTimestamp ("DATETIME_COL" , localDateTime .atZone (UTC_ZONE ))
234+ .setDate ("YEAR_COL" , localDateTime .toLocalDate ())
235+ .set ("TEXT_COL" , name )
236+ .set ("TINYTEXT_COL" , name )
237+ .set ("MEDIUMTEXT_COL" , name )
238+ .set ("LONGTEXT_COL" , name )
239+ .set ("CHAR_COL" , "char" + i )
240+ .set ("BINARY_COL" , name .getBytes (Charsets .UTF_8 ))
241+ .set ("VARBINARY_COL" , name .getBytes (Charsets .UTF_8 ))
242+ .set ("TINYBLOB_COL" , name .getBytes (Charsets .UTF_8 ))
243+ .set ("BLOB_COL" , name .getBytes (Charsets .UTF_8 ))
244+ .set ("MEDIUMBLOB_COL" , name .getBytes (Charsets .UTF_8 ))
245+ .set ("LONGBLOB_COL" , name .getBytes (Charsets .UTF_8 ))
246+ .set ("ENUM_COL" , "Second" )
247+ .set ("SET_COL" , "a,b,c,d" );
248+
249+ inputRecords .add (builder .build ());
164250 }
165- MockSource .writeInput (inputManager , inputRecords );
251+
252+ return inputRecords ;
166253 }
167254}
0 commit comments