3131
3232import org .apache .kafka .connect .data .Schema ;
3333import org .apache .kafka .connect .source .SourceRecord ;
34+ import org .junit .After ;
3435import org .junit .Test ;
3536
3637public class MQSourceTaskIT extends AbstractJMSContextIT {
3738
39+ private MQSourceTask connectTask = null ;
40+
41+ @ After
42+ public void cleanup () throws InterruptedException {
43+ SourceTaskStopper stopper = new SourceTaskStopper (connectTask );
44+ stopper .run ();
45+ }
46+
47+
3848 private static final String MQ_QUEUE = "DEV.QUEUE.1" ;
3949
4050 private Map <String , String > createDefaultConnectorProperties () {
@@ -51,42 +61,42 @@ private Map<String, String> createDefaultConnectorProperties() {
5161
5262 @ Test
5363 public void verifyJmsTextMessages () throws Exception {
54- MQSourceTask newConnectTask = new MQSourceTask ();
64+ connectTask = new MQSourceTask ();
5565
5666 Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
5767 connectorConfigProps .put ("mq.message.body.jms" , "true" );
5868 connectorConfigProps .put ("mq.record.builder" , "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder" );
5969
60- newConnectTask .start (connectorConfigProps );
70+ connectTask .start (connectorConfigProps );
6171
6272 TextMessage message1 = getJmsContext ().createTextMessage ("hello" );
6373 TextMessage message2 = getJmsContext ().createTextMessage ("world" );
6474 putAllMessagesToQueue (MQ_QUEUE , Arrays .asList (message1 , message2 ));
6575
66- List <SourceRecord > kafkaMessages = newConnectTask .poll ();
76+ List <SourceRecord > kafkaMessages = connectTask .poll ();
6777 assertEquals (2 , kafkaMessages .size ());
6878 for (SourceRecord kafkaMessage : kafkaMessages ) {
6979 assertNull (kafkaMessage .key ());
7080 assertNull (kafkaMessage .valueSchema ());
81+
82+ connectTask .commitRecord (kafkaMessage );
7183 }
7284
7385 assertEquals ("hello" , kafkaMessages .get (0 ).value ());
7486 assertEquals ("world" , kafkaMessages .get (1 ).value ());
75-
76- newConnectTask .stop ();
7787 }
7888
7989
8090
8191 @ Test
8292 public void verifyJmsJsonMessages () throws Exception {
83- MQSourceTask newConnectTask = new MQSourceTask ();
93+ connectTask = new MQSourceTask ();
8494
8595 Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
8696 connectorConfigProps .put ("mq.message.body.jms" , "true" );
8797 connectorConfigProps .put ("mq.record.builder" , "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder" );
8898
89- newConnectTask .start (connectorConfigProps );
99+ connectTask .start (connectorConfigProps );
90100
91101 List <Message > messages = new ArrayList <>();
92102 for (int i = 0 ; i < 5 ; i ++) {
@@ -97,7 +107,7 @@ public void verifyJmsJsonMessages() throws Exception {
97107 }
98108 putAllMessagesToQueue (MQ_QUEUE , messages );
99109
100- List <SourceRecord > kafkaMessages = newConnectTask .poll ();
110+ List <SourceRecord > kafkaMessages = connectTask .poll ();
101111 assertEquals (5 , kafkaMessages .size ());
102112 for (int i = 0 ; i < 5 ; i ++) {
103113 SourceRecord kafkaMessage = kafkaMessages .get (i );
@@ -106,23 +116,23 @@ public void verifyJmsJsonMessages() throws Exception {
106116
107117 Map <?, ?> value = (Map <?, ?>) kafkaMessage .value ();
108118 assertEquals (Long .valueOf (i ), value .get ("i" ));
109- }
110119
111- newConnectTask .stop ();
120+ connectTask .commitRecord (kafkaMessage );
121+ }
112122 }
113123
114124
115125
116126 @ Test
117127 public void verifyJmsMessageHeaders () throws Exception {
118- MQSourceTask newConnectTask = new MQSourceTask ();
128+ connectTask = new MQSourceTask ();
119129
120130 Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
121131 connectorConfigProps .put ("mq.message.body.jms" , "true" );
122132 connectorConfigProps .put ("mq.record.builder" , "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder" );
123133 connectorConfigProps .put ("mq.jms.properties.copy.to.kafka.headers" , "true" );
124134
125- newConnectTask .start (connectorConfigProps );
135+ connectTask .start (connectorConfigProps );
126136
127137 TextMessage message = getJmsContext ().createTextMessage ("helloworld" );
128138 message .setStringProperty ("teststring" , "myvalue" );
@@ -131,7 +141,7 @@ public void verifyJmsMessageHeaders() throws Exception {
131141
132142 putAllMessagesToQueue (MQ_QUEUE , Arrays .asList (message ));
133143
134- List <SourceRecord > kafkaMessages = newConnectTask .poll ();
144+ List <SourceRecord > kafkaMessages = connectTask .poll ();
135145 assertEquals (1 , kafkaMessages .size ());
136146 SourceRecord kafkaMessage = kafkaMessages .get (0 );
137147 assertNull (kafkaMessage .key ());
@@ -143,21 +153,21 @@ public void verifyJmsMessageHeaders() throws Exception {
143153 assertEquals ("11" , kafkaMessage .headers ().lastWithName ("volume" ).value ());
144154 assertEquals ("42.0" , kafkaMessage .headers ().lastWithName ("decimalmeaning" ).value ());
145155
146- newConnectTask . stop ( );
156+ connectTask . commitRecord ( kafkaMessage );
147157 }
148158
149159
150160
151161 @ Test
152162 public void verifyMessageBatchIndividualCommits () throws Exception {
153- MQSourceTask newConnectTask = new MQSourceTask ();
163+ connectTask = new MQSourceTask ();
154164
155165 Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
156166 connectorConfigProps .put ("mq.message.body.jms" , "true" );
157167 connectorConfigProps .put ("mq.record.builder" , "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder" );
158168 connectorConfigProps .put ("mq.batch.size" , "10" );
159169
160- newConnectTask .start (connectorConfigProps );
170+ connectTask .start (connectorConfigProps );
161171
162172 List <Message > messages = new ArrayList <>();
163173 for (int i = 1 ; i <= 35 ; i ++) {
@@ -169,49 +179,47 @@ public void verifyMessageBatchIndividualCommits() throws Exception {
169179
170180 List <SourceRecord > kafkaMessages ;
171181
172- kafkaMessages = newConnectTask .poll ();
182+ kafkaMessages = connectTask .poll ();
173183 assertEquals (10 , kafkaMessages .size ());
174184 for (SourceRecord kafkaMessage : kafkaMessages ) {
175185 assertEquals ("batch message " + (nextExpectedMessage ++), kafkaMessage .value ());
176- newConnectTask .commitRecord (kafkaMessage );
186+ connectTask .commitRecord (kafkaMessage );
177187 }
178188
179- kafkaMessages = newConnectTask .poll ();
189+ kafkaMessages = connectTask .poll ();
180190 assertEquals (10 , kafkaMessages .size ());
181191 for (SourceRecord kafkaMessage : kafkaMessages ) {
182192 assertEquals ("batch message " + (nextExpectedMessage ++), kafkaMessage .value ());
183- newConnectTask .commitRecord (kafkaMessage );
193+ connectTask .commitRecord (kafkaMessage );
184194 }
185195
186- kafkaMessages = newConnectTask .poll ();
196+ kafkaMessages = connectTask .poll ();
187197 assertEquals (10 , kafkaMessages .size ());
188198 for (SourceRecord kafkaMessage : kafkaMessages ) {
189199 assertEquals ("batch message " + (nextExpectedMessage ++), kafkaMessage .value ());
190- newConnectTask .commitRecord (kafkaMessage );
200+ connectTask .commitRecord (kafkaMessage );
191201 }
192202
193- kafkaMessages = newConnectTask .poll ();
203+ kafkaMessages = connectTask .poll ();
194204 assertEquals (5 , kafkaMessages .size ());
195205 for (SourceRecord kafkaMessage : kafkaMessages ) {
196206 assertEquals ("batch message " + (nextExpectedMessage ++), kafkaMessage .value ());
197- newConnectTask .commitRecord (kafkaMessage );
207+ connectTask .commitRecord (kafkaMessage );
198208 }
199-
200- newConnectTask .stop ();
201209 }
202210
203211
204212
205213 @ Test
206214 public void verifyMessageBatchGroupCommits () throws Exception {
207- MQSourceTask newConnectTask = new MQSourceTask ();
215+ connectTask = new MQSourceTask ();
208216
209217 Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
210218 connectorConfigProps .put ("mq.message.body.jms" , "true" );
211219 connectorConfigProps .put ("mq.record.builder" , "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder" );
212220 connectorConfigProps .put ("mq.batch.size" , "10" );
213221
214- newConnectTask .start (connectorConfigProps );
222+ connectTask .start (connectorConfigProps );
215223
216224 List <Message > messages = new ArrayList <>();
217225 for (int i = 1 ; i <= 35 ; i ++) {
@@ -221,46 +229,48 @@ public void verifyMessageBatchGroupCommits() throws Exception {
221229
222230 List <SourceRecord > kafkaMessages ;
223231
224- kafkaMessages = newConnectTask .poll ();
232+ kafkaMessages = connectTask .poll ();
225233 assertEquals (10 , kafkaMessages .size ());
226- newConnectTask .commit ();
227- newConnectTask .commit ();
234+ for (SourceRecord m : kafkaMessages ) {
235+ connectTask .commitRecord (m );
236+ }
228237
229- kafkaMessages = newConnectTask .poll ();
238+ kafkaMessages = connectTask .poll ();
230239 assertEquals (10 , kafkaMessages .size ());
231- newConnectTask .commit ();
232- newConnectTask .commit ();
240+ for (SourceRecord m : kafkaMessages ) {
241+ connectTask .commitRecord (m );
242+ }
233243
234- kafkaMessages = newConnectTask .poll ();
244+ kafkaMessages = connectTask .poll ();
235245 assertEquals (10 , kafkaMessages .size ());
236- newConnectTask .commit ();
237- newConnectTask .commit ();
246+ for (SourceRecord m : kafkaMessages ) {
247+ connectTask .commitRecord (m );
248+ }
238249
239- kafkaMessages = newConnectTask .poll ();
250+ kafkaMessages = connectTask .poll ();
240251 assertEquals (5 , kafkaMessages .size ());
241- newConnectTask .commit ();
242- newConnectTask .commit ();
243-
244- newConnectTask .stop ();
252+ for (SourceRecord m : kafkaMessages ) {
253+ connectTask .commitRecord (m );
254+ }
245255 }
246256
247257
248258
249259 @ Test
250260 public void verifyMessageIdAsKey () throws Exception {
251- MQSourceTask newConnectTask = new MQSourceTask ();
261+ connectTask = new MQSourceTask ();
252262
253263 Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
254264 connectorConfigProps .put ("mq.message.body.jms" , "true" );
255265 connectorConfigProps .put ("mq.record.builder" , "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder" );
256266 connectorConfigProps .put ("mq.record.builder.key.header" , "JMSMessageID" );
257267
258- newConnectTask .start (connectorConfigProps );
268+ connectTask .start (connectorConfigProps );
259269
260270 TextMessage message = getJmsContext ().createTextMessage ("testmessage" );
261271 putAllMessagesToQueue (MQ_QUEUE , Arrays .asList (message ));
262272
263- List <SourceRecord > kafkaMessages = newConnectTask .poll ();
273+ List <SourceRecord > kafkaMessages = connectTask .poll ();
264274 assertEquals (1 , kafkaMessages .size ());
265275
266276 SourceRecord kafkaMessage = kafkaMessages .get (0 );
@@ -270,62 +280,62 @@ public void verifyMessageIdAsKey() throws Exception {
270280
271281 assertEquals ("testmessage" , kafkaMessage .value ());
272282
273- newConnectTask . stop ( );
283+ connectTask . commitRecord ( kafkaMessage );
274284 }
275285
276286
277287
278288 @ Test
279289 public void verifyCorrelationIdAsKey () throws Exception {
280- MQSourceTask newConnectTask = new MQSourceTask ();
290+ connectTask = new MQSourceTask ();
281291
282292 Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
283293 connectorConfigProps .put ("mq.message.body.jms" , "true" );
284294 connectorConfigProps .put ("mq.record.builder" , "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder" );
285295 connectorConfigProps .put ("mq.record.builder.key.header" , "JMSCorrelationID" );
286296
287- newConnectTask .start (connectorConfigProps );
297+ connectTask .start (connectorConfigProps );
288298
289299 TextMessage message1 = getJmsContext ().createTextMessage ("first message" );
290300 message1 .setJMSCorrelationID ("verifycorrel" );
291301 TextMessage message2 = getJmsContext ().createTextMessage ("second message" );
292302 message2 .setJMSCorrelationID ("ID:5fb4a18030154fe4b09a1dfe8075bc101dfe8075bc104fe4" );
293303 putAllMessagesToQueue (MQ_QUEUE , Arrays .asList (message1 , message2 ));
294304
295- List <SourceRecord > kafkaMessages = newConnectTask .poll ();
305+ List <SourceRecord > kafkaMessages = connectTask .poll ();
296306 assertEquals (2 , kafkaMessages .size ());
297307
298308 SourceRecord kafkaMessage1 = kafkaMessages .get (0 );
299309 assertEquals ("verifycorrel" , kafkaMessage1 .key ());
300310 assertEquals (Schema .OPTIONAL_STRING_SCHEMA , kafkaMessage1 .keySchema ());
301311 assertEquals ("first message" , kafkaMessage1 .value ());
312+ connectTask .commitRecord (kafkaMessage1 );
302313
303314 SourceRecord kafkaMessage2 = kafkaMessages .get (1 );
304315 assertEquals ("5fb4a18030154fe4b09a1dfe8075bc101dfe8075bc104fe4" , kafkaMessage2 .key ());
305316 assertEquals (Schema .OPTIONAL_STRING_SCHEMA , kafkaMessage2 .keySchema ());
306317 assertEquals ("second message" , kafkaMessage2 .value ());
307-
308- newConnectTask .stop ();
318+ connectTask .commitRecord (kafkaMessage2 );
309319 }
310320
311321
312322
313323 @ Test
314324 public void verifyCorrelationIdBytesAsKey () throws Exception {
315- MQSourceTask newConnectTask = new MQSourceTask ();
325+ connectTask = new MQSourceTask ();
316326
317327 Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
318328 connectorConfigProps .put ("mq.message.body.jms" , "true" );
319329 connectorConfigProps .put ("mq.record.builder" , "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder" );
320330 connectorConfigProps .put ("mq.record.builder.key.header" , "JMSCorrelationIDAsBytes" );
321331
322- newConnectTask .start (connectorConfigProps );
332+ connectTask .start (connectorConfigProps );
323333
324334 TextMessage message = getJmsContext ().createTextMessage ("testmessagewithcorrelbytes" );
325335 message .setJMSCorrelationID ("verifycorrelbytes" );
326336 putAllMessagesToQueue (MQ_QUEUE , Arrays .asList (message ));
327337
328- List <SourceRecord > kafkaMessages = newConnectTask .poll ();
338+ List <SourceRecord > kafkaMessages = connectTask .poll ();
329339 assertEquals (1 , kafkaMessages .size ());
330340
331341 SourceRecord kafkaMessage = kafkaMessages .get (0 );
@@ -334,26 +344,26 @@ public void verifyCorrelationIdBytesAsKey() throws Exception {
334344
335345 assertEquals ("testmessagewithcorrelbytes" , kafkaMessage .value ());
336346
337- newConnectTask . stop ( );
347+ connectTask . commitRecord ( kafkaMessage );
338348 }
339349
340350
341351
342352 @ Test
343353 public void verifyDestinationAsKey () throws Exception {
344- MQSourceTask newConnectTask = new MQSourceTask ();
354+ connectTask = new MQSourceTask ();
345355
346356 Map <String , String > connectorConfigProps = createDefaultConnectorProperties ();
347357 connectorConfigProps .put ("mq.message.body.jms" , "true" );
348358 connectorConfigProps .put ("mq.record.builder" , "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder" );
349359 connectorConfigProps .put ("mq.record.builder.key.header" , "JMSDestination" );
350360
351- newConnectTask .start (connectorConfigProps );
361+ connectTask .start (connectorConfigProps );
352362
353363 TextMessage message = getJmsContext ().createTextMessage ("testmessagewithdest" );
354364 putAllMessagesToQueue (MQ_QUEUE , Arrays .asList (message ));
355365
356- List <SourceRecord > kafkaMessages = newConnectTask .poll ();
366+ List <SourceRecord > kafkaMessages = connectTask .poll ();
357367 assertEquals (1 , kafkaMessages .size ());
358368
359369 SourceRecord kafkaMessage = kafkaMessages .get (0 );
@@ -362,6 +372,6 @@ public void verifyDestinationAsKey() throws Exception {
362372
363373 assertEquals ("testmessagewithdest" , kafkaMessage .value ());
364374
365- newConnectTask . stop ( );
375+ connectTask . commitRecord ( kafkaMessage );
366376 }
367377}
0 commit comments