Skip to content

Commit 3f0c72e

Browse files
authored
[Test][Unit Test] Add unit tests for the Kafka connector.
1 parent 972f42f commit 3f0c72e

File tree

13 files changed

+1127
-1
lines changed

13 files changed

+1127
-1
lines changed

chunjun-connectors/chunjun-connector-kafka/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,14 @@
4242
<artifactId>avro</artifactId>
4343
<version>1.10.0</version>
4444
</dependency>
45+
46+
<!--flink formats-->
47+
<dependency>
48+
<groupId>org.apache.flink</groupId>
49+
<artifactId>flink-json</artifactId>
50+
<version>${flink.version}</version>
51+
<scope>test</scope>
52+
</dependency>
4553
</dependencies>
4654

4755
<build>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.chunjun.connector.kafka.adapter;
20+
21+
import com.dtstack.chunjun.connector.kafka.conf.KafkaConf;
22+
import com.dtstack.chunjun.connector.kafka.enums.StartupMode;
23+
import com.dtstack.chunjun.util.GsonUtil;
24+
25+
import com.google.gson.Gson;
26+
import com.google.gson.GsonBuilder;
27+
import org.junit.Assert;
28+
import org.junit.Test;
29+
30+
public class StartupModeAdapterTest {
31+
32+
@Test
33+
public void testStartupModeAdapter() {
34+
Gson gson =
35+
new GsonBuilder()
36+
.registerTypeAdapter(StartupMode.class, new StartupModeAdapter())
37+
.create();
38+
GsonUtil.setTypeAdapter(gson);
39+
KafkaConf kafkaConf = new KafkaConf();
40+
kafkaConf.setMode(StartupMode.LATEST);
41+
42+
String s = gson.toJson(kafkaConf);
43+
44+
Assert.assertTrue(s.contains("latest-offset"));
45+
}
46+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.chunjun.connector.kafka.conf;
20+
21+
import com.dtstack.chunjun.connector.kafka.enums.StartupMode;
22+
23+
import org.junit.Assert;
24+
import org.junit.Test;
25+
26+
import java.util.HashMap;
27+
28+
public class KafkaConfTest {
29+
30+
@Test
31+
public void testConfiguration() {
32+
KafkaConf kafkaConf = new KafkaConf();
33+
Assert.assertEquals(StartupMode.GROUP_OFFSETS, kafkaConf.getMode());
34+
Assert.assertEquals("text", kafkaConf.getCodec());
35+
36+
kafkaConf.setMode(StartupMode.LATEST);
37+
Assert.assertEquals(StartupMode.LATEST, kafkaConf.getMode());
38+
39+
kafkaConf.setCodec("json");
40+
Assert.assertEquals("json", kafkaConf.getCodec());
41+
42+
kafkaConf.setTopic("test");
43+
Assert.assertEquals("test", kafkaConf.getTopic());
44+
45+
kafkaConf.setGroupId("test");
46+
Assert.assertEquals("test", kafkaConf.getGroupId());
47+
48+
kafkaConf.setOffset("1000");
49+
Assert.assertEquals("1000", kafkaConf.getOffset());
50+
51+
Assert.assertEquals(-1, kafkaConf.getTimestamp());
52+
kafkaConf.setTimestamp(1983342301L);
53+
Assert.assertEquals(1983342301L, kafkaConf.getTimestamp());
54+
55+
HashMap<String, String> configs = new HashMap<>();
56+
configs.put("bootstrap.servers", "localhost:9092");
57+
kafkaConf.setConsumerSettings(configs);
58+
Assert.assertEquals(configs, kafkaConf.getConsumerSettings());
59+
60+
kafkaConf.setProducerSettings(configs);
61+
Assert.assertEquals(configs, kafkaConf.getProducerSettings());
62+
63+
kafkaConf.setSplit(true);
64+
Assert.assertTrue(kafkaConf.isSplit());
65+
66+
kafkaConf.setPavingData(true);
67+
Assert.assertTrue(kafkaConf.isPavingData());
68+
69+
kafkaConf.setDataCompelOrder(true);
70+
Assert.assertTrue(kafkaConf.isDataCompelOrder());
71+
72+
kafkaConf.setDeserialization("string");
73+
Assert.assertEquals("string", kafkaConf.getDeserialization());
74+
}
75+
}
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.chunjun.connector.kafka.converter;
20+
21+
import com.dtstack.chunjun.connector.kafka.conf.KafkaConf;
22+
import com.dtstack.chunjun.element.ColumnRowData;
23+
import com.dtstack.chunjun.util.GsonUtil;
24+
25+
import org.junit.Assert;
26+
import org.junit.Test;
27+
28+
import java.math.BigDecimal;
29+
import java.math.BigInteger;
30+
import java.sql.Time;
31+
import java.sql.Timestamp;
32+
import java.util.ArrayList;
33+
import java.util.HashMap;
34+
35+
public class ConverterTest {
36+
private final String kafkaConfContent =
37+
" {\n"
38+
+ " \"topic\": \"da\",\n"
39+
+ " \"groupId\": \"dddd\",\n"
40+
+ " \"codec\": \"json\",\n"
41+
+ " \"column\": [\n"
42+
+ " {\n"
43+
+ " \"name\": \"id\",\n"
44+
+ " \"type\": \"int\"\n"
45+
+ " },\n"
46+
+ " {\n"
47+
+ " \"name\": \"INTEGER\",\n"
48+
+ " \"type\": \"INTEGER\"\n"
49+
+ " },\n"
50+
+ " {\n"
51+
+ " \"name\": \"BOOLEAN\",\n"
52+
+ " \"type\": \"BOOLEAN\"\n"
53+
+ " },\n"
54+
+ " {\n"
55+
+ " \"name\": \"TINYINT\",\n"
56+
+ " \"type\": \"TINYINT\"\n"
57+
+ " },\n"
58+
+ " {\n"
59+
+ " \"name\": \"VARCHAR\",\n"
60+
+ " \"type\": \"VARCHAR\"\n"
61+
+ " },\n"
62+
+ " {\n"
63+
+ " \"name\": \"CHAR\",\n"
64+
+ " \"type\": \"CHAR\"\n"
65+
+ " },\n"
66+
+ " {\n"
67+
+ " \"name\": \"CHARACTER\",\n"
68+
+ " \"type\": \"CHARACTER\"\n"
69+
+ " },\n"
70+
+ " {\n"
71+
+ " \"name\": \"STRING\",\n"
72+
+ " \"type\": \"STRING\"\n"
73+
+ " },\n"
74+
+ " {\n"
75+
+ " \"name\": \"TEXT\",\n"
76+
+ " \"type\": \"TEXT\"\n"
77+
+ " },\n"
78+
+ " {\n"
79+
+ " \"name\": \"SHORT\",\n"
80+
+ " \"type\": \"SHORT\"\n"
81+
+ " },\n"
82+
+ " {\n"
83+
+ " \"name\": \"LONG\",\n"
84+
+ " \"type\": \"LONG\"\n"
85+
+ " },\n"
86+
+ " {\n"
87+
+ " \"name\": \"BIGINT\",\n"
88+
+ " \"type\": \"BIGINT\"\n"
89+
+ " },\n"
90+
+ " {\n"
91+
+ " \"name\": \"FLOAT\",\n"
92+
+ " \"type\": \"FLOAT\"\n"
93+
+ " },\n"
94+
+ " {\n"
95+
+ " \"name\": \"DOUBLE\",\n"
96+
+ " \"type\": \"DOUBLE\"\n"
97+
+ " },\n"
98+
+ " {\n"
99+
+ " \"name\": \"DECIMAL\",\n"
100+
+ " \"type\": \"DECIMAL\"\n"
101+
+ " },\n"
102+
+ " {\n"
103+
+ " \"name\": \"DATE\",\n"
104+
+ " \"type\": \"DATE\"\n"
105+
+ " },\n"
106+
+ " {\n"
107+
+ " \"name\": \"TIME\",\n"
108+
+ " \"type\": \"TIME\"\n"
109+
+ " },\n"
110+
+ " {\n"
111+
+ " \"name\": \"DATETIME\",\n"
112+
+ " \"type\": \"DATETIME\"\n"
113+
+ " },\n"
114+
+ " {\n"
115+
+ " \"name\": \"TIMESTAMP\",\n"
116+
+ " \"type\": \"TIMESTAMP\"\n"
117+
+ " }\n"
118+
+ " ],"
119+
+ " \"consumerSettings\": {\n"
120+
+ " \"bootstrap.servers\": \"localhost:9092\",\n"
121+
+ " \"auto.commit.enable\": \"false\"\n"
122+
+ " }\n"
123+
+ " }\n";
124+
125+
@Test
126+
public void testConverter() throws Exception {
127+
KafkaConf kafkaConf = GsonUtil.GSON.fromJson(kafkaConfContent, KafkaConf.class);
128+
KafkaColumnConverter converter = new KafkaColumnConverter(kafkaConf);
129+
130+
HashMap<String, Object> data = new HashMap<>();
131+
data.put("id", 1);
132+
data.put("INTEGER", 1);
133+
data.put("BOOLEAN", "true");
134+
data.put("TINYINT", "1");
135+
data.put("CHAR", "char");
136+
data.put("CHARACTER", "char");
137+
data.put("STRING", "tiezhu");
138+
data.put("VARCHAR", "dujie");
139+
data.put("TEXT", "abcdefg");
140+
data.put("SHORT", Short.parseShort("2"));
141+
data.put("LONG", 1234L);
142+
data.put("BIGINT", BigInteger.valueOf(12345L));
143+
data.put("FLOAT", Float.parseFloat("2.22"));
144+
data.put("DOUBLE", 2.22d);
145+
data.put("DECIMAL", new BigDecimal("2.22"));
146+
data.put("DATE", "2022-08-22");
147+
data.put("TIME", "18:00:00");
148+
data.put("DATETIME", "2022-08-12T18:00:00");
149+
data.put("TIMESTAMP", "2022-08-12 18:00:00");
150+
151+
ColumnRowData rowData = (ColumnRowData) converter.toInternal(GsonUtil.GSON.toJson(data));
152+
Assert.assertEquals(19, rowData.getArity());
153+
154+
Assert.assertEquals(1, (int) (rowData.getField(0).asInt()));
155+
Assert.assertEquals(1, (int) (rowData.getField(1).asInt()));
156+
Assert.assertTrue((rowData.getField(2).asBoolean()));
157+
Assert.assertEquals(1, (int) (rowData.getField(3).asInt()));
158+
Assert.assertEquals("dujie", (rowData.getField(4).asString()));
159+
Assert.assertEquals("char", (rowData.getField(5).asString()));
160+
Assert.assertEquals("char", (rowData.getField(6).asString()));
161+
Assert.assertEquals("tiezhu", (rowData.getField(7).asString()));
162+
Assert.assertEquals("abcdefg", (rowData.getField(8).asString()));
163+
Assert.assertEquals(2, (int) (rowData.getField(9).asInt()));
164+
Assert.assertEquals(1234L, (long) (rowData.getField(10).asLong()));
165+
Assert.assertEquals(12345L, (long) (rowData.getField(11).asLong()));
166+
Assert.assertEquals("2.22", (rowData.getField(12).asFloat().toString()));
167+
Assert.assertEquals("2.22", (rowData.getField(13).asDouble().toString()));
168+
Assert.assertEquals("2.22", (rowData.getField(14).asBigDecimal().toString()));
169+
Assert.assertEquals("2022-08-22", (rowData.getField(15).asString()));
170+
Assert.assertEquals(Time.valueOf("18:00:00"), (rowData.getField(16).asTime()));
171+
172+
converter.toExternal(rowData, null);
173+
HashMap<String, Integer> headers = new HashMap<>(8);
174+
headers.put("schema", 0);
175+
rowData.setHeader(headers);
176+
converter.toExternal(rowData, null);
177+
}
178+
179+
@Test
180+
public void testConverterWithTableFIelds() throws Exception {
181+
HashMap<String, Object> data = new HashMap<>();
182+
data.put("id", 1);
183+
data.put("INTEGER", 1);
184+
data.put("BOOLEAN", "true");
185+
data.put("TINYINT", "1");
186+
data.put("CHAR", "char");
187+
data.put("CHARACTER", "char");
188+
data.put("STRING", "tiezhu");
189+
data.put("VARCHAR", "dujie");
190+
data.put("TEXT", "abcdefg");
191+
data.put("SHORT", Short.parseShort("2"));
192+
data.put("LONG", 1234L);
193+
data.put("BIGINT", BigInteger.valueOf(12345L));
194+
data.put("FLOAT", Float.parseFloat("2.22"));
195+
data.put("DOUBLE", 2.22d);
196+
data.put("DECIMAL", new BigDecimal("2.22"));
197+
data.put("DATE", "2022-08-22");
198+
data.put("TIME", "18:00:00");
199+
data.put("DATETIME", "2022-08-12T18:00:00");
200+
data.put("TIMESTAMP", "2022-08-12 18:00:00");
201+
ArrayList<String> tableFields = new ArrayList<>();
202+
data.forEach(
203+
(k, v) -> {
204+
tableFields.add(k);
205+
});
206+
KafkaConf kafkaConf = GsonUtil.GSON.fromJson(kafkaConfContent, KafkaConf.class);
207+
kafkaConf.setTableFields(tableFields);
208+
KafkaColumnConverter converter = new KafkaColumnConverter(kafkaConf);
209+
210+
ColumnRowData rowData = (ColumnRowData) converter.toInternal(GsonUtil.GSON.toJson(data));
211+
Assert.assertEquals(19, rowData.getArity());
212+
213+
Assert.assertEquals(1, (int) (rowData.getField(0).asInt()));
214+
Assert.assertEquals(1, (int) (rowData.getField(1).asInt()));
215+
Assert.assertTrue((rowData.getField(2).asBoolean()));
216+
Assert.assertEquals(1, (int) (rowData.getField(3).asInt()));
217+
Assert.assertEquals("dujie", (rowData.getField(4).asString()));
218+
Assert.assertEquals("char", (rowData.getField(5).asString()));
219+
Assert.assertEquals("char", (rowData.getField(6).asString()));
220+
Assert.assertEquals("tiezhu", (rowData.getField(7).asString()));
221+
Assert.assertEquals("abcdefg", (rowData.getField(8).asString()));
222+
Assert.assertEquals(2, (int) (rowData.getField(9).asInt()));
223+
Assert.assertEquals(1234L, (long) (rowData.getField(10).asLong()));
224+
Assert.assertEquals(12345L, (long) (rowData.getField(11).asLong()));
225+
Assert.assertEquals("2.22", (rowData.getField(12).asFloat().toString()));
226+
Assert.assertEquals("2.22", (rowData.getField(13).asDouble().toString()));
227+
Assert.assertEquals("2.22", (rowData.getField(14).asBigDecimal().toString()));
228+
Assert.assertEquals("2022-08-22", (rowData.getField(15).asString()));
229+
Assert.assertEquals(Time.valueOf("18:00:00"), (rowData.getField(16).asTime()));
230+
Assert.assertEquals(
231+
Timestamp.valueOf("2022-08-12 18:00:00"), (rowData.getField(17).asTimestamp()));
232+
Assert.assertEquals(
233+
Timestamp.valueOf("2022-08-12 18:00:00"), (rowData.getField(18).asTimestamp()));
234+
235+
converter.toExternal(rowData, null);
236+
HashMap<String, Integer> headers = new HashMap<>(8);
237+
headers.put("schema", 0);
238+
rowData.setHeader(headers);
239+
converter.toExternal(rowData, null);
240+
}
241+
}

0 commit comments

Comments
 (0)