Skip to content

Commit 1502367

Browse files
fixes
1 parent d360601 commit 1502367

File tree

2 files changed

+31
-10
lines changed

2 files changed

+31
-10
lines changed

superclient/agent/interceptor.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,12 @@ def init_patch(self, *args, **kwargs):
7373

7474
# Apply optimized configuration
7575
for k, v in opt_cfg.items():
76-
if kwargs.get(k) != v:
77-
logger.debug("Overriding configuration: {} -> {}", k, v)
76+
current_val = kwargs.get(k)
77+
if current_val != v:
78+
if k in kwargs:
79+
logger.debug("Overriding configuration: {} ({} -> {})", k, current_val, v)
80+
else:
81+
logger.debug("Overriding configuration: {} ((not set) -> {})", k, v)
7882
kwargs[k] = v
7983

8084
# Set up reporting interval
@@ -189,8 +193,12 @@ def init_patch(self, *args, **kwargs):
189193
# Get optimized configuration if Superstream is active
190194
opt_cfg = optimal_cfg(metadata, topics_env, orig_cfg, "aiokafka")
191195
for k, v in opt_cfg.items():
192-
if kwargs.get(k) != v:
193-
logger.debug("Overriding configuration: {} -> {}", k, v)
196+
current_val = kwargs.get(k)
197+
if current_val != v:
198+
if k in kwargs:
199+
logger.debug("Overriding configuration: {} ({} -> {})", k, current_val, v)
200+
else:
201+
logger.debug("Overriding configuration: {} ((not set) -> {})", k, v)
194202
kwargs[k] = v
195203
report_interval = metadata.get("report_interval_ms") if metadata else _DEFAULT_REPORT_INTERVAL_MS
196204
tr = ProducerTracker(
@@ -288,8 +296,12 @@ def init_patch(self, conf: Dict[str, Any], *args, **kwargs):
288296
# Get optimized configuration if Superstream is active
289297
opt_cfg = optimal_cfg(metadata, topics_env, conf, "confluent")
290298
for k, v in opt_cfg.items():
291-
if conf.get(k) != v:
292-
logger.debug("Overriding configuration: {} -> {}", k, v)
299+
current_val = conf.get(k)
300+
if current_val != v:
301+
if k in conf:
302+
logger.debug("Overriding configuration: {} ({} -> {})", k, current_val, v)
303+
else:
304+
logger.debug("Overriding configuration: {} ((not set) -> {})", k, v)
293305
conf[k] = v
294306
report_interval = metadata.get("report_interval_ms") if metadata else _DEFAULT_REPORT_INTERVAL_MS
295307
tr = ProducerTracker(

superclient/agent/metadata.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,20 @@ def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dic
147147
cfg.setdefault(k, v)
148148
if latency:
149149
cfg.pop("linger.ms", None)
150-
for p in ("batch.size", "linger.ms"):
151-
if p in orig and p in cfg:
150+
151+
# Translate Java-style keys to library-specific keys for comparison
152+
java_keys_to_check = ["batch.size", "linger.ms"]
153+
lib_keys_to_check = []
154+
for java_key in java_keys_to_check:
155+
translated = translate_java_to_lib({java_key: ""}, lib_name)
156+
lib_key = list(translated.keys())[0] if translated else java_key
157+
lib_keys_to_check.append(lib_key)
158+
159+
for java_key, lib_key in zip(java_keys_to_check, lib_keys_to_check):
160+
if lib_key in orig and java_key in cfg:
152161
try:
153-
if int(orig[p]) > int(cfg[p]):
154-
cfg[p] = orig[p]
162+
if int(orig[lib_key]) > int(cfg[java_key]):
163+
cfg[java_key] = orig[lib_key]
155164
except Exception:
156165
pass
157166

0 commit comments

Comments
 (0)