Skip to content

Commit d179891

Browse files
collect more metrics + fix loginmodule bug + fix stop collection on producer close + fix inconsistent metrics collection on idle times
1 parent a66f2db commit d179891

File tree

13 files changed

+553
-143
lines changed

13 files changed

+553
-143
lines changed

.idea/runConfigurations/Aiven_example.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/runConfigurations/spring_example.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ public class KafkaProducerExample {
2929
public static void main(String[] args) {
3030
// Build the configuration map first using a mutable map
3131
Map<String, Object> mutableProps = new java.util.HashMap<>();
32-
mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_BOOTSTRAP_SERVERS);
33-
// mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList(DEFAULT_BOOTSTRAP_SERVERS));
32+
// mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_BOOTSTRAP_SERVERS);
33+
mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList(DEFAULT_BOOTSTRAP_SERVERS));
3434
// mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, List.of(DEFAULT_BOOTSTRAP_SERVERS));
35-
// mutableProps.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
35+
mutableProps.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
3636
mutableProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
3737
mutableProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
3838
mutableProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE);
@@ -45,6 +45,9 @@ public static void main(String[] args) {
4545

4646
// Pass the immutable map directly to the KafkaProducer constructor
4747
Producer<String, String> producer = new KafkaProducer<String, String>(mutableProps);
48+
49+
mutableProps.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID+"1");
50+
Producer<String, String> producer1 = new KafkaProducer<String, String>(mutableProps);
4851
long recordCount = 50; // Number of messages to send
4952
try {
5053
while (true) {
@@ -53,10 +56,12 @@ public static void main(String[] args) {
5356
String messageKey = MESSAGE_KEY + "-" + i;
5457
String messageValue = MESSAGE_VALUE + "-" + i + "-" + System.currentTimeMillis();
5558
producer.send(new ProducerRecord<>(TOPIC_NAME, messageKey, messageValue));
59+
producer.send(new ProducerRecord<>(TOPIC_NAME+"1", messageKey, messageValue));
60+
producer1.send(new ProducerRecord<>(TOPIC_NAME+"1", messageKey, messageValue));
5661
}
5762

5863
producer.flush();
59-
Thread.sleep(10000);
64+
Thread.sleep(100000);
6065
}
6166
} catch (Exception e) {
6267
logger.error("Error sending message", e);

superstream-clients/dependency-reduced-pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<groupId>ai.superstream</groupId>
55
<artifactId>superstream-clients</artifactId>
66
<name>Superstream Kafka Client Optimizer</name>
7-
<version>1.0.201</version>
7+
<version>1.0.202</version>
88
<description>A Java library that dynamically optimizes Kafka client configuration based on recommendations</description>
99
<url>https://github.com/superstreamlabs/superstream-clients-java</url>
1010
<developers>

superstream-clients/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>ai.superstream</groupId>
88
<artifactId>superstream-clients</artifactId>
9-
<version>1.0.201</version>
9+
<version>1.0.202</version>
1010
<packaging>jar</packaging>
1111

1212
<name>Superstream Kafka Client Optimizer</name>
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package ai.superstream.agent;
2+
3+
import ai.superstream.util.SuperstreamLogger;
4+
import net.bytebuddy.asm.Advice;
5+
6+
/**
7+
* Intercepts {@code KafkaProducer.close(..)} so that Superstream stops collecting
8+
* metrics and sending statistics for that producer instance after the
9+
* application has closed it.
10+
*/
11+
public class KafkaProducerCloseInterceptor {
12+
13+
// Must be non-private because instrumented KafkaProducer.close() accesses this field via generated bytecode.
14+
public static final SuperstreamLogger logger = SuperstreamLogger.getLogger(KafkaProducerCloseInterceptor.class);
15+
16+
@Advice.OnMethodEnter
17+
public static void onEnter(@Advice.This Object producer) {
18+
if (KafkaProducerInterceptor.isDisabled()) {
19+
return;
20+
}
21+
22+
// Delegate to the main interceptor helper which handles thread-safety and
23+
// registry updates. We log only the first successful deactivation to
24+
// avoid duplicate messages when close() delegates internally.
25+
try {
26+
// KafkaProducer has three close(...) overloads that delegate to each other
27+
// (close(), close(Duration), close(long, TimeUnit)). Our agent advice is
28+
// woven into *all* of them, so this method is invoked once per layer of
29+
// delegation. We therefore rely on markProducerClosed() to tell us
30+
// whether this is the *first* invocation for the given producer object
31+
// and suppress further logging when it returns false.
32+
KafkaProducerInterceptor.markProducerClosed(producer);
33+
34+
} catch (Throwable ignored) {
35+
// We swallow any error so that we never affect the application's own
36+
// close() behaviour.
37+
}
38+
}
39+
}

0 commit comments

Comments
 (0)