Skip to content

Commit b6e48d4

Browse files
reporting error in case of error while fetching the metadata message
1 parent 515525f commit b6e48d4

File tree

3 files changed

+33
-22
lines changed

3 files changed

+33
-22
lines changed

superstream-clients/src/main/java/ai/superstream/core/ClientStatsReporter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ private static class ClusterStatsCoordinator {
292292
long interval = DEFAULT_REPORT_INTERVAL_MS;
293293
try {
294294
ai.superstream.model.MetadataMessage meta = ai.superstream.core.SuperstreamManager.getInstance()
295-
.getOrFetchMetadataMessage(bootstrapServers, baseProps);
295+
.getOrFetchMetadataMessage(bootstrapServers, baseProps).getKey();
296296
if (meta != null && meta.getReportIntervalMs() != null && meta.getReportIntervalMs() > 0) {
297297
interval = meta.getReportIntervalMs();
298298
}

superstream-clients/src/main/java/ai/superstream/core/MetadataConsumer.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ public class MetadataConsumer {
3030
* Get the metadata message from the Kafka cluster.
3131
*
3232
* @param bootstrapServers The Kafka bootstrap servers
33-
* @return The metadata message, or null if there was an error
33+
* @return A pair containing the metadata message (or null if error) and the error message (or null if no error)
3434
*/
35-
public MetadataMessage getMetadataMessage(String bootstrapServers, Properties originalClientProperties) {
35+
public java.util.AbstractMap.SimpleEntry<MetadataMessage, String> getMetadataMessage(String bootstrapServers, Properties originalClientProperties) {
3636
Properties properties = new Properties();
3737

3838
// Copy essential client configuration properties from the original client
@@ -67,8 +67,9 @@ public MetadataMessage getMetadataMessage(String bootstrapServers, Properties or
6767
// Check if the metadata topic exists
6868
Set<String> topics = consumer.listTopics().keySet();
6969
if (!topics.contains(METADATA_TOPIC)) {
70-
logger.error("[ERR-034] Superstream internal topic is missing. This topic is required for Superstream to function properly. Please make sure the Kafka user has read/write/describe permissions on superstream.* topics.");
71-
return null;
70+
String errMsg = "[ERR-034] Superstream internal topic is missing. This topic is required for Superstream to function properly. Please make sure the Kafka user has read/write/describe permissions on superstream.* topics.";
71+
logger.error(errMsg);
72+
return new java.util.AbstractMap.SimpleEntry<>(null, errMsg);
7273
}
7374

7475
// Assign the metadata topic
@@ -80,8 +81,9 @@ public MetadataMessage getMetadataMessage(String bootstrapServers, Properties or
8081
long endOffset = consumer.position(partition);
8182

8283
if (endOffset == 0) {
83-
logger.error("[ERR-035] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please contact the Superstream team if the issue persists.");
84-
return null;
84+
String errMsg = "[ERR-035] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please contact the Superstream team if the issue persists.";
85+
logger.error(errMsg);
86+
return new java.util.AbstractMap.SimpleEntry<>(null, errMsg);
8587
}
8688

8789
// Seek to the last message
@@ -90,20 +92,23 @@ public MetadataMessage getMetadataMessage(String bootstrapServers, Properties or
9092
// Poll for the message
9193
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
9294
if (records.isEmpty()) {
93-
logger.error("[ERR-036] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please contact the Superstream team if the issue persists.");
94-
return null;
95+
String errMsg = "[ERR-036] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please contact the Superstream team if the issue persists.";
96+
logger.error(errMsg);
97+
return new java.util.AbstractMap.SimpleEntry<>(null, errMsg);
9598
}
9699
logger.debug("Successfully retrieved a message from the {} topic", METADATA_TOPIC);
97100

98101
// Parse the message
99102
String json = records.iterator().next().value();
100-
return objectMapper.readValue(json, MetadataMessage.class);
103+
return new java.util.AbstractMap.SimpleEntry<>(objectMapper.readValue(json, MetadataMessage.class), null);
101104
} catch (IOException e) {
102-
logger.error("[ERR-027] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please contact the Superstream team if the issue persists: {}", e.getMessage(), e);
103-
return null;
105+
String errMsg = "[ERR-027] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please contact the Superstream team if the issue persists: " + e.getMessage();
106+
logger.error(errMsg, e);
107+
return new java.util.AbstractMap.SimpleEntry<>(null, errMsg);
104108
} catch (Exception e) {
105-
logger.error("[ERR-028] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please make sure the Kafka user has read/write/describe permissions on superstream.* topics: {}", e.getMessage(), e);
106-
return null;
109+
String errMsg = "[ERR-028] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please make sure the Kafka user has read/write/describe permissions on superstream.* topics: " + e.getMessage();
110+
logger.error(errMsg, e);
111+
return new java.util.AbstractMap.SimpleEntry<>(null, errMsg);
107112
}
108113
}
109114
}

superstream-clients/src/main/java/ai/superstream/core/SuperstreamManager.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,15 @@ public boolean optimizeProducer(String bootstrapServers, String clientId, Proper
113113
setOptimizationInProgress(true);
114114

115115
// Get or fetch the metadata message
116-
MetadataMessage metadataMessage = getOrFetchMetadataMessage(bootstrapServers, properties);
116+
java.util.AbstractMap.SimpleEntry<MetadataMessage, String> result = getOrFetchMetadataMessage(bootstrapServers, properties);
117+
MetadataMessage metadataMessage = result.getKey();
118+
String error = result.getValue();
119+
117120
if (metadataMessage == null) {
118-
// log is inside the getOrFetchMetadataMessage method
121+
// Error is already logged in getOrFetchMetadataMessage
122+
// Push ConfigInfo with error and original config for stats reporting
123+
java.util.Deque<ai.superstream.agent.KafkaProducerInterceptor.ConfigInfo> cfgStack = ai.superstream.agent.KafkaProducerInterceptor.TL_CFG_STACK.get();
124+
cfgStack.push(new ai.superstream.agent.KafkaProducerInterceptor.ConfigInfo(convertPropertiesToMap(properties), new java.util.HashMap<>(), error));
119125
return false;
120126
}
121127

@@ -215,11 +221,10 @@ public boolean optimizeProducer(String bootstrapServers, String clientId, Proper
215221
* Get the metadata message for a given Kafka cluster.
216222
*
217223
* @param bootstrapServers The Kafka bootstrap servers
218-
* @return The metadata message, or null if it couldn't be retrieved
224+
* @return A pair containing the metadata message (or null if error) and the error message (or null if no error)
219225
*/
220-
public MetadataMessage getOrFetchMetadataMessage(String bootstrapServers, Properties originalProperties) {
226+
public java.util.AbstractMap.SimpleEntry<MetadataMessage, String> getOrFetchMetadataMessage(String bootstrapServers, Properties originalProperties) {
221227
// Normalise the bootstrap servers so that different orderings of the same
222-
// hosts/ports map to the *same* cache entry. This prevents duplicate
223228
// Kafka consumers and wasted network calls when the application creates
224229
// multiple producers with logically-identical bootstrap lists such as
225230
// "b1:9092,b2:9092" and "b2:9092,b1:9092".
@@ -228,18 +233,19 @@ public MetadataMessage getOrFetchMetadataMessage(String bootstrapServers, Proper
228233

229234
// Check the cache first
230235
if (metadataCache.containsKey(cacheKey)) {
231-
return metadataCache.get(cacheKey);
236+
return new java.util.AbstractMap.SimpleEntry<>(metadataCache.get(cacheKey), null);
232237
}
233238

234239
// Fetch the metadata using the *original* string (ordering is irrelevant
235240
// for the Kafka client itself)
236-
MetadataMessage metadataMessage = metadataConsumer.getMetadataMessage(bootstrapServers, originalProperties);
241+
java.util.AbstractMap.SimpleEntry<MetadataMessage, String> result = metadataConsumer.getMetadataMessage(bootstrapServers, originalProperties);
242+
MetadataMessage metadataMessage = result.getKey();
237243

238244
if (metadataMessage != null) {
239245
metadataCache.put(cacheKey, metadataMessage);
240246
}
241247

242-
return metadataMessage;
248+
return result;
243249
}
244250

245251
/**

0 commit comments

Comments
 (0)