Skip to content

Commit 6cc6ea6

Browse files
update readme + examples
1 parent 060d0bd commit 6cc6ea6

File tree

5 files changed

+59
-21
lines changed

5 files changed

+59
-21
lines changed

.idea/runConfigurations/kafka_clients_example.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Works with any Java library that depends on `kafka-clients`, including:
2020
- Apache Kafka Clients
2121
- Spring Kafka
2222
- Alpakka Kafka (Akka Kafka)
23+
- Pekko Kafka (Apache Pekko)
2324
- Kafka Streams
2425
- Kafka Connect
2526
- Any custom wrapper around the Kafka Java client
@@ -115,6 +116,43 @@ public class KafkaConfig {
115116
}
116117
```
117118

119+
### Pekko/Akka Kafka Applications
120+
Pekko and Akka Kafka applications typically use immutable configuration maps internally, which prevents Superstream from applying optimizations. To enable Superstream optimizations with Pekko/Akka, you need to create the KafkaProducer manually with a mutable configuration.
121+
122+
**Native Pekko/Akka pattern (optimizations won't be applied)**:
123+
```java
124+
ProducerSettings<String, String> producerSettings = ProducerSettings
125+
.create(system, new StringSerializer(), new StringSerializer())
126+
.withBootstrapServers("localhost:9092");
127+
128+
Source.single(ProducerMessage.single(record))
129+
.via(Producer.flexiFlow(producerSettings))
130+
.runWith(Sink.ignore, system);
131+
```
132+
133+
**Superstream-optimized pattern**:
134+
```java
135+
// Add these lines to create a mutable producer
136+
Map<String, Object> configProps = new HashMap<>();
137+
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
138+
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
139+
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
140+
141+
org.apache.kafka.clients.producer.Producer<String, String> kafkaProducer = new KafkaProducer<>(configProps);
142+
143+
ProducerSettings<String, String> producerSettings = ProducerSettings
144+
.create(system, new StringSerializer(), new StringSerializer())
145+
.withProducer(kafkaProducer);
146+
147+
Source.single(ProducerMessage.single(record))
148+
.via(Producer.flexiFlow(producerSettings))
149+
.runWith(Sink.ignore, system);
150+
```
151+
152+
This approach ensures that:
153+
- All Pekko/Akka Kafka features remain available
154+
- Superstream can optimize the producer configuration
155+
118156
### Why This Matters
119157
The Superstream library needs to modify your producer's configuration to apply optimizations based on your cluster's characteristics. This includes adjusting settings like compression, batch size, and other performance parameters. When the configuration is immutable, these optimizations cannot be applied.
120158

examples/akka-kafka-example/src/main/java/ai/superstream/examples/AkkaKafkaExample.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import akka.kafka.ProducerSettings;
66
import akka.kafka.javadsl.Producer;
77
import akka.stream.javadsl.Source;
8+
import akka.NotUsed;
9+
import org.apache.kafka.clients.producer.KafkaProducer;
810
import org.apache.kafka.clients.producer.ProducerConfig;
911
import org.apache.kafka.clients.producer.ProducerRecord;
1012
import org.apache.kafka.common.serialization.StringSerializer;
@@ -15,6 +17,7 @@
1517
import java.util.Map;
1618
import java.util.concurrent.CompletableFuture;
1719
import java.util.concurrent.TimeUnit;
20+
import java.util.Properties;
1821

1922
/**
2023
* Example application that uses Akka Kafka to produce messages.
@@ -47,26 +50,17 @@ public static void main(String[] args) {
4750
try {
4851
// Configure the producer
4952
Map<String, Object> configProps = new HashMap<>();
50-
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
51-
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
52-
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 0);
53+
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
54+
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
55+
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
5356

54-
logger.info("Creating Akka Kafka producer with bootstrap servers: {}", bootstrapServers);
55-
logger.info("Original producer configuration:");
56-
configProps.forEach((k, v) -> logger.info(" {} = {}", k, v));
57+
58+
org.apache.kafka.clients.producer.Producer<String, String> kafkaProducer = new KafkaProducer<>(configProps);
5759

58-
// Create producer settings
59-
// Create a new map with String values
60-
Map<String, String> stringProps = new HashMap<>();
61-
for (Map.Entry<String, Object> entry : configProps.entrySet()) {
62-
stringProps.put(entry.getKey(), entry.getValue().toString());
63-
}
64-
65-
// Use the string map with withProperties
60+
// Create producer settings with the producer
6661
ProducerSettings<String, String> producerSettings = ProducerSettings
6762
.create(system, new StringSerializer(), new StringSerializer())
68-
.withBootstrapServers(bootstrapServers)
69-
.withProperties(stringProps);
63+
.withProducer(kafkaProducer);
7064

7165
// Send a test message
7266
String topic = "example-topic";
@@ -80,9 +74,15 @@ public static void main(String[] args) {
8074
// Create a source with a single message and send it to Kafka
8175
CompletableFuture<Void> completionFuture = Source.single(ProducerMessage.single(record))
8276
.via(Producer.flexiFlow(producerSettings))
83-
.runForeach(result -> logger.info("Message sent: {}", result.toString()), system)
77+
.runForeach(result -> {
78+
if (result instanceof ProducerMessage.Results) {
79+
ProducerMessage.Results<String, String, NotUsed> res =
80+
(ProducerMessage.Results<String, String, NotUsed>) result;
81+
logger.info("Message sent successfully");
82+
}
83+
}, system)
8484
.toCompletableFuture()
85-
.thenApply(done -> null); // Convert CompletableFuture<Done> to CompletableFuture<Void>
85+
.thenApply(done -> null);
8686

8787
// Wait for the message to be sent
8888
completionFuture.get(10, TimeUnit.SECONDS);

superstream-clients/dependency-reduced-pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<groupId>ai.superstream</groupId>
55
<artifactId>superstream-clients</artifactId>
66
<name>Superstream Kafka Client Optimizer</name>
7-
<version>1.0.203</version>
7+
<version>1.0.204</version>
88
<description>A Java library that dynamically optimizes Kafka client configuration based on recommendations</description>
99
<url>https://github.com/superstreamlabs/superstream-clients-java</url>
1010
<developers>

superstream-clients/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>ai.superstream</groupId>
88
<artifactId>superstream-clients</artifactId>
9-
<version>1.0.203</version>
9+
<version>1.0.204</version>
1010
<packaging>jar</packaging>
1111

1212
<name>Superstream Kafka Client Optimizer</name>

0 commit comments

Comments
 (0)