Skip to content

Commit d806003

Browse files
fixes
1 parent b05cd8a commit d806003

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

superclient/agent/interceptor.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ def init_patch(self, *args, **kwargs):
3737
# Store original configuration
3838
orig_cfg = dict(kwargs)
3939

40+
# Normalize compression type: convert None to "none" string
41+
if "compression_type" in orig_cfg and orig_cfg["compression_type"] is None:
42+
orig_cfg["compression_type"] = "none"
43+
4044
# Get bootstrap servers from args or kwargs
4145
bootstrap = orig_cfg.get("bootstrap_servers") or (args[0] if args else None)
4246
if not bootstrap:
@@ -164,6 +168,11 @@ def init_patch(self, *args, **kwargs):
164168
if is_disabled():
165169
return orig_init(self, *args, **kwargs)
166170
orig_cfg = dict(kwargs)
171+
172+
# Normalize compression type: convert None to "none" string
173+
if "compression_type" in orig_cfg and orig_cfg["compression_type"] is None:
174+
orig_cfg["compression_type"] = "none"
175+
167176
bootstrap = orig_cfg.get("bootstrap_servers")
168177
if not bootstrap and args:
169178
bootstrap = args[0]
@@ -270,6 +279,11 @@ def init_patch(self, conf: Dict[str, Any], *args, **kwargs):
270279
if is_disabled():
271280
return orig_init(self, conf, *args, **kwargs)
272281
conf = dict(conf)
282+
283+
# Normalize compression type: convert None to "none" string
284+
if "compression.type" in conf and conf["compression.type"] is None:
285+
conf["compression.type"] = "none"
286+
273287
bootstrap = conf.get("bootstrap.servers")
274288
if not bootstrap:
275289
return orig_init(self, conf, *args, **kwargs)

superclient/agent/metadata.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,23 @@ def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dic
165165
cfg.setdefault(k, v)
166166

167167
if latency:
168-
cfg.pop("linger.ms", None)
168+
# For latency-sensitive applications, don't apply linger.ms optimization
169+
# Keep the original value if it exists, otherwise use a default
170+
if "linger.ms" in cfg:
171+
# Get the original linger.ms value if it exists
172+
orig_linger_key = None
173+
if lib_name == "kafka-python" or lib_name == "aiokafka":
174+
orig_linger_key = "linger_ms"
175+
elif lib_name == "confluent":
176+
orig_linger_key = "linger.ms"
177+
178+
if orig_linger_key and orig_linger_key in orig:
179+
# Use the original value instead of the optimized one
180+
cfg["linger.ms"] = orig[orig_linger_key]
181+
logger.debug("Using original linger.ms value ({}) for latency-sensitive application", orig[orig_linger_key])
182+
else:
183+
# Remove the optimized value but it will be added back as default later
184+
cfg.pop("linger.ms")
169185

170186
# Translate Java-style keys to library-specific keys for comparison
171187
java_keys_to_check = ["batch.size", "linger.ms"]
@@ -179,9 +195,17 @@ def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dic
179195
if lib_key in orig and java_key in cfg:
180196
try:
181197
if int(orig[lib_key]) > int(cfg[java_key]):
198+
logger.debug("Keeping original {} value ({}) as it's larger than optimized ({})", java_key, orig[lib_key], cfg[java_key])
182199
cfg[java_key] = orig[lib_key]
183200
except Exception:
184201
pass
185202

203+
# Ensure all core optimization parameters are present in the final config
204+
# But don't add linger.ms back if it was removed for latency sensitivity
205+
for k, v in _DEFAULTS.items():
206+
if k not in cfg:
207+
if not (latency and k == "linger.ms"):
208+
cfg[k] = v
209+
186210
# Translate Java-style keys to library-specific keys
187211
return translate_java_to_lib(cfg, lib_name), warning_msg

0 commit comments

Comments
 (0)