Skip to content

Commit 5d56c1b

Browse files
fixes
1 parent 1502367 commit 5d56c1b

File tree

5 files changed

+72
-27
lines changed

superclient/agent/interceptor.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ def init_patch(self, *args, **kwargs):
6969
opt_cfg = {}
7070
else:
7171
# Get optimized configuration if Superstream is active
72-
opt_cfg = optimal_cfg(metadata, topics_env, orig_cfg, "kafka-python")
72+
opt_cfg, warning_msg = optimal_cfg(metadata, topics_env, orig_cfg, "kafka-python")
73+
if warning_msg:
74+
error_msg = warning_msg
7375

7476
# Apply optimized configuration
7577
for k, v in opt_cfg.items():
@@ -99,9 +101,6 @@ def init_patch(self, *args, **kwargs):
99101
)
100102
Heartbeat.register_tracker(tr)
101103

102-
# Initialize the producer with original configuration
103-
orig_init(self, *args, **kwargs)
104-
105104
# Patch send and close methods if not already patched
106105
if not hasattr(self, "_superstream_patch"):
107106
original_send = self.send
@@ -125,7 +124,7 @@ def close_patch(*a, **kw):
125124
self.close = close_patch
126125
self._superstream_patch = True
127126

128-
# Initialize again with optimized configuration
127+
# Initialize with optimized configuration
129128
orig_init(self, *args, **kwargs)
130129

131130
# Send client registration message
@@ -191,7 +190,9 @@ def init_patch(self, *args, **kwargs):
191190
opt_cfg = {}
192191
else:
193192
# Get optimized configuration if Superstream is active
194-
opt_cfg = optimal_cfg(metadata, topics_env, orig_cfg, "aiokafka")
193+
opt_cfg, warning_msg = optimal_cfg(metadata, topics_env, orig_cfg, "aiokafka")
194+
if warning_msg:
195+
error_msg = warning_msg
195196
for k, v in opt_cfg.items():
196197
current_val = kwargs.get(k)
197198
if current_val != v:
@@ -214,7 +215,6 @@ def init_patch(self, *args, **kwargs):
214215
topics_env=topics_env,
215216
)
216217
Heartbeat.register_tracker(tr)
217-
orig_init(self, *args, **kwargs)
218218
if not hasattr(self, "_superstream_patch"):
219219
original_send = self.send
220220

@@ -294,7 +294,9 @@ def init_patch(self, conf: Dict[str, Any], *args, **kwargs):
294294
opt_cfg = {}
295295
else:
296296
# Get optimized configuration if Superstream is active
297-
opt_cfg = optimal_cfg(metadata, topics_env, conf, "confluent")
297+
opt_cfg, warning_msg = optimal_cfg(metadata, topics_env, conf, "confluent")
298+
if warning_msg:
299+
error_msg = warning_msg
298300
for k, v in opt_cfg.items():
299301
current_val = conf.get(k)
300302
if current_val != v:

