Skip to content

Commit 04a51d9

Browse files
general improvements
1 parent 76487c4 commit 04a51d9

File tree

9 files changed

+275
-448
lines changed

9 files changed

+275
-448
lines changed

superclient/agent/clients.py

Lines changed: 0 additions & 125 deletions
This file was deleted.

superclient/agent/interceptor.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def init_patch(self, *args, **kwargs):
6969
opt_cfg = {}
7070
else:
7171
# Get optimized configuration if Superstream is active
72-
opt_cfg = optimal_cfg(metadata, topics_env, orig_cfg)
72+
opt_cfg = optimal_cfg(metadata, topics_env, orig_cfg, "kafka-python")
7373

7474
# Apply optimized configuration
7575
for k, v in opt_cfg.items():
@@ -91,6 +91,8 @@ def init_patch(self, *args, **kwargs):
9191
opt_cfg=opt_cfg,
9292
report_interval_ms=int(report_interval or _DEFAULT_REPORT_INTERVAL_MS),
9393
error=error_msg, # Store error message in tracker
94+
metadata=metadata,
95+
topics_env=topics_env,
9496
)
9597
Heartbeat.register_tracker(tr)
9698

@@ -186,7 +188,7 @@ def init_patch(self, *args, **kwargs):
186188
opt_cfg = {}
187189
else:
188190
# Get optimized configuration if Superstream is active
189-
opt_cfg = optimal_cfg(metadata, topics_env, orig_cfg)
191+
opt_cfg = optimal_cfg(metadata, topics_env, orig_cfg, "aiokafka")
190192
for k, v in opt_cfg.items():
191193
if kwargs.get(k) != v:
192194
logger.debug("Overriding configuration: {} -> {}", k, v)
@@ -201,6 +203,8 @@ def init_patch(self, *args, **kwargs):
201203
opt_cfg=opt_cfg,
202204
report_interval_ms=int(report_interval or _DEFAULT_REPORT_INTERVAL_MS),
203205
error=error_msg, # Store error message in tracker
206+
metadata=metadata,
207+
topics_env=topics_env,
204208
)
205209
Heartbeat.register_tracker(tr)
206210
orig_init(self, *args, **kwargs)
@@ -283,7 +287,7 @@ def init_patch(self, conf: Dict[str, Any], *args, **kwargs):
283287
opt_cfg = {}
284288
else:
285289
# Get optimized configuration if Superstream is active
286-
opt_cfg = optimal_cfg(metadata, topics_env, conf)
290+
opt_cfg = optimal_cfg(metadata, topics_env, conf, "confluent")
287291
for k, v in opt_cfg.items():
288292
if conf.get(k) != v:
289293
logger.debug("Overriding configuration: {} -> {}", k, v)
@@ -298,6 +302,8 @@ def init_patch(self, conf: Dict[str, Any], *args, **kwargs):
298302
opt_cfg=opt_cfg,
299303
report_interval_ms=int(report_interval or _DEFAULT_REPORT_INTERVAL_MS),
300304
error=error_msg, # Store error message in tracker
305+
metadata=metadata,
306+
topics_env=topics_env,
301307
)
302308
Heartbeat.register_tracker(tr)
303309
orig_init(self, conf, *args, **kwargs)

superclient/agent/metadata.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from typing import Any, Dict, Optional, Literal
66

77
from ..util.logger import get_logger
8-
from ..util.config import copy_client_configuration_properties
8+
from ..util.config import copy_client_configuration_properties, translate_java_to_lib
99

1010
logger = get_logger("agent.metadata")
1111

@@ -126,7 +126,7 @@ def fetch_metadata(
126126
logger.error("[ERR-203] Failed to fetch metadata: {}", exc)
127127
return None
128128

129-
def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dict[str, Any]) -> Dict[str, Any]:
129+
def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dict[str, Any], lib_name: str) -> Dict[str, Any]:
130130
"""Compute optimal configuration based on metadata and topics."""
131131
latency = os.getenv("SUPERSTREAM_LATENCY_SENSITIVE", "false").lower() == "true"
132132
cfg: Dict[str, Any]
@@ -154,4 +154,6 @@ def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dic
154154
cfg[p] = orig[p]
155155
except Exception:
156156
pass
157-
return cfg
157+
158+
# Translate Java-style keys to library-specific keys
159+
return translate_java_to_lib(cfg, lib_name)

0 commit comments

Comments
 (0)