Skip to content

Commit 515525f

Browse files
populate stats messages with error that was missed
1 parent 680dd6f commit 515525f

File tree

2 files changed

+15
-6
lines changed

2 files changed

+15
-6
lines changed

superstream-clients/src/main/java/ai/superstream/agent/KafkaProducerInterceptor.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import java.util.concurrent.atomic.AtomicReference;
1919
import java.lang.ThreadLocal;
2020
import java.util.Collections;
21+
import java.util.List;
22+
import java.util.Arrays;
2123

2224
/**
2325
* Intercepts KafkaProducer constructor calls to optimize configurations and
@@ -206,7 +208,10 @@ public static void onEnter(@Advice.AllArguments Object[] args) {
206208
properties.putAll(originalProperties);
207209
// Push ConfigInfo with original config for stats reporting
208210
java.util.Deque<ConfigInfo> cfgStack = TL_CFG_STACK.get();
209-
cfgStack.push(new ConfigInfo(originalPropertiesMap, new java.util.HashMap<>()));
211+
// Check if there's an existing ConfigInfo with an error
212+
ConfigInfo existingConfig = cfgStack.isEmpty() ? null : cfgStack.peek();
213+
String error = existingConfig != null ? existingConfig.error : null;
214+
cfgStack.push(new ConfigInfo(originalPropertiesMap, new java.util.HashMap<>(), error));
210215
}
211216
} catch (Exception e) {
212217
logger.error("[ERR-053] Error during producer optimization: {}", e.getMessage(), e);
@@ -291,21 +296,21 @@ public static void onExit(@Advice.This Object producer) {
291296

292297
// Set the most impactful topic if possible
293298
try {
294-
ai.superstream.model.MetadataMessage metadataMessage = null;
295-
java.util.List<String> topics = null;
299+
MetadataMessage metadataMessage = null;
300+
List<String> topics = null;
296301
// Try to get metadata and topics if available
297302
if (producerProps != null) {
298303
String bootstrapServersProp = producerProps.getProperty("bootstrap.servers");
299304
if (bootstrapServersProp != null) {
300-
metadataMessage = ai.superstream.core.SuperstreamManager.getInstance().getOrFetchMetadataMessage(bootstrapServersProp, producerProps);
305+
metadataMessage = SuperstreamManager.getInstance().getOrFetchMetadataMessage(bootstrapServersProp, producerProps);
301306
}
302307
String topicsEnv = System.getenv("SUPERSTREAM_TOPICS_LIST");
303308
if (topicsEnv != null && !topicsEnv.trim().isEmpty()) {
304-
topics = java.util.Arrays.asList(topicsEnv.split(","));
309+
topics = Arrays.asList(topicsEnv.split(","));
305310
}
306311
}
307312
if (metadataMessage != null && topics != null) {
308-
String mostImpactfulTopic = ai.superstream.core.SuperstreamManager.getInstance().getConfigurationOptimizer().getMostImpactfulTopicName(metadataMessage, topics);
313+
String mostImpactfulTopic = SuperstreamManager.getInstance().getConfigurationOptimizer().getMostImpactfulTopicName(metadataMessage, topics);
309314
if (mostImpactfulTopic == null) {
310315
mostImpactfulTopic = "";
311316
}

superstream-clients/src/main/java/ai/superstream/core/SuperstreamManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ public boolean optimizeProducer(String bootstrapServers, String clientId, Proper
128128
String errMsg = "[ERR-054] Superstream optimization is not active for this kafka cluster, please head to the Superstream console and activate it.";
129129
logger.error(errMsg);
130130
reportClientInformation(bootstrapServers, properties, metadataMessage, clientId, originalProperties, Collections.emptyMap(), errMsg);
131+
132+
// Push ConfigInfo with error and original config for stats reporting
133+
java.util.Deque<ai.superstream.agent.KafkaProducerInterceptor.ConfigInfo> cfgStack = ai.superstream.agent.KafkaProducerInterceptor.TL_CFG_STACK.get();
134+
cfgStack.push(new ai.superstream.agent.KafkaProducerInterceptor.ConfigInfo(convertPropertiesToMap(properties), new java.util.HashMap<>(), errMsg));
131135
return false;
132136
}
133137

0 commit comments

Comments
 (0)