Skip to content

Commit 08d209c

Browse files
fix - parsing properties object with BS defined as list
1 parent bc38d1f commit 08d209c

File tree

1 file changed

+53
-5
lines changed

1 file changed

+53
-5
lines changed

superstream-clients/src/main/java/ai/superstream/agent/KafkaProducerInterceptor.java

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,9 @@ public static Properties extractProperties(Object[] args) {
374374

375375
if (arg instanceof Properties) {
376376
logger.debug("extractProperties: Found Properties object");
377-
return (Properties) arg;
377+
Properties props = (Properties) arg;
378+
normalizeBootstrapServers(props);
379+
return props;
378380
}
379381

380382
if (arg instanceof Map) {
@@ -421,7 +423,9 @@ public static Properties extractProperties(Object[] args) {
421423
} else if (fieldValue instanceof Properties) {
422424
logger.debug("extractProperties: Found Properties in ProducerConfig field {}",
423425
fieldName);
424-
return (Properties) fieldValue;
426+
Properties props = (Properties) fieldValue;
427+
normalizeBootstrapServers(props);
428+
return props;
425429
}
426430
} catch (NoSuchFieldException e) {
427431
// Field doesn't exist, try the next one
@@ -458,7 +462,9 @@ public static Properties extractProperties(Object[] args) {
458462
} else if (result instanceof Properties) {
459463
logger.debug("extractProperties: Found Properties in ProducerConfig method {} result",
460464
method.getName());
461-
return (Properties) result;
465+
Properties props = (Properties) result;
466+
normalizeBootstrapServers(props);
467+
return props;
462468
}
463469
}
464470
}
@@ -501,6 +507,48 @@ public static Properties extractProperties(Object[] args) {
501507
return null;
502508
}
503509

510+
/**
511+
* Ensure that the bootstrap.servers property is stored as a comma-separated String even when
512+
* the user supplied it as a Collection (or array) inside a java.util.Properties instance.
513+
* This keeps the rest of the optimisation pipeline – which relies on getProperty(String) – working.
514+
*/
515+
private static void normalizeBootstrapServers(Properties props) {
516+
if (props == null) {
517+
return;
518+
}
519+
520+
Object bsObj = props.get("bootstrap.servers");
521+
if (bsObj == null) {
522+
return;
523+
}
524+
525+
String joined = null;
526+
if (bsObj instanceof java.util.Collection) {
527+
java.util.Collection<?> col = (java.util.Collection<?>) bsObj;
528+
StringBuilder sb = new StringBuilder();
529+
for (Object o : col) {
530+
if (o == null) continue;
531+
if (sb.length() > 0) sb.append(',');
532+
sb.append(o.toString());
533+
}
534+
joined = sb.toString();
535+
} else if (bsObj.getClass().isArray()) {
536+
int len = java.lang.reflect.Array.getLength(bsObj);
537+
StringBuilder sb = new StringBuilder();
538+
for (int i = 0; i < len; i++) {
539+
Object o = java.lang.reflect.Array.get(bsObj, i);
540+
if (o == null) continue;
541+
if (sb.length() > 0) sb.append(',');
542+
sb.append(o.toString());
543+
}
544+
joined = sb.toString();
545+
}
546+
547+
if (joined != null && !joined.isEmpty()) {
548+
props.put("bootstrap.servers", joined);
549+
}
550+
}
551+
504552
/**
505553
* Determines if this constructor call is the initial creation from application
506554
* code
@@ -853,7 +901,7 @@ public boolean collectMetricsForProducer(String producerId, ProducerMetricsInfo
853901
keyString = namePart;
854902
}
855903
} catch (NumberFormatException e) {
856-
logger.debug("Failed to parse node ID: {}", e.getMessage());
904+
// ignore making us ignore the negative node IDs (represents metrics from the controller that we don't care about)
857905
}
858906
}
859907
}
@@ -896,7 +944,7 @@ public boolean collectMetricsForProducer(String producerId, ProducerMetricsInfo
896944
keyString = parts[2];
897945
}
898946
} catch (NumberFormatException e) {
899-
logger.debug("Failed to parse node ID: {}", e.getMessage());
947+
// ignore making us ignore the negative node IDs (represents metrics from the controller that we don't care about)
900948
}
901949
}
902950
}

0 commit comments

Comments
 (0)