Skip to content

Commit c30c2b7

Browse files
report errors as metadata
1 parent 655978b commit c30c2b7

File tree

4 files changed

+42
-21
lines changed

4 files changed

+42
-21
lines changed

examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public static void main(String[] args) {
3838
mutableProps.put(ProducerConfig.LINGER_MS_CONFIG, 500);
3939

4040
// Wrap the map to make it immutable – simulates a user supplying an unmodifiable configuration object
41-
// Map<String, Object> props = java.util.Collections.unmodifiableMap(mutableProps);
41+
Map<String, Object> props = java.util.Collections.unmodifiableMap(mutableProps);
4242

4343
// Pass the immutable map directly to the KafkaProducer constructor
4444
Producer<String, String> producer = new KafkaProducer<String, String>(mutableProps);

superstream-clients/src/main/java/ai/superstream/agent/KafkaProducerInterceptor.java

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import ai.superstream.core.ClientStatsReporter;
44
import ai.superstream.core.SuperstreamManager;
5+
import ai.superstream.model.MetadataMessage;
56
import ai.superstream.util.SuperstreamLogger;
67
import net.bytebuddy.asm.Advice;
78

@@ -16,6 +17,7 @@
1617
import java.util.concurrent.atomic.AtomicBoolean;
1718
import java.util.concurrent.atomic.AtomicReference;
1819
import java.lang.ThreadLocal;
20+
import java.util.Collections;
1921

2022
/**
2123
* Intercepts KafkaProducer constructor calls to optimize configurations and
@@ -120,6 +122,7 @@ public static void onEnter(@Advice.AllArguments Object[] args) {
120122
TL_UUID_STACK.get().push(producerUuid);
121123
} else {
122124
logger.error("[ERR-002] Could not extract properties from producer arguments");
125+
// here we can not report the error to the clients topic as we were not able to extract the properties
123126
return;
124127
}
125128

@@ -136,19 +139,6 @@ public static void onEnter(@Advice.AllArguments Object[] args) {
136139
}
137140
}
138141

139-
if (immutableConfigDetected && immutableOriginalMap != null) {
140-
String errMsg = String.format("[ERR-010] Cannot optimize KafkaProducer configuration: received an unmodifiable Map (%s). Please pass a mutable java.util.Properties or java.util.Map instead.",
141-
immutableOriginalMap.getClass().getName());
142-
logger.error(errMsg);
143-
144-
// Clean up ThreadLocals so that onExit knows to skip reporter setup
145-
TL_PROPS_STACK.remove();
146-
TL_UUID_STACK.remove();
147-
148-
// Do NOT attempt optimisation
149-
return;
150-
}
151-
152142
// Make a copy of the original properties in case we need to restore them
153143
Properties originalProperties = new Properties();
154144
originalProperties.putAll(properties);
@@ -179,6 +169,37 @@ public static void onEnter(@Advice.AllArguments Object[] args) {
179169
return;
180170
}
181171

172+
if (immutableConfigDetected && immutableOriginalMap != null) {
173+
String errMsg = String.format("[ERR-010] Cannot optimize KafkaProducer configuration: received an unmodifiable Map (%s). Please pass a mutable java.util.Properties or java.util.Map instead.",
174+
immutableOriginalMap.getClass().getName());
175+
logger.error(errMsg);
176+
177+
// Report the error to the client
178+
try {
179+
// Get metadata message before reporting
180+
MetadataMessage metadataMessage = SuperstreamManager.getInstance().getOrFetchMetadataMessage(bootstrapServers, properties);
181+
182+
SuperstreamManager.getInstance().reportClientInformation(
183+
bootstrapServers,
184+
properties,
185+
metadataMessage,
186+
clientId,
187+
properties,
188+
Collections.emptyMap(), // no optimized configuration since we can't optimize
189+
errMsg
190+
);
191+
} catch (Exception e) {
192+
logger.error("[ERR-058] Failed to report client error: {}", e.getMessage(), e);
193+
}
194+
195+
// Clean up ThreadLocals so that onExit knows to skip reporter setup
196+
TL_PROPS_STACK.remove();
197+
TL_UUID_STACK.remove();
198+
199+
// Do NOT attempt optimisation
200+
return;
201+
}
202+
182203
// Optimize the producer
183204
boolean success = SuperstreamManager.getInstance().optimizeProducer(bootstrapServers, clientId, properties);
184205
if (!success) {

superstream-clients/src/main/java/ai/superstream/core/SuperstreamManager.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ public boolean optimizeProducer(String bootstrapServers, String clientId, Proper
208208
* @param bootstrapServers The Kafka bootstrap servers
209209
* @return The metadata message, or null if it couldn't be retrieved
210210
*/
211-
private MetadataMessage getOrFetchMetadataMessage(String bootstrapServers, Properties originalProperties) {
211+
public MetadataMessage getOrFetchMetadataMessage(String bootstrapServers, Properties originalProperties) {
212212
// Check the cache first
213213
if (metadataCache.containsKey(bootstrapServers)) {
214214
return metadataCache.get(bootstrapServers);
@@ -250,7 +250,7 @@ private List<String> getApplicationTopics() {
250250
* @param originalConfiguration The original configuration
251251
* @param optimizedConfiguration The optimized configuration
252252
*/
253-
private void reportClientInformation(String bootstrapServers, Properties originalProperties, MetadataMessage metadataMessage,
253+
public void reportClientInformation(String bootstrapServers, Properties originalProperties, MetadataMessage metadataMessage,
254254
String clientId, Properties originalConfiguration,
255255
Map<String, Object> optimizedConfiguration,
256256
String error) {
@@ -269,8 +269,8 @@ private void reportClientInformation(String bootstrapServers, Properties origina
269269
boolean success = clientReporter.reportClient(
270270
bootstrapServers,
271271
originalProperties,
272-
metadataMessage.getSuperstreamClusterId(),
273-
metadataMessage.isActive(),
272+
metadataMessage != null ? metadataMessage.getSuperstreamClusterId() : null,
273+
metadataMessage != null ? metadataMessage.isActive() : false,
274274
clientId,
275275
originalConfiguration1,
276276
optimizedConfiguration,

superstream-clients/src/main/java/ai/superstream/util/NetworkUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public static String getLocalIpAddress() {
2929
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
3030
if (interfaces == null) {
3131
logger.warn("No network interfaces found");
32-
return "unknown";
32+
return "";
3333
}
3434
while (interfaces.hasMoreElements()) {
3535
NetworkInterface networkInterface = interfaces.nextElement();
@@ -53,7 +53,7 @@ public static String getLocalIpAddress() {
5353
return cachedIpAddress;
5454
} catch (SocketException | UnknownHostException e) {
5555
logger.error("[ERR-033] Failed to determine local IP address", e);
56-
return "unknown";
56+
return "";
5757
}
5858
}
5959

@@ -73,7 +73,7 @@ public static String getHostname() {
7373
return cachedHostname;
7474
} catch (UnknownHostException e) {
7575
logger.error("[ERR-034] Failed to determine local hostname", e);
76-
return "unknown";
76+
return "";
7777
}
7878
}
7979
}

0 commit comments

Comments
 (0)