11package com .ing .engine .commands .kafka ;
22
3- import static com .ing .engine .commands .browser .Command .before ;
43import com .ing .engine .commands .browser .General ;
54import com .ing .engine .core .CommandControl ;
65import com .ing .engine .core .Control ;
@@ -48,15 +47,6 @@ public KafkaOperations(CommandControl cc) {
4847 super (cc );
4948 }
5049
51- private static final Map <String , String > serializerMap = new HashMap <>();
52-
53- static {
54- serializerMap .put ("string" , StringSerializer .class .getName ());
55- serializerMap .put ("json" , StringSerializer .class .getName ());
56- serializerMap .put ("bytearray" , ByteArraySerializer .class .getName ());
57- serializerMap .put ("avro" , KafkaAvroSerializer .class .getName ());
58- }
59-
6050 @ Action (object = ObjectType .KAFKA , desc = "Add Kafka Header" , input = InputType .YES )
6151 public void addKafkaHeader () {
6252 try {
@@ -178,7 +168,7 @@ public void setKey() {
178168 Report .updateTestLog (Action , "Error in setting Key: " + "\n " + ex .getMessage (), Status .DEBUG );
179169 }
180170 }
181-
171+
182172 @ Action (object = ObjectType .KAFKA , desc = "Set Consumer GroupId" , input = InputType .YES , condition = InputType .NO )
183173 public void setConsumerGroupId () {
184174 try {
@@ -193,7 +183,11 @@ public void setConsumerGroupId() {
193183 @ Action (object = ObjectType .KAFKA , desc = "Set Partition" , input = InputType .YES , condition = InputType .NO )
194184 public void setPartition () {
195185 try {
196- kafkaPartition .put (key , Integer .valueOf (Data ));
186+ if (Data .toLowerCase ().equals ("null" )) {
187+ kafkaPartition .put (key , null );
188+ } else {
189+ kafkaPartition .put (key , Integer .valueOf (Data ));
190+ }
197191 Report .updateTestLog (Action , "Partition has been set successfully" , Status .DONE );
198192 } catch (NumberFormatException ex ) {
199193 Logger .getLogger (this .getClass ().getName ()).log (Level .SEVERE , "Exception during Partition setup" , ex );
@@ -234,6 +228,17 @@ public void setValueSerializer() {
234228 }
235229 }
236230
231+ @ Action (object = ObjectType .KAFKA , desc = "Set Value Deserializer" , input = InputType .YES , condition = InputType .NO )
232+ public void setValueDeserializer () {
233+ try {
234+ kafkaValueDeserializer .put (key , Data );
235+ Report .updateTestLog (Action , "Value Deserializer has been set successfully" , Status .DONE );
236+ } catch (Exception ex ) {
237+ Logger .getLogger (this .getClass ().getName ()).log (Level .SEVERE , "Exception during Value Deserializer setup" , ex );
238+ Report .updateTestLog (Action , "Error in setting Value Deserializer: " + "\n " + ex .getMessage (), Status .DEBUG );
239+ }
240+ }
241+
237242 @ Action (object = ObjectType .KAFKA , desc = "Produce Kafka Message" , input = InputType .YES , condition = InputType .NO )
238243 public void produceMessage () {
239244 try {
@@ -248,7 +253,7 @@ public void produceMessage() {
248253 produceMessage (kafkaProducerTopic .get (key ), kafkaPartition .get (key ), kafkaKey .get (key ), kafkaValue .get (key ), kafkaHeaders .get (key ));
249254 } else if (kafkaTimeStamp .get (key ) != null ) {
250255 produceMessage (kafkaProducerTopic .get (key ), kafkaPartition .get (key ), kafkaTimeStamp .get (key ), kafkaKey .get (key ), kafkaValue .get (key ));
251- } else if (kafkaPartition .get (key ) != null ) {
256+ } else if (kafkaPartition .containsKey (key )) {
252257 produceMessage (kafkaProducerTopic .get (key ), kafkaPartition .get (key ), kafkaKey .get (key ), kafkaValue .get (key ));
253258 } else if (kafkaKey .get (key ) != null ) {
254259 produceMessage (kafkaProducerTopic .get (key ), kafkaKey .get (key ), kafkaValue .get (key ));
@@ -266,17 +271,19 @@ public void produceMessage() {
266271 }
267272 }
268273
269- @ Action (object = ObjectType .QUEUE , desc = "Send Message" , input = InputType .NO , condition = InputType .NO )
274+ @ Action (object = ObjectType .KAFKA , desc = "Send Message" , input = InputType .NO , condition = InputType .NO )
270275 public void sendKafkaMessage () {
271276 try {
272277 createProducer (kafkaValueSerializer .get (key ));
273- before .put (key , Instant .now ());
278+ // before.put(key, Instant.now());
274279 kafkaProducer .get (key ).send (kafkaProducerRecord .get (key ));
275280 Report .updateTestLog (Action , "Record sent" , Status .DONE );
276281 kafkaProducer .get (key ).close ();
277282 } catch (Exception ex ) {
278283 Logger .getLogger (this .getClass ().getName ()).log (Level .SEVERE , "Exception while sending record" , ex );
279284 Report .updateTestLog (Action , "Error in sending record: " + "\n " + ex .getMessage (), Status .DEBUG );
285+ } finally {
286+ clearProducerDetails ();
280287 }
281288 }
282289
@@ -301,32 +308,26 @@ private void createProducer(String serializer) {
301308
302309 private void produceMessage (String topic , String value ) {
303310 kafkaProducerRecord .put (key , new ProducerRecord <>(topic , value ));
304- System .out .println ("Record Produced with topic '" + topic + "': " + value );
305311 }
306312
307- private void produceMessage (String topic , String key , String value ) {
308- kafkaProducerRecord .put (key , new ProducerRecord <>(topic , key , value ));
309- System .out .println ("Record Produced with topic '" + topic + "' with key '" + key + "': " + value );
313+ private void produceMessage (String topic , String kafkaKey , String value ) {
314+ kafkaProducerRecord .put (key , new ProducerRecord <>(topic , kafkaKey , value ));
310315 }
311316
312- private void produceMessage (String topic , int partition , String key , String value ) {
313- kafkaProducerRecord .put (key , new ProducerRecord <>(topic , partition , key , value ));
314- System .out .println ("Record Produced with topic '" + topic + "' to partition '" + partition + "' with key '" + key + "': " + value );
317+ private void produceMessage (String topic , Integer partition , String kafkaKey , String value ) {
318+ kafkaProducerRecord .put (key , new ProducerRecord <>(topic , partition , kafkaKey , value ));
315319 }
316320
317- private void produceMessage (String topic , int partition , long timestamp , String key , String value ) {
318- kafkaProducerRecord .put (key , new ProducerRecord <>(topic , partition , timestamp , key , value ));
319- System .out .println ("Record Produced with topic '" + topic + "' to partition '" + partition + "' with timestamp '" + timestamp + "' and key '" + key + "': " + value );
321+ private void produceMessage (String topic , Integer partition , long timestamp , String kafkaKey , String value ) {
322+ kafkaProducerRecord .put (key , new ProducerRecord <>(topic , partition , timestamp , kafkaKey , value ));
320323 }
321324
322- private void produceMessage (String topic , int partition , String key , String value , List <Header > headers ) {
323- kafkaProducerRecord .put (key , new ProducerRecord <>(topic , partition , key , value , headers ));
324- System .out .println ("Record Produced with topic '" + topic + "' to partition '" + partition + "' with headers '" + headers + "' and key '" + key + "': " + value );
325+ private void produceMessage (String topic , Integer partition , String kafkaKey , String value , List <Header > headers ) {
326+ kafkaProducerRecord .put (key , new ProducerRecord <>(topic , partition , kafkaKey , value , headers ));
325327 }
326328
327- private void produceMessage (String topic , int partition , long timestamp , String key , String value , List <Header > headers ) {
328- kafkaProducerRecord .put (key , new ProducerRecord <>(topic , partition , timestamp , key , value , headers ));
329- System .out .println ("Record Produced with topic '" + topic + "' to partition '" + partition + "' with timestamp '" + timestamp + "' and key '" + key + "': " + value + "' with headers '" + headers );
329+ private void produceMessage (String topic , Integer partition , long timestamp , String kafkaKey , String value , List <Header > headers ) {
330+ kafkaProducerRecord .put (key , new ProducerRecord <>(topic , partition , timestamp , kafkaKey , value , headers ));
330331 }
331332
332333 private String handleDataSheetVariables (String payloadstring ) {
@@ -359,6 +360,19 @@ private String handleuserDefinedVariables(String payloadstring) {
359360 return payloadstring ;
360361 }
361362
363+ private void clearProducerDetails () {
364+ kafkaKey .clear ();
365+ kafkaHeaders .clear ();
366+ kafkaProducerTopic .clear ();
367+ kafkaPartition .clear ();
368+ kafkaTimeStamp .clear ();
369+ kafkaKeySerializer .clear ();
370+ kafkaValue .clear ();
371+ kafkaValueSerializer .clear ();
372+ kafkaProducer .clear ();
373+ kafkaProducerRecord .clear ();
374+ }
375+
362376 public void createConsumer (String deserializer ) {
363377 Properties props = new Properties ();
364378
@@ -382,17 +396,18 @@ public void createConsumer(String deserializer) {
382396 kafkaConsumer .put (key , new KafkaConsumer <>(props ));
383397 }
384398
385- @ Action (object = ObjectType .KAFKA , desc = "Consume Kafka Message" , input = InputType .YES )
399+ @ Action (object = ObjectType .KAFKA , desc = "Consume Kafka Message" , input = InputType .NO )
386400 public void consumeKafkaMessage () {
387401
388402 createConsumer (kafkaValueDeserializer .get (key ));
389403 kafkaConsumer .get (key ).subscribe (Arrays .asList (kafkaConsumerTopic .get (key )));
390404 try {
391405 ConsumerRecord record = pollKafkaConsumer ();
392- if (record != null )
406+ if (record != null ) {
393407 Report .updateTestLog (Action , "Kafka message consumed successfully. " , Status .DONE );
394- else
408+ } else {
395409 Report .updateTestLog (Action , "Kafka message not received. " , Status .FAIL );
410+ }
396411 } catch (Exception e ) {
397412 e .printStackTrace ();
398413 Report .updateTestLog (Action , "Error while consuming Kafka message: " + e .getMessage (), Status .FAIL );
@@ -416,7 +431,7 @@ private ConsumerRecord<String, Object> pollKafkaConsumer() {
416431 return null ;
417432 }
418433
419- @ Action (object = ObjectType .KAFKA , desc = "Store XML tag In DataSheet " , input = InputType .YES , condition = InputType .YES )
434+ @ Action (object = ObjectType .KAFKA , desc = "Store XML tag In DataSheet " , input = InputType .YES , condition = InputType .NO )
420435 public void storeKafkaXMLtagInDataSheet () {
421436
422437 try {
@@ -591,8 +606,8 @@ public void storeKafkaJSONtagInDataSheet() {
591606 Status .DEBUG );
592607 }
593608 }
594-
595- @ Action (object = ObjectType .KAFKA , desc = "Store Response In DataSheet " , input = InputType .YES , condition = InputType .YES )
609+
610+ @ Action (object = ObjectType .KAFKA , desc = "Store Response In DataSheet " , input = InputType .YES , condition = InputType .NO )
596611 public void storeKafkaResponseInDataSheet () {
597612
598613 try {
@@ -622,5 +637,4 @@ public void storeKafkaResponseInDataSheet() {
622637 }
623638 }
624639
625-
626640}
0 commit comments