Commit 209b36d

fixes
1 parent d806003 commit 209b36d

3 files changed: +72 -7 lines changed

superclient/agent/interceptor.py

Lines changed: 4 additions & 4 deletions

@@ -5,7 +5,7 @@
 
 from ..util.logger import get_logger
 from ..util.config import get_topics_list, is_disabled
-from .metadata import fetch_metadata, optimal_cfg, _DEFAULTS
+from .metadata import fetch_metadata_sync, optimal_cfg, _DEFAULTS
 from ..core.reporter import send_clients_msg
 from ..core.manager import normalize_bootstrap
 from .tracker import ProducerTracker, Heartbeat
@@ -57,7 +57,7 @@ def init_patch(self, *args, **kwargs):
     try:
         # Get topics and metadata for optimization
         topics_env = get_topics_list()
-        metadata = fetch_metadata(bootstrap, orig_cfg, "kafka-python")
+        metadata = fetch_metadata_sync(bootstrap, orig_cfg, "kafka-python")
 
         # Check if Superstream is active for this cluster
         error_msg = ""
@@ -185,7 +185,7 @@ def init_patch(self, *args, **kwargs):
 
     try:
         topics_env = get_topics_list()
-        metadata = fetch_metadata(bootstrap, orig_cfg, "aiokafka")
+        metadata = fetch_metadata_sync(bootstrap, orig_cfg, "aiokafka")
         error_msg = ""
         if metadata is None:
             error_msg = "[ERR-304] Failed to fetch metadata for producer with client id {}: Unable to connect to Superstream service".format(client_id)
@@ -294,7 +294,7 @@ def init_patch(self, conf: Dict[str, Any], *args, **kwargs):
 
     try:
         topics_env = get_topics_list()
-        metadata = fetch_metadata(bootstrap, conf, "confluent")
+        metadata = fetch_metadata_sync(bootstrap, conf, "confluent")
         error_msg = ""
         if metadata is None:
            error_msg = "[ERR-304] Failed to fetch metadata for producer with client id {}: Unable to connect to Superstream service".format(client_id)

superclient/agent/metadata.py

Lines changed: 38 additions & 2 deletions

@@ -57,7 +57,7 @@ def _create_consumer_confluent(bootstrap: str, base_cfg: Dict[str, Any]):
     "confluent": _create_consumer_confluent,
 }
 
-def fetch_metadata(
+async def fetch_metadata(
     bootstrap: str,
     cfg: Dict[str, Any],
     lib_name: Literal["kafka-python", "aiokafka", "confluent"],
@@ -103,7 +103,7 @@ def fetch_metadata(
             if msg and msg.value():
                 return json.loads(msg.value().decode())
 
-        else:  # kafka-python / aiokafka share similar API
+        elif lib_name == "kafka-python":
             import kafka as _kafka  # type: ignore
 
             tp = _kafka.TopicPartition(topic, 0)
@@ -126,10 +126,46 @@ def fetch_metadata(
             for batch in recs.values():
                 for rec in batch:
                     return json.loads(rec.value.decode())
+
+        elif lib_name == "aiokafka":
+            # aiokafka uses its own TopicPartition and async API
+            from aiokafka import TopicPartition  # type: ignore
+
+            tp = TopicPartition(topic, 0)
+            consumer.assign([tp])
+
+            # Get the end offset safely using aiokafka's API
+            end_offsets = await consumer.end_offsets([tp])
+            end = end_offsets.get(tp, 0)
+
+            if end == 0:
+                logger.error(
+                    "[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty."
+                )
+                consumer.close()
+                return None
+
+            consumer.seek(tp, end - 1)
+            recs = await consumer.getmany(timeout_ms=5000)
+            consumer.close()
+            for batch in recs.values():
+                for rec in batch:
+                    return json.loads(rec.value.decode())
     except Exception as exc:
         logger.error("[ERR-203] Failed to fetch metadata: {}", exc)
     return None
 
+def fetch_metadata_sync(
+    bootstrap: str,
+    cfg: Dict[str, Any],
+    lib_name: Literal["kafka-python", "aiokafka", "confluent"],
+) -> Optional[Dict[str, Any]]:
+    """Synchronous wrapper for fetch_metadata."""
+    import asyncio
+
+    # Run the async function synchronously for all libraries
+    return asyncio.run(fetch_metadata(bootstrap, cfg, lib_name))
+
 def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dict[str, Any], lib_name: str) -> tuple[Dict[str, Any], str]:
     """Compute optimal configuration based on metadata and topics.
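The new fetch_metadata_sync wrapper is what lets the interceptor hooks above stay synchronous. One caveat worth knowing about this pattern: asyncio.run() spins up a fresh event loop and raises RuntimeError when the calling thread already has a loop running, which is exactly the environment an aiokafka producer constructed inside async application code lives in. A defensive variant, sketched here as one possible hardening rather than anything this commit contains, falls back to a worker thread:

import asyncio
import concurrent.futures

def run_coro_sync(coro):
    """Run a coroutine to completion from synchronous code, loop or no loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop in this thread: asyncio.run is safe to use.
        return asyncio.run(coro)
    # A loop is already running here; give the coroutine its own loop
    # on a short-lived worker thread and block on the result.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()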

superclient/util/config.py

Lines changed: 30 additions & 1 deletion

@@ -31,9 +31,31 @@ def get_topics_list() -> List[str]:
 def mask_sensitive(k: str, v: Any) -> Any:
     """Mask sensitive configuration values."""
     sensitive_patterns = [
+        # Existing patterns
         "password", "sasl.jaas.config", "basic.auth.user.info",
         "ssl.key", "ssl.cert", "ssl.truststore", "ssl.keystore",
-        "sasl.kerberos.keytab", "sasl.kerberos.principal"
+        "sasl.kerberos.keytab", "sasl.kerberos.principal",
+
+        # Additional SSL properties
+        "ssl.cafile", "ssl.certfile", "ssl.keyfile",
+        "ssl.certificate.location", "ssl.certificate.pem",
+        "ssl.ca.location", "ssl.ca.pem",
+        "ssl.ca.certificate.stores", "ssl.crl.location",
+        "ssl.providers", "ssl.context",
+
+        # Additional SASL properties
+        "sasl.username", "sasl.password",
+        "sasl.plain.username", "sasl.plain.password",
+        "sasl.oauthbearer.config", "sasl.oauthbearer.client.secret",
+        "sasl.oauthbearer.extensions",
+
+        # OAuth callback configurations
+        "sasl.oauth.token.provider", "oauth_cb", "sasl_oauth_token_provider",
+
+        # Library-specific variations
+        "sasl_plain_username", "sasl_plain_password",
+        "ssl_cafile", "ssl_certfile", "ssl_keyfile",
+        "sasl_oauth_token_provider"
     ]
     return "[MASKED]" if any(pattern in k.lower() for pattern in sensitive_patterns) else v
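The masking check is a substring match against the lower-cased key, so a single pattern covers several libraries' spellings of the same property. A quick illustration with invented values:

cfg = {
    "bootstrap_servers": "broker:9092",  # no pattern matches: passes through
    "sasl_plain_password": "hunter2",    # matches "password"
    "ssl_cafile": "/etc/ssl/ca.pem",     # matches "ssl_cafile"
}
masked = {k: mask_sensitive(k, v) for k, v in cfg.items()}
# {"bootstrap_servers": "broker:9092",
#  "sasl_plain_password": "[MASKED]",
#  "ssl_cafile": "[MASKED]"}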

@@ -129,6 +151,13 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any
         lib_key = _JAVA_TO_LIB_MAPPING[lib_name].get(java_key, java_key)
         if lib_key in src and lib_key not in dst:
             dst[lib_key] = src[lib_key]
+
+    # Debug log to show config comparison
+    # Mask sensitive data before logging
+    src_masked = {k: mask_sensitive(k, v) for k, v in src.items()}
+    dst_masked = {k: mask_sensitive(k, v) for k, v in dst.items()}
+
+    logger.debug("copy_client_configuration_properties - Source config: {}, Destination config: {}", src_masked, dst_masked)
 
 # ---------------------------------------------------------------------------
 # Field name mapping between Java-style and library-specific representations
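For a sense of what the new debug line reports: the loop copies any caller-supplied value whose mapped key the destination config has not already set, then both dicts are logged with sensitive values masked. A hypothetical run follows; the mapping entries, the lib_name plumbing, and the two-argument call are assumptions for illustration (the signature is truncated in the hunk header), not taken from this diff:

# Suppose _JAVA_TO_LIB_MAPPING["kafka-python"] maps "batch.size" -> "batch_size"
# and "linger.ms" -> "linger_ms" (illustrative entries only).
src = {"batch_size": 65536, "linger_ms": 5000, "sasl_plain_password": "hunter2"}
dst = {"linger_ms": 100}  # already set by the optimizer, so it is kept
copy_client_configuration_properties(src, dst)
# dst keeps linger_ms == 100 and gains the unset keys from src, while the
# new logger.debug line prints both configs with the password as "[MASKED]"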
