import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * Example application that uses the Kafka Clients API to produce messages.
 * Run with:
 * java -javaagent:path/to/superstream-clients-1.0.0.jar
 * -Dlogback.configurationFile=logback.xml -jar
 * kafka-clients-example-1.0.0-jar-with-dependencies.jar
 *
 * Prerequisites:
 * 1. A Kafka server with the following topics:
 *    - superstream.metadata_v1 - with a configuration message
 *    - superstream.clients - for client reports
 *    - example-topic - for test messages
 *
 * Environment variables:
 * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: localhost:9092)
 * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for
 *   (default: example-topic)
 */
2933public class KafkaProducerExample {
3034 private static final Logger logger = LoggerFactory .getLogger (KafkaProducerExample .class );
@@ -33,61 +37,85 @@ public class KafkaProducerExample {
3337 private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092" ;
3438
3539 private static final String CLIENT_ID = "superstream-example-producer" ;
36- private static final String COMPRESSION_TYPE = "gzip" ;
37- private static final Integer BATCH_SIZE = 16384 ;
40+ private static final String COMPRESSION_TYPE = "zstd" ; // Changed from gzip to snappy for better visibility
41+ private static final Integer BATCH_SIZE = 1048576 ; // 1MB batch size
3842
3943 private static final String TOPIC_NAME = "example-topic" ;
4044 private static final String MESSAGE_KEY = "test-key" ;
41- private static final String MESSAGE_VALUE = "Hello, Superstream!" ;
45+ // Create a larger message that will compress well
46+ private static final String MESSAGE_VALUE = generateLargeCompressibleMessage ();
4247
4348 public static void main (String [] args ) {
44- String bootstrapServers = System .getenv ("KAFKA_BOOTSTRAP_SERVERS" );
45- if (bootstrapServers == null || bootstrapServers .isEmpty ()) {
46- bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS ;
47- }
48-
49- // Configure the producer
5049 Properties props = new Properties ();
51- props .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers );
50+ props .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , DEFAULT_BOOTSTRAP_SERVERS );
5251 props .put ("client.id" , CLIENT_ID );
5352 props .put (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , StringSerializer .class .getName ());
5453 props .put (ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer .class .getName ());
55-
56- // Set some basic configuration
5754 props .put (ProducerConfig .COMPRESSION_TYPE_CONFIG , COMPRESSION_TYPE );
5855 props .put (ProducerConfig .BATCH_SIZE_CONFIG , BATCH_SIZE );
56+ props .put (ProducerConfig .LINGER_MS_CONFIG , 500 ); // Force batching by waiting 500ms
5957
60- logger .info ("Creating producer with bootstrap servers: {}" , bootstrapServers );
61- logger .info ("Original producer configuration:" );
62- props .forEach ((k , v ) -> logger .info (" {} = {}" , k , v ));
63-
58+ long recordCount = 50 ; // Number of messages to send
6459 try (Producer <String , String > producer = new KafkaProducer <>(props )) {
65- // The Superstream Agent should have intercepted the producer creation
66- // and potentially optimized the configuration
67-
68- // Log the actual configuration used by the producer
69- logger .info ("Actual producer configuration (after potential Superstream optimization):" );
70-
71- // Get the actual configuration from the producer via reflection
72- java .lang .reflect .Field configField = producer .getClass ().getDeclaredField ("producerConfig" );
73- configField .setAccessible (true );
74- org .apache .kafka .clients .producer .ProducerConfig actualConfig =
75- (org .apache .kafka .clients .producer .ProducerConfig ) configField .get (producer );
60+ while (true ) {
61+ // Send 50 large messages to see compression benefits
62+ logger .info ("Starting to send {} large messages..." , recordCount );
63+ for (int i = 1 ; i <= recordCount ; i ++) {
64+ String messageKey = MESSAGE_KEY + "-" + i ;
65+ String messageValue = MESSAGE_VALUE + "-" + i + "-" + System .currentTimeMillis ();
66+ producer .send (new ProducerRecord <>(TOPIC_NAME , messageKey , messageValue ));
67+ }
7668
77- logger .info (" compression.type = {}" , actualConfig .getString (ProducerConfig .COMPRESSION_TYPE_CONFIG ));
78- logger .info (" batch.size = {}" , actualConfig .getInt (ProducerConfig .BATCH_SIZE_CONFIG ));
79-
80- // Send a test message
81- logger .info ("Sending message to topic {}: key={}, value={}" , TOPIC_NAME , MESSAGE_KEY , MESSAGE_VALUE );
82- producer .send (new ProducerRecord <>(TOPIC_NAME , MESSAGE_KEY , MESSAGE_VALUE )).get ();
83- logger .info ("Message sent successfully!" );
84- } catch (InterruptedException e ) {
85- Thread .currentThread ().interrupt ();
86- logger .error ("Interrupted while sending message" , e );
87- } catch (ExecutionException e ) {
88- logger .error ("Error sending message" , e );
69+ logger .info ("All 50 large messages queued successfully! Adding a producer.flush() to send them all at once..." );
70+ producer .flush ();
71+ Thread .sleep (7000000 );
72+ logger .info ("Waking up and preparing to send the next batch of messages" );
73+ // return;
74+ }
8975 } catch (Exception e ) {
90- logger .error ("Unexpected error" , e );
76+ logger .error ("Error sending message" , e );
77+ }
78+ }
79+
80+ private static String generateLargeCompressibleMessage () {
81+ // Return a 1KB JSON string with repeating data that can be compressed well
82+ StringBuilder json = new StringBuilder ();
83+ json .append ("{\n " );
84+ json .append (" \" metadata\" : {\n " );
85+ json .append (" \" id\" : \" 12345\" ,\n " );
86+ json .append (" \" type\" : \" example\" ,\n " );
87+ json .append (" \" timestamp\" : 1635954438000\n " );
88+ json .append (" },\n " );
89+ json .append (" \" data\" : {\n " );
90+ json .append (" \" metrics\" : [\n " );
91+
92+ // Add repeating metrics data to reach ~1KB
93+ for (int i = 0 ; i < 15 ; i ++) {
94+ if (i > 0 ) json .append (",\n " );
95+ json .append (" {\n " );
96+ json .append (" \" name\" : \" metric" ).append (i ).append ("\" ,\n " );
97+ json .append (" \" value\" : " ).append (i * 10 ).append (",\n " );
98+ json .append (" \" tags\" : [\" tag1\" , \" tag2\" , \" tag3\" ],\n " );
99+ json .append (" \" properties\" : {\n " );
100+ json .append (" \" property1\" : \" value1\" ,\n " );
101+ json .append (" \" property2\" : \" value2\" \n " );
102+ json .append (" }\n " );
103+ json .append (" }" );
91104 }
105+
106+ json .append ("\n ]\n " );
107+ json .append (" },\n " );
108+ json .append (" \" config\" : {\n " );
109+ json .append (" \" sampling\" : \" full\" ,\n " );
110+ json .append (" \" retention\" : \" 30d\" ,\n " );
111+ json .append (" \" compression\" : true,\n " );
112+ json .append (" \" encryption\" : false\n " );
113+ json .append (" }\n " );
114+ json .append ("}" );
115+
116+ String result = json .toString ();
117+ logger .debug ("Generated compressible message of {} bytes" , result .getBytes (StandardCharsets .UTF_8 ).length );
118+
119+ return result ;
92120 }
93121}