Skip to content

Commit a77e4db

Browse files
remove unused code from metadata report
1 parent 43a499d commit a77e4db

File tree

6 files changed

+59
-49
lines changed

6 files changed

+59
-49
lines changed

README.md

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,6 @@ KafkaProducer<String, String> producer = new KafkaProducer<>(
7777
Map.of("bootstrap.servers", "localhost:9092")
7878
);
7979

80-
// Using ProducerConfig.originals() which returns an unmodifiable copy
81-
ProducerConfig config = new ProducerConfig(props);
82-
KafkaProducer<String, String> producer = new KafkaProducer<>(config.originals());
83-
8480
// Using KafkaTemplate's getProducerFactory().getConfigurationProperties()
8581
// which returns an unmodifiable map
8682
KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);

examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,23 @@ public class KafkaProducerExample {
2727
private static final String MESSAGE_VALUE = generateLargeCompressibleMessage();
2828

2929
public static void main(String[] args) {
30-
// Create a Map with the configuration
31-
Map<String, Object> props = new HashMap<>();
32-
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_BOOTSTRAP_SERVERS);
33-
props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
34-
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
35-
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
36-
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE);
37-
props.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE);
38-
props.put(ProducerConfig.LINGER_MS_CONFIG, 500);
30+
// Build the configuration map first using a mutable map
31+
Map<String, Object> mutableProps = new java.util.HashMap<>();
32+
mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_BOOTSTRAP_SERVERS);
33+
mutableProps.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
34+
mutableProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
35+
mutableProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
36+
mutableProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE);
37+
mutableProps.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE);
38+
mutableProps.put(ProducerConfig.LINGER_MS_CONFIG, 500);
3939

40+
// Wrap the map to make it immutable – simulates a user supplying an unmodifiable configuration object
41+
// Map<String, Object> props = java.util.Collections.unmodifiableMap(mutableProps);
42+
43+
// Pass the immutable map directly to the KafkaProducer constructor
44+
Producer<String, String> producer = new KafkaProducer<String, String>(mutableProps);
4045

