Skip to content

Commit 5e84cba

Browse files
General enhancements
1 parent 66cfd7c commit 5e84cba

File tree

9 files changed

+139
-320
lines changed

9 files changed

+139
-320
lines changed

.idea/runConfigurations/kafka_clients_example.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

superstream-clients/dependency-reduced-pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<groupId>ai.superstream</groupId>
55
<artifactId>superstream-clients</artifactId>
66
<name>Superstream Kafka Client Optimizer</name>
7-
<version>1.0.205</version>
7+
<version>1.0.206</version>
88
<description>A Java library that dynamically optimizes Kafka client configuration based on recommendations</description>
99
<url>https://github.com/superstreamlabs/superstream-clients-java</url>
1010
<developers>

superstream-clients/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>ai.superstream</groupId>
88
<artifactId>superstream-clients</artifactId>
9-
<version>1.0.205</version>
9+
<version>1.0.206</version>
1010
<packaging>jar</packaging>
1111

1212
<name>Superstream Kafka Client Optimizer</name>

superstream-clients/src/main/java/ai/superstream/agent/KafkaProducerInterceptor.java

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import ai.superstream.core.SuperstreamManager;
55
import ai.superstream.model.MetadataMessage;
66
import ai.superstream.util.SuperstreamLogger;
7+
import ai.superstream.util.ClientUtils;
78
import net.bytebuddy.asm.Advice;
89
import java.util.AbstractMap;
910

@@ -18,7 +19,6 @@
1819
import java.util.concurrent.atomic.AtomicBoolean;
1920
import java.util.concurrent.atomic.AtomicReference;
2021
import java.lang.ThreadLocal;
21-
import java.util.Collections;
2222
import java.util.List;
2323
import java.util.Arrays;
2424

@@ -182,24 +182,6 @@ public static void onEnter(@Advice.AllArguments Object[] args) {
182182
immutableOriginalMap.getClass().getName());
183183
logger.error(errMsg);
184184

185-
// Report the error to the client
186-
try {
187-
// Get metadata message before reporting
188-
AbstractMap.SimpleEntry<MetadataMessage, String> metadataResult = SuperstreamManager.getInstance().getOrFetchMetadataMessage(bootstrapServers, properties);
189-
MetadataMessage metadataMessage = metadataResult.getKey();
190-
SuperstreamManager.getInstance().reportClientInformation(
191-
bootstrapServers,
192-
properties,
193-
metadataMessage,
194-
clientId,
195-
properties,
196-
Collections.emptyMap(), // no optimized configuration since we can't optimize
197-
errMsg
198-
);
199-
} catch (Exception e) {
200-
logger.error("[ERR-058] Failed to report client error: {}", e.getMessage(), e);
201-
}
202-
203185
// Push ConfigInfo with error and original config for stats reporting
204186
java.util.Deque<ConfigInfo> cfgStack = TL_CFG_STACK.get();
205187
cfgStack.push(new ConfigInfo(propertiesToMap(properties), new java.util.HashMap<>(), errMsg));
@@ -349,7 +331,7 @@ public static void onExit(@Advice.This Object producer) {
349331
}
350332
if (cfgInfo != null) {
351333
// Use the original configuration from ConfigInfo and get complete config with defaults
352-
java.util.Map<String, Object> completeConfig = ai.superstream.core.ClientReporter.getCompleteProducerConfig(cfgInfo.originalConfig);
334+
java.util.Map<String, Object> completeConfig = ClientUtils.getCompleteProducerConfig(cfgInfo.originalConfig);
353335
java.util.Map<String, Object> optimizedConfig = cfgInfo.optimizedConfig != null ? cfgInfo.optimizedConfig : new java.util.HashMap<>();
354336
reporter.setConfigurations(completeConfig, optimizedConfig);
355337
// If optimizedConfig is empty and there is an error, set the error on the reporter
@@ -360,7 +342,7 @@ public static void onExit(@Advice.This Object producer) {
360342
// No ConfigInfo available, so no optimization was performed
361343
// Use the producer properties as both original and optimized (since no changes were made)
362344
java.util.Map<String, Object> originalPropsMap = propertiesToMap(producerProps);
363-
java.util.Map<String, Object> completeConfig = ai.superstream.core.ClientReporter.getCompleteProducerConfig(originalPropsMap);
345+
java.util.Map<String, Object> completeConfig = ClientUtils.getCompleteProducerConfig(originalPropsMap);
364346
reporter.setConfigurations(completeConfig, new java.util.HashMap<>());
365347
}
366348

superstream-clients/src/main/java/ai/superstream/core/ClientReporter.java

Lines changed: 0 additions & 232 deletions
This file was deleted.

superstream-clients/src/main/java/ai/superstream/core/ClientStatsReporter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import ai.superstream.util.NetworkUtils;
66
import ai.superstream.util.SuperstreamLogger;
77
import ai.superstream.util.KafkaPropertiesUtils;
8+
import ai.superstream.util.ClientUtils;
89

910
import com.fasterxml.jackson.databind.ObjectMapper;
1011
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -139,7 +140,7 @@ void drainInto(Producer<String, String> producer) {
139140
NetworkUtils.getLocalIpAddress(),
140141
totalBytesBefore,
141142
totalBytesAfter,
142-
ClientReporter.getClientVersion(),
143+
ClientUtils.getClientVersion(),
143144
NetworkUtils.getHostname(),
144145
producerUuid);
145146

@@ -336,7 +337,8 @@ private void run() {
336337
baseProps.forEach((key, value) -> {
337338
// Mask sensitive values
338339
if (key.toString().toLowerCase().contains("password") ||
339-
key.toString().toLowerCase().contains("sasl.jaas.config")) {
340+
key.toString().toLowerCase().contains("sasl.jaas.config") ||
341+
key.toString().equals("basic.auth.user.info")) {
340342
configLog.append(key).append("=[MASKED], ");
341343
} else {
342344
configLog.append(key).append("=").append(value).append(", ");

superstream-clients/src/main/java/ai/superstream/core/MetadataConsumer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ public java.util.AbstractMap.SimpleEntry<MetadataMessage, String> getMetadataMes
5050
properties.forEach((key, value) -> {
5151
// Mask sensitive values
5252
if (key.toString().toLowerCase().contains("password") ||
53-
key.toString().toLowerCase().contains("sasl.jaas.config")) {
53+
key.toString().toLowerCase().contains("sasl.jaas.config") ||
54+
key.toString().equals("basic.auth.user.info")) {
5455
configLog.append(key).append("=[MASKED], ");
5556
} else {
5657
configLog.append(key).append("=").append(value).append(", ");

0 commit comments

Comments
 (0)