Skip to content

Commit c294521

Browse files
Merge pull request #46 from superstreamlabs/master
Release
2 parents 82ad1fe + 6e90baa commit c294521

32 files changed

+4039
-517
lines changed

Jenkinsfile

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ def isManualTrigger() {
22
return currentBuild.getBuildCauses().any { cause -> cause._class == 'hudson.model.Cause$UserIdCause' }
33
}
44

5+
@Library('shared-library') _
6+
57
pipeline {
68

79
agent {
810
docker {
9-
label 'memphis-jenkins-big-fleet,'
11+
label 'big-ec2-fleet'
1012
image 'maven:3.8.4-openjdk-17'
1113
args '-u root'
1214
}
@@ -19,6 +21,17 @@ pipeline {
1921

2022
stages {
2123
stage('Read Version from pom.xml') {
24+
when {
25+
anyOf {
26+
allOf {
27+
branch 'master'
28+
triggeredBy 'UserIdCause' // Manual trigger on master
29+
}
30+
allOf {
31+
branch 'latest'
32+
}
33+
}
34+
}
2235
steps {
2336
dir('superstream-clients'){
2437
script {
@@ -120,6 +133,48 @@ pipeline {
120133
// Runs regardless of build result: wipe the workspace so stale artifacts
// never leak into the next build on this agent.
always {
    cleanWs()
}
136+
// On success: notify only for release builds off the 'latest' branch.
// NOTE(review): gates on env.GIT_BRANCH (git plugin variable) — confirm it is
// populated in this job type; multibranch pipelines typically set BRANCH_NAME.
success {
    script {
        if (env.GIT_BRANCH == 'latest') {
            sendSlackNotification('SUCCESS')   // shared-library Slack helper
            notifySuccessful()                 // email notification (defined below)
        }
    }
}
144+
145+
// On failure: mirror the success handler — Slack + email, 'latest' branch only.
failure {
    script {
        if (env.GIT_BRANCH == 'latest') {
            sendSlackNotification('FAILURE')
            notifyFailed()
        }
    }
}
153+
// On abort: Slack-notify for the 'latest' branch, then scan the build log for
// an agent-offline exception and retry the job if found.
aborted {
    script {
        // Consistency fix: the success/failure handlers gate on env.GIT_BRANCH;
        // this handler previously checked env.BRANCH_NAME, so the same branch
        // could notify on failure but silently skip notification on abort.
        if (env.GIT_BRANCH == 'latest') {
            sendSlackNotification('ABORTED')
        }
        // Get the build log to check for the specific exception and retry job
        AgentOfflineException()
    }
}
123162
}
124163
}
125164

165+
// Sends a "build succeeded" email via the email-ext plugin.
// Fix: removed the dangling trailing comma after the body: argument — Groovy
// rejects a trailing comma in an argument list, and the empty slot suggests a
// recipients line was dropped in a merge.
// NOTE(review): no `to:` or `recipientProviders:` is specified, so delivery
// relies on the plugin's default recipient configuration — confirm intended.
def notifySuccessful() {
    emailext (
        subject: "SUCCESSFUL: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]'",
        body: """SUCCESSFUL: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]':
Check console output and connection attributes at ${env.BUILD_URL}"""
    )
}
173+
// Sends a "build failed" email via the email-ext plugin.
// Fix: removed the dangling trailing comma after the body: argument (invalid
// Groovy argument-list syntax; an adjacent line appears to have been dropped).
// NOTE(review): no recipients specified — confirm plugin defaults are intended.
def notifyFailed() {
    emailext (
        subject: "FAILED: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]'",
        body: """FAILED: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]':
Check console output at ${env.BUILD_URL}"""
    )
}

README.md

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,20 @@ Works with any Java library that depends on `kafka-clients`, including:
3636

3737
The library fully supports Java versions 11 through 21.
3838

39+
## Run Modes
40+
41+
Superstream operates in one of two modes per `KafkaProducer` instance:
42+
43+
- **INIT_CONFIG**: Superstream applies init-time configuration optimizations directly to the application's producer configuration before the producer is constructed. No shadow producer is created; the application's calls run against the original producer.
44+
- **SHADOW**: Superstream initializes a shadow producer — effectively replacing the original producer — and routes supported producer methods through it. After a learning period, a one-shot optimized configuration is applied to the shadow. The original producer object remains in place; the routing is transparent to the application.
45+
46+
### Default Optimizations
47+
48+
When no topic-specific configuration is available (or during init-time defaults), Superstream applies:
49+
- `compression.type=zstd`
50+
- `batch.size=65536`
51+
- `linger.ms=5000` (skipped when `SUPERSTREAM_LATENCY_SENSITIVE=true`)
52+
3953
## Important: Producer Configuration Requirements
4054

4155
When initializing your Kafka producers, please ensure you pass the configuration as a mutable object. The Superstream library needs to modify the producer configuration to apply optimizations. The following initialization patterns are supported:
@@ -200,10 +214,6 @@ ENTRYPOINT ["java", "-javaagent:/app/superstream-agent.jar", "-jar", "/app/app.j
200214
Alternatively, you can use a multi-stage build to download the agent from Maven Central:
201215

202216

203-
### Required Environment Variables
204-
205-
- `SUPERSTREAM_TOPICS_LIST`: Comma-separated list of topics your application produces to
206-
207217
### Optional Environment Variables
208218

209219
- `SUPERSTREAM_LATENCY_SENSITIVE`: Set to "true" to prevent any modification to linger.ms values
@@ -212,7 +222,6 @@ Alternatively, you can use a multi-stage build to download the agent from Maven
212222

213223
Example:
214224
```bash
215-
export SUPERSTREAM_TOPICS_LIST=orders,payments,user-events
216225
export SUPERSTREAM_LATENCY_SENSITIVE=true
217226
```
218227

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package ai.superstream.examples.shadow;
2+
3+
import java.time.Duration;
4+
import java.util.Properties;
5+
6+
import org.apache.kafka.clients.producer.KafkaProducer;
7+
import org.apache.kafka.clients.producer.ProducerConfig;
8+
import org.apache.kafka.clients.producer.ProducerRecord;
9+
import org.apache.kafka.common.serialization.StringSerializer;
10+
11+
public class AbortTransactionExample {
12+
public static void main(String[] args) throws Exception {
13+
String bootstrap = System.getenv().getOrDefault("BOOTSTRAP_SERVERS", "localhost:9092");
14+
String topic = System.getenv().getOrDefault("TOPIC", "example-topic");
15+
16+
Properties p = new Properties();
17+
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
18+
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
19+
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
20+
p.put(ProducerConfig.CLIENT_ID_CONFIG, "shadow-abort");
21+
p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "abort-" + System.currentTimeMillis());
22+
23+
KafkaProducer<String, String> producer = new KafkaProducer<>(p);
24+
try {
25+
producer.initTransactions();
26+
producer.beginTransaction();
27+
producer.send(new ProducerRecord<>(topic, "abort-key", "abort-value"));
28+
producer.abortTransaction();
29+
} finally {
30+
producer.close(Duration.ofSeconds(1));
31+
}
32+
}
33+
}
34+
35+
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package ai.superstream.examples.shadow;
2+
3+
import java.time.Duration;
4+
import java.util.Properties;
5+
6+
import org.apache.kafka.clients.producer.Callback;
7+
import org.apache.kafka.clients.producer.KafkaProducer;
8+
import org.apache.kafka.clients.producer.ProducerConfig;
9+
import org.apache.kafka.clients.producer.ProducerRecord;
10+
import org.apache.kafka.clients.producer.RecordMetadata;
11+
import org.apache.kafka.common.serialization.StringSerializer;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
public class AsyncCallbackExample {
16+
private static final Logger LOG = LoggerFactory.getLogger(AsyncCallbackExample.class);
17+
18+
public static void main(String[] args) throws Exception {
19+
String bootstrap = System.getenv().getOrDefault("BOOTSTRAP_SERVERS", "localhost:9092");
20+
String topic = System.getenv().getOrDefault("TOPIC", "example-topic");
21+
22+
Properties p = new Properties();
23+
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
24+
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
25+
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
26+
p.put(ProducerConfig.CLIENT_ID_CONFIG, "shadow-async-callback");
27+
28+
KafkaProducer<String, String> producer = new KafkaProducer<>(p);
29+
try {
30+
producer.send(new ProducerRecord<>(topic, "async-key", "async-value"), new Callback() {
31+
@Override
32+
public void onCompletion(RecordMetadata metadata, Exception exception) {
33+
if (exception != null) {
34+
LOG.warn("async send error", exception);
35+
} else if (metadata != null) {
36+
LOG.info("async send success: topic={}, partition={}, offset={}",
37+
metadata.topic(), metadata.partition(), metadata.offset());
38+
} else {
39+
LOG.info("async send completed without metadata");
40+
}
41+
}
42+
});
43+
producer.flush();
44+
} finally {
45+
producer.close(Duration.ofSeconds(1));
46+
}
47+
}
48+
}
49+
50+
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package ai.superstream.examples.shadow;
2+
3+
import java.time.Duration;
4+
import java.util.Properties;
5+
6+
import org.apache.kafka.clients.producer.KafkaProducer;
7+
import org.apache.kafka.clients.producer.ProducerConfig;
8+
import org.apache.kafka.common.serialization.StringSerializer;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
public class CloseOverloadsExample {
13+
private static final Logger LOG = LoggerFactory.getLogger(CloseOverloadsExample.class);
14+
15+
public static void main(String[] args) throws Exception {
16+
String bootstrap = System.getenv().getOrDefault("BOOTSTRAP_SERVERS", "localhost:9092");
17+
18+
Properties p = new Properties();
19+
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
20+
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
21+
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
22+
p.put(ProducerConfig.CLIENT_ID_CONFIG, "shadow-close-overloads");
23+
24+
KafkaProducer<String, String> producer = new KafkaProducer<>(p);
25+
try {
26+
producer.close(Duration.ofMillis(1000));
27+
LOG.info("Invoked close(Duration)");
28+
} finally {
29+
try { producer.close(); } catch (Throwable ignored) { } // shadow should log that it already closed
30+
}
31+
}
32+
}
33+
34+
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package ai.superstream.examples.shadow;
2+
3+
import java.nio.charset.StandardCharsets;
4+
import java.util.Map;
5+
6+
import org.apache.kafka.common.serialization.Serializer;
7+
8+
public class CustomStringSerializer implements Serializer<String> {
9+
@Override
10+
public void configure(Map<String, ?> configs, boolean isKey) {
11+
// no-op
12+
}
13+
14+
@Override
15+
public byte[] serialize(String topic, String data) {
16+
if (data == null) return null;
17+
return data.getBytes(StandardCharsets.UTF_8);
18+
}
19+
20+
@Override
21+
public void close() {
22+
// no-op
23+
}
24+
}
25+
26+
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package ai.superstream.examples.shadow;
2+
3+
import java.time.Duration;
4+
import java.util.Properties;
5+
6+
import org.apache.kafka.clients.producer.KafkaProducer;
7+
import org.apache.kafka.clients.producer.ProducerConfig;
8+
import org.apache.kafka.clients.producer.ProducerRecord;
9+
import org.apache.kafka.common.serialization.StringSerializer;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
public class HotSwapExample {
14+
private static final Logger LOG = LoggerFactory.getLogger(HotSwapExample.class);
15+
16+
public static void main(String[] args) throws Exception {
17+
String bootstrap = System.getenv().getOrDefault("BOOTSTRAP_SERVERS", "localhost:9092");
18+
String topic = System.getenv().getOrDefault("TOPIC", "example-topic");
19+
20+
Properties p = new Properties();
21+
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
22+
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
23+
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
24+
p.put(ProducerConfig.CLIENT_ID_CONFIG, "shadow-hot-swap");
25+
26+
KafkaProducer<String, String> producer = new KafkaProducer<>(p);
27+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
28+
try {
29+
LOG.info("Shutdown: flushing and closing producer");
30+
producer.flush();
31+
producer.close(Duration.ofSeconds(1));
32+
} catch (Throwable ignored) { }
33+
}));
34+
35+
long i = 0;
36+
while (true) {
37+
producer.send(new ProducerRecord<>(topic, "loop-key", "loop-" + (i++)));
38+
Thread.sleep(5_000L);
39+
}
40+
}
41+
}
42+
43+
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package ai.superstream.examples.shadow;
2+
3+
import java.time.Duration;
4+
import java.util.ArrayList;
5+
import java.util.List;
6+
import java.util.Properties;
7+
8+
import org.apache.kafka.clients.producer.KafkaProducer;
9+
import org.apache.kafka.clients.producer.ProducerConfig;
10+
import org.apache.kafka.clients.producer.ProducerRecord;
11+
import org.apache.kafka.common.serialization.StringSerializer;
12+
13+
public class HundredProducersSendExample {
14+
public static void main(String[] args) throws Exception {
15+
String bootstrap = System.getenv().getOrDefault("BOOTSTRAP_SERVERS", "localhost:9092");
16+
String topic = System.getenv().getOrDefault("TOPIC", "example-topic");
17+
18+
List<KafkaProducer<String, String>> producers = new ArrayList<>();
19+
try {
20+
// Create 100 producers
21+
for (int i = 0; i < 100; i++) {
22+
Properties p = new Properties();
23+
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
24+
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
25+
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
26+
p.put(ProducerConfig.CLIENT_ID_CONFIG, "shadow-100-" + i);
27+
KafkaProducer<String, String> producer = new KafkaProducer<>(p);
28+
producers.add(producer);
29+
}
30+
31+
long startNs = System.nanoTime();
32+
int ticks = 12; // every 10s for 2 minutes total
33+
for (int t = 0; t < ticks; t++) {
34+
// Each producer sends once per tick
35+
for (int i = 0; i < producers.size(); i++) {
36+
producers.get(i).send(new ProducerRecord<>(topic, "many-" + i, "value-t" + t));
37+
}
38+
// Flush all producers after the wave
39+
for (KafkaProducer<String, String> producer : producers) {
40+
producer.flush();
41+
}
42+
// Sleep 10s between ticks except after the last wave
43+
if (t < ticks - 1) {
44+
Thread.sleep(10_000L);
45+
}
46+
}
47+
// Ensure total runtime ~2 minutes
48+
long elapsedMs = (System.nanoTime() - startNs) / 1_000_000L;
49+
long remaining = 120_000L - elapsedMs;
50+
if (remaining > 0) {
51+
Thread.sleep(remaining);
52+
}
53+
} finally {
54+
for (KafkaProducer<String, String> producer : producers) {
55+
try { producer.close(Duration.ofSeconds(1)); } catch (Throwable ignored) { }
56+
}
57+
}
58+
}
59+
}
60+
61+

0 commit comments

Comments
 (0)