Skip to content

Commit c057d94

Browse files
fixes
1 parent 209b36d commit c057d94

File tree

3 files changed

+161
-77
lines changed

3 files changed

+161
-77
lines changed

superclient/agent/metadata.py

Lines changed: 142 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -57,21 +57,63 @@ def _create_consumer_confluent(bootstrap: str, base_cfg: Dict[str, Any]):
5757
"confluent": _create_consumer_confluent,
5858
}
5959

60-
async def fetch_metadata(
60+
def fetch_metadata_kafka_python(
6161
bootstrap: str,
6262
cfg: Dict[str, Any],
63-
lib_name: Literal["kafka-python", "aiokafka", "confluent"],
6463
) -> Optional[Dict[str, Any]]:
65-
"""Fetch the latest optimization metadata from the Superstream internal topic.
64+
"""Fetch metadata using kafka-python library."""
65+
66+
builder = _CONSUMER_BUILDERS.get("kafka-python")
67+
if builder is None:
68+
logger.error("[ERR-204] Unsupported Kafka library: kafka-python")
69+
return None
6670

67-
The consumer is created using the *same* Kafka library that the application
68-
itself employs (`lib_name`). This guarantees compatibility with user
69-
dependencies and avoids version-related conflicts.
70-
"""
71+
topic = "superstream.metadata_v1"
72+
try:
73+
consumer = builder(bootstrap, cfg)
74+
75+
if not consumer.partitions_for_topic(topic):
76+
logger.error(
77+
"[ERR-201] Superstream internal topic is missing. Please ensure permissions for superstream.* topics."
78+
)
79+
consumer.close()
80+
return None
81+
82+
import kafka as _kafka # type: ignore
7183

72-
builder = _CONSUMER_BUILDERS.get(lib_name)
84+
tp = _kafka.TopicPartition(topic, 0)
85+
consumer.assign([tp])
86+
87+
# Get the end offset safely
88+
end_offsets = consumer.end_offsets([tp])
89+
end = end_offsets.get(tp, 0)
90+
91+
if end == 0:
92+
logger.error(
93+
"[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty."
94+
)
95+
consumer.close()
96+
return None
97+
98+
consumer.seek(tp, end - 1)
99+
recs = consumer.poll(timeout_ms=5000)
100+
consumer.close()
101+
for batch in recs.values():
102+
for rec in batch:
103+
return json.loads(rec.value.decode())
104+
except Exception as exc:
105+
logger.error("[ERR-203] Failed to fetch metadata: {}", exc)
106+
return None
107+
108+
def fetch_metadata_confluent(
109+
bootstrap: str,
110+
cfg: Dict[str, Any],
111+
) -> Optional[Dict[str, Any]]:
112+
"""Fetch metadata using confluent-kafka library."""
113+
114+
builder = _CONSUMER_BUILDERS.get("confluent")
73115
if builder is None:
74-
logger.error("[ERR-204] Unsupported Kafka library: {}", lib_name)
116+
logger.error("[ERR-204] Unsupported Kafka library: confluent")
75117
return None
76118

77119
topic = "superstream.metadata_v1"
@@ -85,74 +127,84 @@ async def fetch_metadata(
85127
consumer.close()
86128
return None
87129

88-
if lib_name == "confluent":
89-
from confluent_kafka import TopicPartition # type: ignore
90-
91-
tp = TopicPartition(topic, 0)
92-
consumer.assign([tp])
93-
low, high = consumer.get_watermark_offsets(tp, timeout=5.0)
94-
if high == 0:
95-
logger.error(
96-
"[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty."
97-
)
98-
consumer.close()
99-
return None
100-
consumer.seek(TopicPartition(topic, 0, high - 1))
101-
msg = consumer.poll(timeout=5.0)
130+
from confluent_kafka import TopicPartition # type: ignore
131+
132+
tp = TopicPartition(topic, 0)
133+
consumer.assign([tp])
134+
low, high = consumer.get_watermark_offsets(tp, timeout=5.0)
135+
if high == 0:
136+
logger.error(
137+
"[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty."
138+
)
102139
consumer.close()
103-
if msg and msg.value():
104-
return json.loads(msg.value().decode())
140+
return None
141+
consumer.seek(TopicPartition(topic, 0, high - 1))
142+
msg = consumer.poll(timeout=5.0)
143+
consumer.close()
144+
if msg and msg.value():
145+
return json.loads(msg.value().decode())
146+
except Exception as exc:
147+
logger.error("[ERR-203] Failed to fetch metadata: {}", exc)
148+
return None
105149

106-
elif lib_name == "kafka-python":
107-
import kafka as _kafka # type: ignore
150+
async def fetch_metadata_aiokafka(
151+
bootstrap: str,
152+
cfg: Dict[str, Any],
153+
) -> Optional[Dict[str, Any]]:
154+
"""Fetch metadata using aiokafka library."""
155+
156+
builder = _CONSUMER_BUILDERS.get("aiokafka")
157+
if builder is None:
158+
logger.error("[ERR-204] Unsupported Kafka library: aiokafka")
159+
return None
108160

109-
tp = _kafka.TopicPartition(topic, 0)
110-
consumer.assign([tp])
111-
112-
# Get the end offset safely
113-
end_offsets = consumer.end_offsets([tp])
114-
end = end_offsets.get(tp, 0)
115-
116-
if end == 0:
117-
logger.error(
118-
"[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty."
119-
)
120-
consumer.close()
121-
return None
122-
123-
consumer.seek(tp, end - 1)
124-
recs = consumer.poll(timeout_ms=5000)
125-
consumer.close()
126-
for batch in recs.values():
127-
for rec in batch:
128-
return json.loads(rec.value.decode())
161+
topic = "superstream.metadata_v1"
162+
consumer = None
163+
try:
164+
consumer = builder(bootstrap, cfg)
165+
166+
# Start the consumer first
167+
await consumer.start()
168+
169+
# For aiokafka, partitions_for_topic returns a set, not a coroutine
170+
partitions = consumer.partitions_for_topic(topic)
171+
if not partitions:
172+
logger.error(
173+
"[ERR-201] Superstream internal topic is missing. Please ensure permissions for superstream.* topics."
174+
)
175+
await consumer.stop()
176+
return None
129177

130-
elif lib_name == "aiokafka":
131-
# aiokafka uses its own TopicPartition and async API
132-
from aiokafka import TopicPartition # type: ignore
178+
# aiokafka uses its own TopicPartition and async API
179+
from aiokafka import TopicPartition # type: ignore
133180

134-
tp = TopicPartition(topic, 0)
135-
consumer.assign([tp])
136-
137-
# Get the end offset safely using aiokafka's API
138-
end_offsets = await consumer.end_offsets([tp])
139-
end = end_offsets.get(tp, 0)
181+
tp = TopicPartition(topic, 0)
182+
consumer.assign([tp])
183+
184+
# Get the end offset safely using aiokafka's API
185+
end_offsets = await consumer.end_offsets([tp])
186+
end = end_offsets.get(tp, 0)
187+
188+
if end == 0:
189+
logger.error(
190+
"[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty."
191+
)
192+
await consumer.stop()
193+
return None
140194

141-
if end == 0:
142-
logger.error(
143-
"[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty."
144-
)
145-
consumer.close()
146-
return None
147-
148-
consumer.seek(tp, end - 1)
149-
recs = await consumer.getmany(timeout_ms=5000)
150-
consumer.close()
151-
for batch in recs.values():
152-
for rec in batch:
153-
return json.loads(rec.value.decode())
195+
consumer.seek(tp, end - 1)
196+
recs = await consumer.getmany(timeout_ms=5000)
197+
await consumer.stop()
198+
for batch in recs.values():
199+
for rec in batch:
200+
return json.loads(rec.value.decode())
154201
except Exception as exc:
155202
logger.error("[ERR-203] Failed to fetch metadata: {}", exc)
203+
if consumer:
204+
try:
205+
await consumer.stop()
206+
except:
207+
pass
156208
return None
157209

158210
def fetch_metadata_sync(
@@ -163,8 +215,27 @@ def fetch_metadata_sync(
163215
"""Synchronous wrapper for fetch_metadata."""
164216
import asyncio
165217

166-
# Run the async function synchronously for all libraries
167-
return asyncio.run(fetch_metadata(bootstrap, cfg, lib_name))
218+
if lib_name == "kafka-python":
219+
return fetch_metadata_kafka_python(bootstrap, cfg)
220+
elif lib_name == "confluent":
221+
return fetch_metadata_confluent(bootstrap, cfg)
222+
elif lib_name == "aiokafka":
223+
# For aiokafka, we need to handle the async case
224+
try:
225+
# Try to get the current event loop
226+
loop = asyncio.get_running_loop()
227+
# If we get here, there's already a running event loop
228+
# We need to create a new event loop in a separate thread
229+
import concurrent.futures
230+
with concurrent.futures.ThreadPoolExecutor() as executor:
231+
future = executor.submit(asyncio.run, fetch_metadata_aiokafka(bootstrap, cfg))
232+
return future.result()
233+
except RuntimeError:
234+
# No event loop is running, we can use asyncio.run()
235+
return asyncio.run(fetch_metadata_aiokafka(bootstrap, cfg))
236+
else:
237+
logger.error("[ERR-204] Unsupported Kafka library: {}", lib_name)
238+
return None
168239

169240
def optimal_cfg(metadata: Optional[Dict[str, Any]], topics: list[str], orig: Dict[str, Any], lib_name: str) -> tuple[Dict[str, Any], str]:
170241
"""Compute optimal configuration based on metadata and topics.

superclient/core/reporter.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ async def _create_producer_aiokafka(bootstrap: str, base_cfg: Dict[str, Any]):
5555
"bootstrap_servers": bootstrap,
5656
"client_id": _SUPERLIB_PREFIX + "client-reporter",
5757
"compression_type": "zstd",
58-
"batch_size": 16_384,
58+
"max_batch_size": 16_384,
5959
"linger_ms": 1000,
6060
}
6161
copy_client_configuration_properties(base_cfg, cfg, "aiokafka")
@@ -80,7 +80,19 @@ def internal_send_clients(bootstrap: str, base_cfg: Dict[str, Any], payload: byt
8080
try:
8181
# Handle aiokafka (async library)
8282
if lib_name == "aiokafka":
83-
asyncio.run(internal_send_clients_async(bootstrap, base_cfg, payload))
83+
# Handle the case where an event loop is already running
84+
try:
85+
# Try to get the current event loop
86+
loop = asyncio.get_running_loop()
87+
# If we get here, there's already a running event loop
88+
# We need to create a new event loop in a separate thread
89+
import concurrent.futures
90+
with concurrent.futures.ThreadPoolExecutor() as executor:
91+
future = executor.submit(asyncio.run, internal_send_clients_async(bootstrap, base_cfg, payload))
92+
future.result()
93+
except RuntimeError:
94+
# No event loop is running, we can use asyncio.run()
95+
asyncio.run(internal_send_clients_async(bootstrap, base_cfg, payload))
8496
return
8597

8698
# Handle kafka-python (sync library)
@@ -98,8 +110,8 @@ def internal_send_clients(bootstrap: str, base_cfg: Dict[str, Any], payload: byt
98110
prod.flush()
99111
return
100112

101-
except Exception:
102-
logger.debug("Failed to send clients message via {}", lib_name)
113+
except Exception as e:
114+
logger.error("Failed to send clients message via {}: {}", lib_name, e)
103115

104116

105117
async def internal_send_clients_async(bootstrap: str, base_cfg: Dict[str, Any], payload: bytes) -> None:
@@ -111,8 +123,8 @@ async def internal_send_clients_async(bootstrap: str, base_cfg: Dict[str, Any],
111123
await prod.send_and_wait("superstream.clients", payload)
112124
finally:
113125
await prod.stop()
114-
except Exception:
115-
logger.debug("Failed to send clients message via aiokafka")
126+
except Exception as e:
127+
logger.error("Failed to send clients message via aiokafka: {}", e)
116128

117129

118130
def send_clients_msg(tracker: Any, error: str = "") -> None:

superclient/util/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any
229229
"enable.idempotence": "enable_idempotence",
230230
"security.protocol": "security_protocol",
231231
"sasl.mechanism": "sasl_mechanism",
232+
"ssl.context": "ssl_context",
232233
},
233234
"confluent": {
234235
# Confluent uses Java-style names directly, so most mappings are 1:1

0 commit comments

Comments
 (0)