superclient/agent/metadata.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,25 +126,40 @@ def fetch_metadata(
126126
logger.error("[ERR-203] Failed to fetch metadata: {}", exc)
127127
return None
128128

129-
def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dict[str, Any], lib_name: str) -> Dict[str, Any]:
130-
"""Compute optimal configuration based on metadata and topics."""
129+
def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dict[str, Any], lib_name: str) -> tuple[Dict[str, Any], str]:
130+
"""Compute optimal configuration based on metadata and topics.
131+
132+
Returns:
133+
tuple: (configuration_dict, warning_message)
134+
"""
131135
latency = os.getenv("SUPERSTREAM_LATENCY_SENSITIVE", "false").lower() == "true"
132136
cfg: Dict[str, Any]
137+
warning_msg = ""
138+
133139
if not metadata or not metadata.get("topics_configuration"):
134140
logger.debug("No metadata or topics_configuration found; applying default configuration: %s", _DEFAULTS)
135-
logger.warning("The topics you're publishing to haven't been analyzed yet. For optimal results, either wait for the next analysis cycle or trigger one manually via the SuperClient Console")
141+
if not topics:
142+
warning_msg = "No SUPERSTREAM_TOPICS_LIST environment variable set. Please set it to enable topic-specific optimizations."
143+
else:
144+
warning_msg = "The topics you're publishing to haven't been analyzed yet. For optimal results, either wait for the next analysis cycle or trigger one manually via the SuperClient Console"
145+
logger.warning(warning_msg)
136146
cfg = dict(_DEFAULTS)
137147
else:
138148
matches = [tc for tc in metadata["topics_configuration"] if tc["topic_name"] in topics]
139149
if not matches:
140150
logger.debug("No matching topics found in metadata; applying default configuration: %s", _DEFAULTS)
141-
logger.warning("The topics you're publishing to haven't been analyzed yet. For optimal results, either wait for the next analysis cycle or trigger one manually via the SuperClient Console")
151+
if not topics:
152+
warning_msg = "No SUPERSTREAM_TOPICS_LIST environment variable set. Please set it to enable topic-specific optimizations."
153+
else:
154+
warning_msg = "The topics you're publishing to haven't been analyzed yet. For optimal results, either wait for the next analysis cycle or trigger one manually via the SuperClient Console"
155+
logger.warning(warning_msg)
142156
cfg = dict(_DEFAULTS)
143157
else:
144158
best = max(matches, key=lambda tc: tc["potential_reduction_percentage"] * tc["daily_writes_bytes"])
145159
cfg = dict(best.get("optimized_configuration", {}))
146160
for k, v in _DEFAULTS.items():
147161
cfg.setdefault(k, v)
162+
148163
if latency:
149164
cfg.pop("linger.ms", None)
150165

@@ -165,4 +180,4 @@ def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dic
165180
pass
166181

167182
# Translate Java-style keys to library-specific keys
168-
return translate_java_to_lib(cfg, lib_name)
183+
return translate_java_to_lib(cfg, lib_name), warning_msg

superclient/agent/tracker.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ def determine_topic(self) -> str:
5252
"""Get the most impactful topic for this producer based on metadata analysis."""
5353
if not self.metadata or not self.metadata.get("topics_configuration"):
5454
# Fallback to first topic if no metadata available
55-
logger.warning("No metadata available for producer {}, falling back to first used topic", self.client_id)
5655
return sorted(self.topics)[0] if self.topics else ""
5756

5857
# Find matching topic configurations from metadata based on environment topics only
@@ -63,8 +62,6 @@ def determine_topic(self) -> str:
6362

6463
if not matches:
6564
# Fallback to first environment topic if no matches
66-
logger.warning("No matching topics found in metadata for producer {} (env topics: {}), falling back to first environment topic",
67-
self.client_id, self.topics_env)
6865
return sorted(self.topics_env)[0] if self.topics_env else ""
6966

7067
# Use the same logic as optimal_cfg: find the topic with highest impact