4146
long recordCount = 50; // Number of messages to send
42-
Producer<String, String> producer = new KafkaProducer<>(props);
4347
try {
4448
while (true) {
4549
// Send 50 large messages to see compression benefits

superstream-clients/src/main/java/ai/superstream/agent/KafkaProducerInterceptor.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,32 @@ public static void onEnter(@Advice.AllArguments Object[] args) {
123123
return;
124124
}
125125

126+
// Detect immutable Map argument (e.g., Collections.unmodifiableMap) so we can skip optimisation early
127+
boolean immutableConfigDetected = false;
128+
java.util.Map<String,Object> immutableOriginalMap = null;
129+
for (Object arg : args) {
130+
if (arg instanceof java.util.Map && arg.getClass().getName().contains("UnmodifiableMap")) {
131+
immutableConfigDetected = true;
132+
@SuppressWarnings("unchecked")
133+
java.util.Map<String,Object> tmp = (java.util.Map<String,Object>) arg;
134+
immutableOriginalMap = tmp;
135+
break;
136+
}
137+
}
138+
139+
if (immutableConfigDetected && immutableOriginalMap != null) {
140+
String errMsg = String.format("[ERR-010] Cannot optimize KafkaProducer configuration: received an unmodifiable Map (%s). Please pass a mutable java.util.Properties or java.util.Map instead.",
141+
immutableOriginalMap.getClass().getName());
142+
logger.error(errMsg);
143+
144+
// Clean up ThreadLocals so that onExit knows to skip reporter setup
145+
TL_PROPS_STACK.remove();
146+
TL_UUID_STACK.remove();
147+
148+
// Do NOT attempt optimisation
149+
return;
150+
}
151+
126152
// Make a copy of the original properties in case we need to restore them
127153
Properties originalProperties = new Properties();
128154
originalProperties.putAll(properties);
@@ -286,14 +312,7 @@ public static Properties extractProperties(Object[] args) {
286312
if (arg instanceof Map) {
287313
logger.debug("extractProperties: Found Map object of type: {}", arg.getClass().getName());
288314

289-
// If the map is unmodifiable we cannot change producer configuration -> cannot optimise
290-
if (arg.getClass().getName().contains("UnmodifiableMap")) {
291-
logger.error("[ERR-010] Cannot optimize KafkaProducer configuration: received an unmodifiable Map ({}). " +
292-
"Please pass a mutable java.util.Properties or java.util.Map instead.",
293-
arg.getClass().getName());
294-
return null; // signal caller to skip optimisation
295-
}
296-
315+
// If the map is unmodifiable we cannot actually modify it later; we still let the caller decide
297316
try {
298317
@SuppressWarnings("unchecked")
299318
Map<String, Object> map = (Map<String, Object>) arg;

superstream-clients/src/main/java/ai/superstream/core/ClientReporter.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,8 @@
1515
import java.io.InputStream;
1616
import java.lang.reflect.Field;
1717
import java.util.HashMap;
18-
import java.util.List;
1918
import java.util.Map;
2019
import java.util.Properties;
21-
import java.util.concurrent.ExecutionException;
22-
import java.util.concurrent.TimeUnit;
23-
import java.util.concurrent.TimeoutException;
2420

2521
/**
2622
* Reports client information to the superstream.clients topic.
@@ -42,15 +38,14 @@ public class ClientReporter {
4238
* @param clientId The client ID
4339
* @param originalConfiguration The original configuration
4440
* @param optimizedConfiguration The optimized configuration
45-
* @param topics The list of topics
4641
* @param mostImpactfulTopic The most impactful topic
4742
* @param producerUuid The producer UUID to include in the report
4843
* @return True if the message was sent successfully, false otherwise
4944
*/
5045
public boolean reportClient(String bootstrapServers, Properties originalClientProperties, int superstreamClusterId, boolean active,
5146
String clientId, Map<String, Object> originalConfiguration,
5247
Map<String, Object> optimizedConfiguration,
53-
List<String> topics, String mostImpactfulTopic, String producerUuid) {
48+
String mostImpactfulTopic, String producerUuid) {
5449
Properties properties = new Properties();
5550

5651
// Copy all authentication-related and essential properties from the original client
@@ -77,7 +72,6 @@ public boolean reportClient(String bootstrapServers, Properties originalClientPr
7772
CLIENT_TYPE,
7873
getCompleteProducerConfig(originalConfiguration),
7974
optimizedConfiguration,
80-
topics,
8175
mostImpactfulTopic,
8276
NetworkUtils.getHostname(),
8377
producerUuid

superstream-clients/src/main/java/ai/superstream/core/SuperstreamManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,6 @@ private void reportClientInformation(String bootstrapServers, Properties origina
271271
clientId,
272272
originalConfiguration1,
273273
optimizedConfiguration,
274-
topics,
275274
mostImpactfulTopic,
276275
producerUuid
277276
);

superstream-clients/src/main/java/ai/superstream/model/ClientMessage.java

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.fasterxml.jackson.annotation.JsonProperty;
44

5-
import java.util.List;
65
import java.util.Map;
76
import java.util.Objects;
87

@@ -19,19 +18,19 @@ public class ClientMessage {
1918
private String clientType;
2019
private Map<String, Object> originalConfiguration;
2120
private Map<String, Object> optimizedConfiguration;
22-
private List<String> topics;
2321
private String mostImpactfulTopic;
2422
private Map<String, String> environmentVariables;
2523
private String hostname;
2624
private String producerUuid;
25+
private String error;
2726

2827
public ClientMessage() {
2928
// Default constructor for Jackson
3029
}
3130

3231
public ClientMessage(int superstreamClusterId, boolean active, String clientId, String ipAddress, String clientVersion, String language, String clientType,
3332
Map<String, Object> originalConfiguration, Map<String, Object> optimizedConfiguration,
34-
List<String> topics, String mostImpactfulTopic, String hostname, String producerUuid) {
33+
String mostImpactfulTopic, String hostname, String producerUuid) {
3534
this.superstreamClusterId = superstreamClusterId;
3635
this.active = active;
3736
this.clientId = clientId;
@@ -41,7 +40,6 @@ public ClientMessage(int superstreamClusterId, boolean active, String clientId,
4140
this.clientType = clientType;
4241
this.originalConfiguration = originalConfiguration;
4342
this.optimizedConfiguration = optimizedConfiguration;
44-
this.topics = topics;
4543
this.mostImpactfulTopic = mostImpactfulTopic;
4644
this.environmentVariables = ai.superstream.util.EnvironmentVariables.getSuperstreamEnvironmentVariables();
4745
this.hostname = hostname;
@@ -138,16 +136,6 @@ public void setOptimizedConfiguration(Map<String, Object> optimizedConfiguration
138136
this.optimizedConfiguration = optimizedConfiguration;
139137
}
140138

141-
@JsonProperty("topics")
142-
public List<String> getTopics() {
143-
return topics;
144-
}
145-
146-
@JsonProperty("topics")
147-
public void setTopics(List<String> topics) {
148-
this.topics = topics;
149-
}
150-
151139
@JsonProperty("most_impactful_topic")
152140
public String getMostImpactfulTopic() {
153141
return mostImpactfulTopic;
@@ -188,6 +176,16 @@ public void setProducerUuid(String producerUuid) {
188176
this.producerUuid = producerUuid;
189177
}
190178

179+
@JsonProperty("error")
180+
public String getError() {
181+
return error;
182+
}
183+
184+
@JsonProperty("error")
185+
public void setError(String error) {
186+
this.error = error;
187+
}
188+
191189
@Override
192190
public boolean equals(Object o) {
193191
if (this == o) return true;
@@ -202,18 +200,18 @@ public boolean equals(Object o) {
202200
Objects.equals(clientType, that.clientType) &&
203201
Objects.equals(originalConfiguration, that.originalConfiguration) &&
204202
Objects.equals(optimizedConfiguration, that.optimizedConfiguration) &&
205-
Objects.equals(topics, that.topics) &&
206203
Objects.equals(mostImpactfulTopic, that.mostImpactfulTopic) &&
207204
Objects.equals(environmentVariables, that.environmentVariables) &&
208205
Objects.equals(hostname, that.hostname) &&
209-
Objects.equals(producerUuid, that.producerUuid);
206+
Objects.equals(producerUuid, that.producerUuid) &&
207+
Objects.equals(error, that.error);
210208
}
211209

212210
@Override
213211
public int hashCode() {
214212
return Objects.hash(superstreamClusterId, active, clientId, ipAddress, clientVersion, language, clientType,
215-
originalConfiguration, optimizedConfiguration, topics, mostImpactfulTopic,
216-
environmentVariables, hostname, producerUuid);
213+
originalConfiguration, optimizedConfiguration, mostImpactfulTopic,
214+
environmentVariables, hostname, producerUuid, error);
217215
}
218216

219217
@Override
@@ -228,11 +226,11 @@ public String toString() {
228226
", client_type='" + clientType + '\'' +
229227
", original_configuration=" + originalConfiguration +
230228
", optimized_configuration=" + optimizedConfiguration +
231-
", topics=" + topics +
232229
", most_impactful_topic='" + mostImpactfulTopic + '\'' +
233230
", environment_variables=" + environmentVariables +
234231
", hostname='" + hostname + '\'' +
235232
", superstream_client_uid='" + producerUuid + '\'' +
233+
", error='" + error + '\'' +
236234
'}';
237235
}
238236
}

0 commit comments

Comments
 (0)