Skip to content

Commit e4881c6

Browse files
Adding shadow producer mode
1 parent 71e71fb commit e4881c6

31 files changed

+3983
-516
lines changed

README.md

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,20 @@ Works with any Java library that depends on `kafka-clients`, including:
3636

3737
The library fully supports Java versions 11 through 21.
3838

39+
## Run Modes
40+
41+
Superstream operates in one of two modes per `KafkaProducer` instance:
42+
43+
- **INIT_CONFIG**: Superstream applies init-time configuration optimizations directly to the application's producer configuration before the producer is constructed. No shadow producer is created. All producer calls from the application run against the original producer.
44+
- **SHADOW**: Superstream initializes a shadow producer, effectively replacing the original producer, and routes supported producer methods through it. After a learning period, a one-shot optimized configuration is applied to the shadow producer. The original producer remains in place; the routing is transparent to the application.
45+
46+
### Default Optimizations
47+
48+
When no topic-specific configuration is available (or during init-time defaults), Superstream applies:
49+
- `compression.type=zstd`
50+
- `batch.size=65536`
51+
- `linger.ms=5000` (skipped when `SUPERSTREAM_LATENCY_SENSITIVE=true`)
52+
3953
## Important: Producer Configuration Requirements
4054

4155
When initializing your Kafka producers, please ensure you pass the configuration as a mutable object. The Superstream library needs to modify the producer configuration to apply optimizations. The following initialization patterns are supported:
@@ -200,10 +214,6 @@ ENTRYPOINT ["java", "-javaagent:/app/superstream-agent.jar", "-jar", "/app/app.j
200214
Alternatively, you can use a multi-stage build to download the agent from Maven Central:
201215

202216

203-
### Required Environment Variables
204-
205-
- `SUPERSTREAM_TOPICS_LIST`: Comma-separated list of topics your application produces to
206-
207217
### Optional Environment Variables
208218

209219
- `SUPERSTREAM_LATENCY_SENSITIVE`: Set to "true" to prevent any modification to linger.ms values
@@ -212,7 +222,6 @@ Alternatively, you can use a multi-stage build to download the agent from Maven
212222

213223
Example:
214224
```bash
215-
export SUPERSTREAM_TOPICS_LIST=orders,payments,user-events
216225
export SUPERSTREAM_LATENCY_SENSITIVE=true
217226
```
218227

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package ai.superstream.examples.shadow;
2+
3+
import java.time.Duration;
4+
import java.util.Properties;
5+
6+
import org.apache.kafka.clients.producer.KafkaProducer;
7+
import org.apache.kafka.clients.producer.ProducerConfig;
8+
import org.apache.kafka.clients.producer.ProducerRecord;
9+
import org.apache.kafka.common.serialization.StringSerializer;
10+
11+
public class AbortTransactionExample {
12+
public static void main(String[] args) throws Exception {
13+
String bootstrap = System.getenv().getOrDefault("BOOTSTRAP_SERVERS", "localhost:9092");
14+
String topic = System.getenv().getOrDefault("TOPIC", "example-topic");
15+
16+
Properties p = new Properties();
17+
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
18+
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
19+
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
20+
p.put(ProducerConfig.CLIENT_ID_CONFIG, "shadow-abort");
21+
p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "abort-" + System.currentTimeMillis());
22+
23+
KafkaProducer<String, String> producer = new KafkaProducer<>(p);
24+
try {
25+
producer.initTransactions();
26+
producer.beginTransaction();
27+
producer.send(new ProducerRecord<>(topic, "abort-key", "abort-value"));
28+
producer.abortTransaction();
29+
} finally {
30+
producer.close(Duration.ofSeconds(1));
31+
}
32+
}
33+
}
34+
35+
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package ai.superstream.examples.shadow;
2+
3+
import java.time.Duration;
4+
import java.util.Properties;
5+
6+
import org.apache.kafka.clients.producer.Callback;
7+
import org.apache.kafka.clients.producer.KafkaProducer;
8+
import org.apache.kafka.clients.producer.ProducerConfig;
9+
import org.apache.kafka.clients.producer.ProducerRecord;
10+
import org.apache.kafka.clients.producer.RecordMetadata;
11+
import org.apache.kafka.common.serialization.StringSerializer;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
public class AsyncCallbackExample {
16+
private static final Logger LOG = LoggerFactory.getLogger(AsyncCallbackExample.class);
17+
18+
public static void main(String[] args) throws Exception {
19+
String bootstrap = System.getenv().getOrDefault("BOOTSTRAP_SERVERS", "localhost:9092");
20+
String topic = System.getenv().getOrDefault("TOPIC", "example-topic");
21+
22+
Properties p = new Properties();
23+
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
24+
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
25+
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
26+
p.put(ProducerConfig.CLIENT_ID_CONFIG, "shadow-async-callback");
27+
28+
KafkaProducer<String, String> producer = new KafkaProducer<>(p);
29+
try {
30+
producer.send(new ProducerRecord<>(topic, "async-key", "async-value"), new Callback() {
31+
@Override
32+
public void onCompletion(RecordMetadata metadata, Exception exception) {
33+
if (exception != null) {
34+
LOG.warn("async send error", exception);
35+
} else if (metadata != null) {
36+
LOG.info("async send success: topic={}, partition={}, offset={}",
37+
metadata.topic(), metadata.partition(), metadata.offset());
38+
} else {
39+
LOG.info("async send completed without metadata");
40+
}
41+
}
42+
});
43+
producer.flush();
44+
} finally {
45+
producer.close(Duration.ofSeconds(1));
46+
}
47+
}
48+
}
49+
50+
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package ai.superstream.examples.shadow;
2+
3+
import java.time.Duration;
4+
import java.util.Properties;
5+
6+
import org.apache.kafka.clients.producer.KafkaProducer;
7+
import org.apache.kafka.clients.producer.ProducerConfig;
8+
import org.apache.kafka.common.serialization.StringSerializer;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
public class CloseOverloadsExample {
13+
private static final Logger LOG = LoggerFactory.getLogger(CloseOverloadsExample.class);
14+
15+
public static void main(String[] args) throws Exception {
16+
String bootstrap = System.getenv().getOrDefault("BOOTSTRAP_SERVERS", "localhost:9092");
17+
18+
Properties p = new Properties();
19+
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
20+
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
21+
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
22+
p.put(ProducerConfig.CLIENT_ID_CONFIG, "shadow-close-overloads");
23+
24+
KafkaProducer<String, String> producer = new KafkaProducer<>(p);
25+
try {
26+
producer.close(Duration.ofMillis(1000));
27+
LOG.info("Invoked close(Duration)");
28+
} finally {
29+
try { producer.close(); } catch (Throwable ignored) { } // shadow should log that it already closed
30+
}
31+
}
32+
}
33+
34+
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package ai.superstream.examples.shadow;
2+
3+
import java.nio.charset.StandardCharsets;
4+
import java.util.Map;
5+
6+
import org.apache.kafka.common.serialization.Serializer;
7+
8+
public class CustomStringSerializer implements Serializer<String> {
9+
@Override
10+
public void configure(Map<String, ?> configs, boolean isKey) {
11+
// no-op
12+
}
13+
14+
@Override
15+
public byte[] serialize(String topic, String data) {
16+
if (data == null) return null;
17+
return data.getBytes(StandardCharsets.UTF_8);
18+
}
19+
20+
@Override
21+
public void close() {
22+
// no-op
23+
}
24+
}
25+
26+
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package ai.superstream.examples.shadow;
2+
3+
import java.time.Duration;
4+
import java.util.Properties;
5+
6+
import org.apache.kafka.clients.producer.KafkaProducer;
7+
import org.apache.kafka.clients.producer.ProducerConfig;
8+
import org.apache.kafka.clients.producer.ProducerRecord;
9+
import org.apache.kafka.common.serialization.StringSerializer;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
public class HotSwapExample {
14+
private static final Logger LOG = LoggerFactory.getLogger(HotSwapExample.class);
15+
16+
public static void main(String[] args) throws Exception {
17+
String bootstrap = System.getenv().getOrDefault("BOOTSTRAP_SERVERS", "localhost:9092");
18+
String topic = System.getenv().getOrDefault("TOPIC", "example-topic");
19+
20+
Properties p = new Properties();
21+
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
22+
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
23+
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
24+
p.put(ProducerConfig.CLIENT_ID_CONFIG, "shadow-hot-swap");
25+
26+
KafkaProducer<String, String> producer = new KafkaProducer<>(p);
27+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
28+
try {
29+
LOG.info("Shutdown: flushing and closing producer");
30+
producer.flush();
31+
producer.close(Duration.ofSeconds(1));
32+
} catch (Throwable ignored) { }
33+
}));
34+
35+
long i = 0;
36+
while (true) {
37+
producer.send(new ProducerRecord<>(topic, "loop-key", "loop-" + (i++)));
38+
Thread.sleep(5_000L);
39+
}
40+
}
41+
}
42+
43+
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package ai.superstream.examples.shadow;
2+
3+
import java.time.Duration;
4+
import java.util.ArrayList;
5+
import java.util.List;
6+
import java.util.Properties;
7+
8+
import org.apache.kafka.clients.producer.KafkaProducer;
9+
import org.apache.kafka.clients.producer.ProducerConfig;
10+
import org.apache.kafka.clients.producer.ProducerRecord;
11+
import org.apache.kafka.common.serialization.StringSerializer;
12+
13+
public class HundredProducersSendExample {
14+
public static void main(String[] args) throws Exception {
15+
String bootstrap = System.getenv().getOrDefault("BOOTSTRAP_SERVERS", "localhost:9092");
16+
String topic = System.getenv().getOrDefault("TOPIC", "example-topic");
17+
18+
List<KafkaProducer<String, String>> producers = new ArrayList<>();
19+
try {
20+
// Create 100 producers
21+
for (int i = 0; i < 100; i++) {
22+
Properties p = new Properties();
23+
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
24+
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
25+
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
26+
p.put(ProducerConfig.CLIENT_ID_CONFIG, "shadow-100-" + i);
27+
KafkaProducer<String, String> producer = new KafkaProducer<>(p);
28+
producers.add(producer);
29+
}
30+
31+
long startNs = System.nanoTime();
32+
int ticks = 12; // every 10s for 2 minutes total
33+
for (int t = 0; t < ticks; t++) {
34+
// Each producer sends once per tick
35+
for (int i = 0; i < producers.size(); i++) {
36+
producers.get(i).send(new ProducerRecord<>(topic, "many-" + i, "value-t" + t));
37+
}
38+
// Flush all producers after the wave
39+
for (KafkaProducer<String, String> producer : producers) {
40+
producer.flush();
41+
}
42+
// Sleep 10s between ticks except after the last wave
43+
if (t < ticks - 1) {
44+
Thread.sleep(10_000L);
45+
}
46+
}
47+
// Ensure total runtime ~2 minutes
48+
long elapsedMs = (System.nanoTime() - startNs) / 1_000_000L;
49+
long remaining = 120_000L - elapsedMs;
50+
if (remaining > 0) {
51+
Thread.sleep(remaining);
52+
}
53+
} finally {
54+
for (KafkaProducer<String, String> producer : producers) {
55+
try { producer.close(Duration.ofSeconds(1)); } catch (Throwable ignored) { }
56+
}
57+
}
58+
}
59+
}
60+
61+
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package ai.superstream.examples.shadow;
2+
3+
import java.time.Duration;
4+
import java.util.Properties;
5+
6+
import org.apache.kafka.clients.producer.KafkaProducer;
7+
import org.apache.kafka.clients.producer.ProducerConfig;
8+
import org.apache.kafka.clients.producer.ProducerRecord;
9+
10+
public class InitConfigCustomSerializerExample {
11+
public static void main(String[] args) throws Exception {
12+
String bootstrap = System.getenv().getOrDefault("BOOTSTRAP_SERVERS", "localhost:9092");
13+
String topic = System.getenv().getOrDefault("TOPIC", "example-topic");
14+
15+
Properties p = new Properties();
16+
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
17+
// Using custom (non-Kafka-namespace) serializers forces INIT_CONFIG run mode
18+
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, CustomStringSerializer.class.getName());
19+
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomStringSerializer.class.getName());
20+
p.put(ProducerConfig.CLIENT_ID_CONFIG, "init-config-custom-serializer");
21+
22+
KafkaProducer<String, String> producer = new KafkaProducer<>(p);
23+
try {
24+
producer.send(new ProducerRecord<>(topic, "initcfg-key", "initcfg-value")).get();
25+
producer.flush();
26+
} finally {
27+
producer.close(Duration.ofSeconds(1));
28+
}
29+
}
30+
}
31+
32+
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package ai.superstream.examples.shadow;
2+
3+
import java.util.List;
4+
import java.util.Map;
5+
import java.util.Properties;
6+
7+
import org.apache.kafka.clients.producer.KafkaProducer;
8+
import org.apache.kafka.clients.producer.ProducerConfig;
9+
import org.apache.kafka.common.serialization.StringSerializer;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
public class MetricsAndPartitionsExample {
14+
private static final Logger LOG = LoggerFactory.getLogger(MetricsAndPartitionsExample.class);
15+
16+
public static void main(String[] args) throws Exception {
17+
String bootstrap = System.getenv().getOrDefault("BOOTSTRAP_SERVERS", "localhost:9092");
18+
String topic = System.getenv().getOrDefault("TOPIC", "example-topic");
19+
20+
Properties p = new Properties();
21+
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
22+
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
23+
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
24+
p.put(ProducerConfig.CLIENT_ID_CONFIG, "shadow-metrics-partitions");
25+
26+
try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
27+
Map<?, ?> metrics = producer.metrics();
28+
LOG.info("metrics size={}", (metrics != null ? metrics.size() : 0));
29+
if (metrics != null) {
30+
int printed = 0;
31+
for (Map.Entry<?, ?> e : metrics.entrySet()) {
32+
LOG.debug("metric: {} = {}", String.valueOf(e.getKey()), String.valueOf(e.getValue()));
33+
if (++printed >= 5) break;
34+
}
35+
}
36+
37+
List<?> partitions = producer.partitionsFor(topic);
38+
LOG.info("partitionsFor({}) size={}", topic, (partitions != null ? partitions.size() : 0));
39+
if (partitions != null) {
40+
for (Object pInfo : partitions) {
41+
LOG.debug("partitionInfo: {}", String.valueOf(pInfo));
42+
}
43+
}
44+
}
45+
}
46+
}
47+
48+

0 commit comments

Comments
 (0)