88
99import java .io .Serial ;
1010import java .io .Serializable ;
11+ import java .lang .reflect .InvocationTargetException ;
1112import java .nio .charset .StandardCharsets ;
1213import java .util .ArrayList ;
1314import java .util .Collections ;
1415import java .util .List ;
1516import java .util .Objects ;
1617import java .util .Properties ;
17- import java .util .Set ;
1818import java .util .concurrent .ExecutionException ;
1919
2020import com .sngular .kloadgen .exception .KLoadGenException ;
2121import com .sngular .kloadgen .loadgen .BaseLoadGenerator ;
2222import com .sngular .kloadgen .model .HeaderMapping ;
2323import com .sngular .kloadgen .randomtool .generator .StatelessGeneratorTool ;
24- import com .sngular .kloadgen .serializer .AvroSerializer ;
2524import com .sngular .kloadgen .serializer .EnrichedRecord ;
26- import com .sngular .kloadgen .serializer .ProtobufSerializer ;
2725import com .sngular .kloadgen .util .ProducerKeysHelper ;
2826import com .sngular .kloadgen .util .PropsKeysHelper ;
2927import io .apicurio .registry .serde .Legacy4ByteIdHandler ;
3028import io .apicurio .registry .serde .SerdeConfig ;
31- import io .apicurio .registry .serde .avro .AvroKafkaSerializer ;
3229import org .apache .commons .lang3 .StringUtils ;
3330import org .apache .jmeter .config .Arguments ;
3431import org .apache .jmeter .protocol .java .sampler .AbstractJavaSamplerClient ;
4138import org .apache .kafka .clients .producer .ProducerRecord ;
4239import org .apache .kafka .clients .producer .RecordMetadata ;
4340import org .apache .kafka .common .KafkaException ;
44- import org .apache .kafka .common .serialization .StringSerializer ;
41+ import org .apache .kafka .common .serialization .Serializer ;
4542
4643public final class KafkaProducerSampler extends AbstractJavaSamplerClient implements Serializable {
4744
4845 private static final String TEMPLATE = "Topic: %s, partition: %s, offset: %s" ;
4946
50- private static final Set <String > SERIALIZER_SET = Set .of (AvroSerializer .class .getName (), ProtobufSerializer .class .getName ());
51-
5247 @ Serial
5348 private static final long serialVersionUID = 1L ;
5449
@@ -73,42 +68,49 @@ public final class KafkaProducerSampler extends AbstractJavaSamplerClient implem
7368 @ Override
7469 public void setupTest (final JavaSamplerContext context ) {
7570 props = JMeterContextService .getContext ().getProperties ();
76- try {
71+
72+ if (context .getJMeterVariables ().get (PropsKeysHelper .VALUE_SCHEMA ) == null ) {
73+ generator = SamplerUtil .configureKeyGenerator (props );
74+ } else {
7775 generator = SamplerUtil .configureValueGenerator (props );
76+ }
7877
79- if ("true" .equals (JavaSamplerContext .getJMeterVariables ().get (PropsKeysHelper .SCHEMA_KEYED_MESSAGE_KEY ))
80- || "true" .equals (JavaSamplerContext .getJMeterVariables ().get (PropsKeysHelper .SIMPLE_KEYED_MESSAGE_KEY ))) {
81- keyMessageFlag = true ;
82- if (!Objects .isNull (JMeterContextService .getContext ().getVariables ().get (PropsKeysHelper .KEY_SUBJECT_NAME ))) {
83- keyGenerator = SamplerUtil .configureKeyGenerator (props );
84- } else {
85- msgKeyType = props .getProperty (PropsKeysHelper .MESSAGE_KEY_KEY_TYPE );
86- msgKeyValue = PropsKeysHelper .MSG_KEY_VALUE .equalsIgnoreCase (props .getProperty (PropsKeysHelper .MESSAGE_KEY_KEY_VALUE ))
87- ? Collections .emptyList () : Collections .singletonList (props .getProperty (PropsKeysHelper .MESSAGE_KEY_KEY_VALUE ));
88- }
78+ if ("true" .equals (context .getJMeterVariables ().get (PropsKeysHelper .SCHEMA_KEYED_MESSAGE_KEY ))
79+ || "true" .equals (context .getJMeterVariables ().get (PropsKeysHelper .SIMPLE_KEYED_MESSAGE_KEY ))) {
80+ keyMessageFlag = true ;
81+ if (!Objects .isNull (JMeterContextService .getContext ().getVariables ().get (PropsKeysHelper .KEY_SUBJECT_NAME ))) {
82+ keyGenerator = SamplerUtil .configureKeyGenerator (props );
8983 } else {
90- props .put (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , ProducerKeysHelper .KEY_SERIALIZER_CLASS_CONFIG_DEFAULT );
91- }
92-
93- if (context .getParameter (ProducerKeysHelper .APICURIO_LEGACY_ID_HANDLER ).equals (ProducerKeysHelper .FLAG_YES )) {
94- props .put (SerdeConfig .ID_HANDLER , Legacy4ByteIdHandler .class .getName ());
95- }
96- if (context .getParameter (ProducerKeysHelper .APICURIO_ENABLE_HEADERS_ID ).equals (ProducerKeysHelper .FLAG_NO )) {
97- props .put (SerdeConfig .ENABLE_HEADERS , "false" );
84+ msgKeyType = props .getProperty (PropsKeysHelper .MESSAGE_KEY_KEY_TYPE );
85+ msgKeyValue = PropsKeysHelper .MSG_KEY_VALUE .equalsIgnoreCase (props .getProperty (PropsKeysHelper .MESSAGE_KEY_KEY_VALUE ))
86+ ? Collections .emptyList () : Collections .singletonList (props .getProperty (PropsKeysHelper .MESSAGE_KEY_KEY_VALUE ));
9887 }
88+ } else {
89+ props .put (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , ProducerKeysHelper .KEY_SERIALIZER_CLASS_CONFIG_DEFAULT );
90+ }
9991
100- topic = context .getParameter (ProducerKeysHelper .KAFKA_TOPIC_CONFIG );
92+ if (context .getParameter (ProducerKeysHelper .APICURIO_LEGACY_ID_HANDLER ).equals (ProducerKeysHelper .FLAG_YES )) {
93+ props .put (SerdeConfig .ID_HANDLER , Legacy4ByteIdHandler .class .getName ());
94+ }
95+ if (context .getParameter (ProducerKeysHelper .APICURIO_ENABLE_HEADERS_ID ).equals (ProducerKeysHelper .FLAG_NO )) {
96+ props .put (SerdeConfig .ENABLE_HEADERS , "false" );
97+ }
10198
99+ topic = context .getParameter (ProducerKeysHelper .KAFKA_TOPIC_CONFIG );
100+ try {
102101
103102 props .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , context .getParameter (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG ));
104103 props .put (ProducerConfig .CLIENT_ID_CONFIG , context .getParameter (ProducerConfig .CLIENT_ID_CONFIG ));
105104 props .putIfAbsent (ProducerConfig .ACKS_CONFIG , "all" );
106- props .putIfAbsent (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , StringSerializer . class . getName () );
107- props .putIfAbsent (ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , AvroKafkaSerializer . class . getName () );
105+ props .putIfAbsent (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , ProducerKeysHelper . KEY_SERIALIZER_CLASS_CONFIG_DEFAULT );
106+ props .putIfAbsent (ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , ProducerKeysHelper . VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT );
108107
109- producer = new KafkaProducer <>(props );
110- } catch (final KafkaException ex ) {
108+ producer = new KafkaProducer <>(props , (Serializer ) Class .forName ((String ) props .get (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG )).getConstructor ().newInstance (),
109+ (Serializer ) Class .forName ((String ) props .get (ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG )).getConstructor ().newInstance ());
110+ } catch (final KafkaException | ClassNotFoundException ex ) {
111111 getNewLogger ().error (ex .getMessage (), ex );
112+ } catch (InvocationTargetException | NoSuchMethodException | InstantiationException | IllegalAccessException e ) {
113+ throw new KLoadGenException (e );
112114 }
113115 }
114116
@@ -135,7 +137,7 @@ public SampleResult runTest(final JavaSamplerContext javaSamplerContext) {
135137
136138 if (Objects .nonNull (messageVal )) {
137139 try {
138- final var producerRecord = getProducerRecord (messageVal , enrichedKeyFlag (), enrichedValueFlag () );
140+ final var producerRecord = getProducerRecord (messageVal );
139141 final var headersSB = new ArrayList <>(SamplerUtil .populateHeaders (kafkaHeaders , producerRecord ));
140142
141143 sampleResult .setRequestHeaders (StringUtils .join (headersSB , "," ));
@@ -169,30 +171,26 @@ private List<HeaderMapping> safeGetKafkaHeaders(final JMeterContext jmeterContex
169171 return headerMappingList ;
170172 }
171173
172- private ProducerRecord <Object , Object > getProducerRecord (final EnrichedRecord messageVal , final boolean keyFlag , final boolean valueFlag ) {
174+ private ProducerRecord <Object , Object > getProducerRecord (final EnrichedRecord messageVal ) {
173175 final ProducerRecord <Object , Object > producerRecord ;
176+
177+ final String keySerializer = (String ) props .get (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG );
178+ final String valueSerializer = (String ) props .get (ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG );
179+
174180 if (keyMessageFlag ) {
175181 if (Objects .isNull (keyGenerator )) {
176- final var key = statelessGeneratorTool .generateObject ("key" , msgKeyType , 0 , msgKeyValue ). toString () ;
177- producerRecord = new ProducerRecord <>(topic , key , getObject (messageVal , valueFlag ));
182+ final var key = statelessGeneratorTool .generateObject ("key" , msgKeyType , 0 , msgKeyValue );
183+ producerRecord = new ProducerRecord <>(topic , key , getObject (messageVal , valueSerializer ));
178184 } else {
179185 final var key = keyGenerator .nextMessage ();
180- producerRecord = new ProducerRecord <>(topic , getObject (key , keyFlag ), getObject (messageVal , valueFlag ));
186+ producerRecord = new ProducerRecord <>(topic , getObject (key , keySerializer ), getObject (messageVal , valueSerializer ));
181187 }
182188 } else {
183- producerRecord = new ProducerRecord <>(topic , getObject (messageVal , valueFlag ));
189+ producerRecord = new ProducerRecord <>(topic , getObject (messageVal , valueSerializer ), getObject ( messageVal , valueSerializer ));
184190 }
185191 return producerRecord ;
186192 }
187193
188- private Boolean enrichedKeyFlag () {
189- return SERIALIZER_SET .contains (props .get (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG ).toString ());
190- }
191-
192- private Boolean enrichedValueFlag () {
193- return SERIALIZER_SET .contains (props .get (ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG ).toString ());
194- }
195-
196194 private void fillSamplerResult (final ProducerRecord <Object , Object > producerRecord , final SampleResult sampleResult ) {
197195 final String result = "key: "
198196 + producerRecord .key ()
@@ -220,8 +218,8 @@ private String prettyPrint(final RecordMetadata recordMetadata) {
220218 return String .format (TEMPLATE , recordMetadata .topic (), recordMetadata .partition (), recordMetadata .offset ());
221219 }
222220
223- private Object getObject (final EnrichedRecord messageVal , final boolean isKloadSerializer ) {
224- return isKloadSerializer ? messageVal : messageVal .getGenericRecord ();
221+ private Object getObject (final EnrichedRecord messageVal , final String serializer ) {
222+ return ( serializer . contains ( "com.sngular.kloadgen" ) && ! serializer . contains ( "Generic" )) ? messageVal : messageVal .getGenericRecord ();
225223 }
226224}
227225
0 commit comments