superclient/core/reporter.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,26 @@ def internal_send_clients(bootstrap: str, base_cfg: Dict[str, Any], payload: byt
7878
return
7979

8080
try:
81+
# Handle aiokafka (async library)
8182
if lib_name == "aiokafka":
8283
asyncio.run(internal_send_clients_async(bootstrap, base_cfg, payload))
8384
return
8485

85-
prod = builder(bootstrap, base_cfg)
86-
if hasattr(prod, "produce"):
87-
prod.produce("superstream.clients", payload)
88-
prod.flush()
89-
else:
86+
# Handle kafka-python (sync library)
87+
if lib_name == "kafka-python":
88+
prod = builder(bootstrap, base_cfg)
9089
prod.send("superstream.clients", payload)
9190
prod.flush()
9291
prod.close()
92+
return
93+
94+
# Handle confluent-kafka (sync library with different API)
95+
if lib_name == "confluent":
96+
prod = builder(bootstrap, base_cfg)
97+
prod.produce("superstream.clients", payload)
98+
prod.flush()
99+
return
100+
93101
except Exception:
94102
logger.debug("Failed to send clients message via {}", lib_name)
95103

superclient/util/config.py

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,27 @@ def get_topics_list() -> List[str]:
3030

3131
def mask_sensitive(k: str, v: Any) -> Any:
3232
"""Mask sensitive configuration values."""
33-
return "[MASKED]" if "password" in k.lower() or "sasl.jaas.config" in k.lower() else v
33+
sensitive_patterns = [
34+
"password", "sasl.jaas.config", "basic.auth.user.info",
35+
"ssl.key", "ssl.cert", "ssl.truststore", "ssl.keystore",
36+
"sasl.kerberos.keytab", "sasl.kerberos.principal"
37+
]
38+
return "[MASKED]" if any(pattern in k.lower() for pattern in sensitive_patterns) else v
39+
40+
def _serialize_config_value(v: Any) -> Any:
41+
"""Convert configuration values to JSON-serializable format."""
42+
if callable(v):
43+
# Convert functions to string representation
44+
return f"<function: {v.__name__ if hasattr(v, '__name__') else str(v)}>"
45+
elif hasattr(v, '__dict__'):
46+
# Handle objects that might have __dict__ but aren't functions
47+
try:
48+
# Try to serialize as dict, fallback to string representation
49+
return v.__dict__
50+
except:
51+
return f"<object: {type(v).__name__}>"
52+
else:
53+
return v
3454

3555
def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any]):
3656
"""Copy essential client configuration properties from source to destination.
@@ -137,7 +157,6 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any
137157
"sasl.kerberos.domain.name": "sasl_kerberos_domain_name",
138158
"sasl.oauth.token.provider": "sasl_oauth_token_provider",
139159
"socks5.proxy": "socks5_proxy",
140-
"compression.codec": "compression_type", # Maps to compression_type
141160
},
142161
"aiokafka": {
143162
# Basic configuration
@@ -159,7 +178,6 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any
159178
"enable.idempotence": "enable_idempotence",
160179
"security.protocol": "security_protocol",
161180
"sasl.mechanism": "sasl_mechanism",
162-
"compression.codec": "compression_type", # Maps to compression_type
163181
},
164182
"confluent": {
165183
# Confluent uses Java-style names directly, so most mappings are 1:1
@@ -256,7 +274,7 @@ def translate_lib_to_java(cfg: Dict[str, Any], lib_name: str) -> Dict[str, Any]:
256274
"enable_idempotence": False,
257275
"delivery_timeout_ms": 120000,
258276
"acks": 1,
259-
"compression_type": None,
277+
"compression_type": "none",
260278
"retries": 0,
261279
"batch_size": 16384,
262280
"linger_ms": 0,
@@ -303,7 +321,7 @@ def translate_lib_to_java(cfg: Dict[str, Any], lib_name: str) -> Dict[str, Any]:
303321

304322
# Producer specific
305323
"acks": 1,
306-
"compression_type": None,
324+
"compression_type": "none",
307325
"max_batch_size": 16384, # aiokafka uses max_batch_size
308326
"linger_ms": 0,
309327
"partitioner": None,
@@ -422,5 +440,10 @@ def get_original_config(orig_cfg: Dict[str, Any], lib_name: str) -> Dict[str, An
422440
for k, v in defaults_java.items():
423441
if k not in user_keys_java:
424442
merged[k] = v
443+
444+
# Serialize any function objects to make them JSON-serializable
445+
serialized: Dict[str, Any] = {}
446+
for k, v in merged.items():
447+
serialized[k] = _serialize_config_value(v)
425448

426-
return merged
449+
return serialized

0 commit comments

Comments (0)