Skip to content

Commit d8decf0

Browse files
support dynamic sample frequency comes as part of the metadata message
1 parent 08d209c commit d8decf0

File tree

6 files changed

+104
-14
lines changed

6 files changed

+104
-14
lines changed

.idea/runConfigurations/kafka_clients_example.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/kafka-clients-example/src/main/java/ai/superstream/examples/MskKafkaExample.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,7 @@ public class MskKafkaExample {
2929
private static final Logger logger = LoggerFactory.getLogger(MskKafkaExample.class);
3030

3131
// === Configuration Constants ===
32-
private static final String DEFAULT_BOOTSTRAP_SERVERS =
33-
"b-23-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198," +
34-
"b-24-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198," +
35-
"b-2-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198";
32+
private static final String DEFAULT_BOOTSTRAP_SERVERS = "b-1-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198,b-2-public.superstreamstgmsk.0y88si.c2.kafka.eu-central-1.amazonaws.com:9198";
3633

3734
// AWS IAM Credentials
3835
private static final String AWS_ACCESS_KEY_ID = "<your-access-key>";

superstream-clients/src/main/java/ai/superstream/agent/KafkaProducerInterceptor.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1404,10 +1404,19 @@ public static boolean markProducerClosed(Object producer) {
14041404
if (info.isActive.getAndSet(false)) {
14051405
logger.debug("Producer {} marked as closed; metrics collection will stop", producerId);
14061406
try {
1407-
info.getReporter().deactivate();
1408-
} catch (Exception ignored) {}
1409-
// Remove from auxiliary map
1407+
ClientStatsReporter reporter = info.getReporter();
1408+
if (reporter != null) {
1409+
// Stop the reporter and deregister it from coordinators
1410+
reporter.deactivate();
1411+
ai.superstream.core.ClientStatsReporter.deregisterReporter(reporter);
1412+
}
1413+
} catch (Exception e) {
1414+
logger.debug("Error deactivating reporter for {}: {}", producerId, e.getMessage());
1415+
}
1416+
1417+
// Remove from lookup maps to free memory
14101418
clientStatsReporters.remove(producerId);
1419+
producerMetricsMap.remove(producerId);
14111420
return true;
14121421
} else {
14131422
return false; // already closed previously

superstream-clients/src/main/java/ai/superstream/core/ClientStatsReporter.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ public class ClientStatsReporter {
3030
private static final SuperstreamLogger logger = SuperstreamLogger.getLogger(ClientStatsReporter.class);
3131
private static final String CLIENTS_TOPIC = "superstream.clients";
3232
private static final ObjectMapper objectMapper = new ObjectMapper();
33-
private static final long REPORT_INTERVAL_MS = 300000; // 5 minutes
33+
// Default reporting interval (5 minutes) – overridden when metadata provides a different value
34+
private static final long DEFAULT_REPORT_INTERVAL_MS = 300000; // 5 minutes
3435
private static final String DISABLED_ENV_VAR = "SUPERSTREAM_DISABLED";
3536

3637
// Shared scheduler for all reporters to minimize thread usage
@@ -282,18 +283,39 @@ private static class ClusterStatsCoordinator {
282283
private final CopyOnWriteArrayList<ClientStatsReporter> reporters = new CopyOnWriteArrayList<>();
283284
private final AtomicBoolean scheduled = new AtomicBoolean(false);
284285

286+
// Report interval for this cluster (milliseconds)
287+
private final long reportIntervalMs;
288+
285289
ClusterStatsCoordinator(String bootstrapServers, Properties baseProps) {
286290
this.bootstrapServers = bootstrapServers;
287291
this.baseProps = baseProps;
292+
293+
long interval = DEFAULT_REPORT_INTERVAL_MS;
294+
try {
295+
ai.superstream.model.MetadataMessage meta = ai.superstream.core.SuperstreamManager.getInstance()
296+
.getOrFetchMetadataMessage(bootstrapServers, baseProps);
297+
if (meta != null && meta.getReportIntervalMs() != null && meta.getReportIntervalMs() > 0) {
298+
interval = meta.getReportIntervalMs();
299+
}
300+
} catch (Exception e) {
301+
logger.warn("Could not obtain report interval from metadata: {}. Using default {} ms", e.getMessage(), DEFAULT_REPORT_INTERVAL_MS);
302+
}
303+
304+
this.reportIntervalMs = interval;
288305
}
289306

290307
void addReporter(ClientStatsReporter r) {
291308
reporters.add(r);
292309
if (scheduled.compareAndSet(false, true)) {
293-
scheduler.scheduleAtFixedRate(this::run, REPORT_INTERVAL_MS, REPORT_INTERVAL_MS, TimeUnit.MILLISECONDS);
310+
scheduler.scheduleAtFixedRate(this::run, reportIntervalMs, reportIntervalMs, TimeUnit.MILLISECONDS);
294311
}
295312
}
296313

314+
// Allows outer class to remove a reporter when the underlying KafkaProducer is closed
315+
void removeReporter(ClientStatsReporter r) {
316+
reporters.remove(r);
317+
}
318+
297319
private void run() {
298320
if (reporters.isEmpty())
299321
return;
@@ -336,4 +358,16 @@ private void run() {
336358
public void deactivate() {
337359
registered.set(false);
338360
}
361+
362+
/**
363+
* Remove the given reporter instance from all cluster coordinators. Called by the
364+
* agent when the application closes its <code>KafkaProducer</code> so that we
365+
* do not retain references to obsolete reporter objects.
366+
*/
367+
public static void deregisterReporter(ClientStatsReporter reporter) {
368+
if (reporter == null) {
369+
return;
370+
}
371+
coordinators.values().forEach(coord -> coord.removeReporter(reporter));
372+
}
339373
}

superstream-clients/src/main/java/ai/superstream/core/SuperstreamManager.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,21 +214,56 @@ public boolean optimizeProducer(String bootstrapServers, String clientId, Proper
214214
* @return The metadata message, or null if it couldn't be retrieved
215215
*/
216216
public MetadataMessage getOrFetchMetadataMessage(String bootstrapServers, Properties originalProperties) {
217+
// Normalise the bootstrap servers so that different orderings of the same
218+
// hosts/ports map to the *same* cache entry. This prevents duplicate
219+
// Kafka consumers and wasted network calls when the application creates
220+
// multiple producers with logically-identical bootstrap lists such as
221+
// "b1:9092,b2:9092" and "b2:9092,b1:9092".
222+
223+
String cacheKey = normalizeBootstrapServers(bootstrapServers);
224+
217225
// Check the cache first
218-
if (metadataCache.containsKey(bootstrapServers)) {
219-
return metadataCache.get(bootstrapServers);
226+
if (metadataCache.containsKey(cacheKey)) {
227+
return metadataCache.get(cacheKey);
220228
}
221229

222-
// Fetch the metadata
230+
// Fetch the metadata using the *original* string (ordering is irrelevant
231+
// for the Kafka client itself)
223232
MetadataMessage metadataMessage = metadataConsumer.getMetadataMessage(bootstrapServers, originalProperties);
224233

225234
if (metadataMessage != null) {
226-
metadataCache.put(bootstrapServers, metadataMessage);
235+
metadataCache.put(cacheKey, metadataMessage);
227236
}
228237

229238
return metadataMessage;
230239
}
231240

241+
/**
242+
* Produce a canonical representation of the bootstrap servers list.
243+
* <p>
244+
* The input may contain duplicates, whitespace or different ordering – we
245+
* split on commas, trim each entry, drop empties, sort the list
246+
* lexicographically and join it back with commas. The resulting string can
247+
* safely be used as a map key that uniquely identifies a Kafka cluster.
248+
*/
249+
private static String normalizeBootstrapServers(String servers) {
250+
if (servers == null) {
251+
return "";
252+
}
253+
254+
String[] parts = servers.split(",");
255+
java.util.List<String> cleaned = new java.util.ArrayList<>();
256+
for (String p : parts) {
257+
if (p == null) continue;
258+
String trimmed = p.trim();
259+
if (!trimmed.isEmpty()) {
260+
cleaned.add(trimmed);
261+
}
262+
}
263+
java.util.Collections.sort(cleaned);
264+
return String.join(",", cleaned);
265+
}
266+
232267
/**
233268
* Get the list of application topics from the environment variable.
234269
*

superstream-clients/src/main/java/ai/superstream/model/MetadataMessage.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ public class MetadataMessage {
1212
private boolean active;
1313
private List<TopicConfiguration> topicsConfiguration;
1414

15+
// Optional: override for client stats reporting interval (milliseconds). Can be absent (null)
16+
@JsonProperty("report_interval_ms")
17+
private Long reportIntervalMs;
18+
1519
public MetadataMessage() {
1620
// Default constructor for Jackson
1721
}
@@ -50,6 +54,17 @@ public void setTopicsConfiguration(List<TopicConfiguration> topicsConfiguration)
5054
this.topicsConfiguration = topicsConfiguration;
5155
}
5256

57+
// Getter & setter for new report interval field
58+
@JsonProperty("report_interval_ms")
59+
public Long getReportIntervalMs() {
60+
return reportIntervalMs;
61+
}
62+
63+
@JsonProperty("report_interval_ms")
64+
public void setReportIntervalMs(Long reportIntervalMs) {
65+
this.reportIntervalMs = reportIntervalMs;
66+
}
67+
5368
@Override
5469
public boolean equals(Object o) {
5570
if (this == o) return true;

0 commit comments

Comments
 (0)