Skip to content

Commit cf8452a

Browse files
fixes
1 parent 5d56c1b commit cf8452a

File tree

5 files changed

+37
-18
lines changed

5 files changed

+37
-18
lines changed

examples/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
kafka-python==2.0.2
1+
kafka-python==2.2.14
22
confluent-kafka==2.3.0
33
aiokafka==0.10.0
44
aws-msk-iam-sasl-signer-python==1.0.2

superclient/agent/metadata.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def _create_consumer_kafka_python(bootstrap: str, base_cfg: Dict[str, Any]):
2424
"enable_auto_commit": False,
2525
"auto_offset_reset": "earliest",
2626
}
27-
copy_client_configuration_properties(base_cfg, consumer_cfg)
27+
copy_client_configuration_properties(base_cfg, consumer_cfg, "kafka-python")
2828
return kafka.KafkaConsumer(**consumer_cfg)
2929

3030
def _create_consumer_aiokafka(bootstrap: str, base_cfg: Dict[str, Any]):
@@ -36,7 +36,7 @@ def _create_consumer_aiokafka(bootstrap: str, base_cfg: Dict[str, Any]):
3636
"enable_auto_commit": False,
3737
"auto_offset_reset": "earliest",
3838
}
39-
copy_client_configuration_properties(base_cfg, consumer_cfg)
39+
copy_client_configuration_properties(base_cfg, consumer_cfg, "aiokafka")
4040
return AIOKafkaConsumer(**consumer_cfg)
4141

4242
def _create_consumer_confluent(bootstrap: str, base_cfg: Dict[str, Any]):
@@ -48,7 +48,7 @@ def _create_consumer_confluent(bootstrap: str, base_cfg: Dict[str, Any]):
4848
"enable.auto.commit": False,
4949
"auto.offset.reset": "earliest",
5050
}
51-
copy_client_configuration_properties(base_cfg, consumer_cfg)
51+
copy_client_configuration_properties(base_cfg, consumer_cfg, "confluent")
5252
return Consumer(consumer_cfg)
5353

5454
_CONSUMER_BUILDERS = {
@@ -108,14 +108,18 @@ def fetch_metadata(
108108

109109
tp = _kafka.TopicPartition(topic, 0)
110110
consumer.assign([tp])
111-
consumer.seek_to_end(tp)
112-
end = consumer.position(tp)
111+
112+
# Get the end offset safely
113+
end_offsets = consumer.end_offsets([tp])
114+
end = end_offsets.get(tp, 0)
115+
113116
if end == 0:
114117
logger.error(
115118
"[ERR-202] Unable to retrieve optimizations data from Superstream – topic empty."
116119
)
117120
consumer.close()
118121
return None
122+
119123
consumer.seek(tp, end - 1)
120124
recs = consumer.poll(timeout_ms=5000)
121125
consumer.close()

superclient/core/manager.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def fetch_metadata_kafka_python(bootstrap: str, cfg: Dict[str, Any]) -> Optional
6060
"enable_auto_commit": False,
6161
"group_id": "superstream-metadata-fetcher",
6262
}
63-
copy_client_configuration_properties(cfg, kafka_cfg)
63+
copy_client_configuration_properties(cfg, kafka_cfg, "kafka-python")
6464

6565
# Create consumer and fetch message
6666
consumer = KafkaConsumer(topic, **kafka_cfg)
@@ -102,7 +102,7 @@ def fetch_metadata_confluent(bootstrap: str, cfg: Dict[str, Any]) -> Optional[Me
102102
"enable.auto.commit": False,
103103
"group.id": "superstream-metadata-fetcher",
104104
}
105-
copy_client_configuration_properties(cfg, kafka_cfg)
105+
copy_client_configuration_properties(cfg, kafka_cfg, "confluent")
106106

107107
# Create consumer and fetch message
108108
consumer = Consumer(kafka_cfg)
@@ -139,7 +139,7 @@ async def fetch_metadata_aiokafka(bootstrap: str, cfg: Dict[str, Any]) -> Option
139139
"enable_auto_commit": False,
140140
"group_id": "superstream-metadata-fetcher",
141141
}
142-
copy_client_configuration_properties(cfg, kafka_cfg)
142+
copy_client_configuration_properties(cfg, kafka_cfg, "aiokafka")
143143

144144
# Create consumer and fetch message
145145
consumer = AIOKafkaConsumer(topic, **kafka_cfg)

superclient/core/reporter.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def _create_producer_kafka_python(bootstrap: str, base_cfg: Dict[str, Any]):
2929
"batch.size": 16_384,
3030
"linger.ms": 1000,
3131
}
32-
copy_client_configuration_properties(base_cfg, cfg)
32+
copy_client_configuration_properties(base_cfg, cfg, "kafka-python")
3333
kafka_cfg = {k.replace(".", "_"): v for k, v in cfg.items()}
3434
return kafka.KafkaProducer(**kafka_cfg)
3535

@@ -44,7 +44,7 @@ def _create_producer_confluent(bootstrap: str, base_cfg: Dict[str, Any]):
4444
"batch.size": 16384,
4545
"linger.ms": 1000,
4646
}
47-
copy_client_configuration_properties(base_cfg, cfg)
47+
copy_client_configuration_properties(base_cfg, cfg, "confluent")
4848
return _CProducer(cfg)
4949

5050

@@ -58,7 +58,7 @@ async def _create_producer_aiokafka(bootstrap: str, base_cfg: Dict[str, Any]):
5858
"batch_size": 16_384,
5959
"linger_ms": 1000,
6060
}
61-
copy_client_configuration_properties(base_cfg, cfg)
61+
copy_client_configuration_properties(base_cfg, cfg, "aiokafka")
6262
return AIOKafkaProducer(**cfg)
6363

6464

superclient/util/config.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,18 @@ def _serialize_config_value(v: Any) -> Any:
5252
else:
5353
return v
5454

55-
def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any]):
55+
def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any], lib_name: str = "confluent"):
5656
"""Copy essential client configuration properties from source to destination.
5757
This ensures internal Kafka clients have the same security, network, and connection
5858
configurations as the user's Kafka clients.
5959
Only copies properties that are explicitly set in the source configuration.
60+
61+
Args:
62+
src: Source configuration (in library-specific syntax)
63+
dst: Destination configuration (in library-specific syntax)
64+
lib_name: Library name for key translation
6065
"""
61-
# List of all possible auth/network related configs
66+
# List of all possible auth/network related configs in Java-style syntax
6267
possible_keys = [
6368
# Security protocol
6469
"security.protocol",
@@ -108,10 +113,20 @@ def copy_client_configuration_properties(src: Dict[str, Any], dst: Dict[str, Any
108113
"retries"
109114
]
110115

111-
# Only copy properties that are explicitly set in the source configuration
112-
for k in possible_keys:
113-
if k in src and k not in dst:
114-
dst[k] = src[k]
116+
# For each Java-style key, find if it exists in the source config
117+
# by checking both the Java-style key and its library-specific equivalent
118+
for java_key in possible_keys:
119+
# Check if the Java-style key exists in source
120+
if java_key in src:
121+
if java_key not in dst:
122+
dst[java_key] = src[java_key]
123+
continue
124+
125+
# Check if the library-specific equivalent exists in source
126+
if lib_name in _JAVA_TO_LIB_MAPPING:
127+
lib_key = _JAVA_TO_LIB_MAPPING[lib_name].get(java_key, java_key)
128+
if lib_key in src and lib_key not in dst:
129+
dst[lib_key] = src[lib_key]
115130

116131
# ---------------------------------------------------------------------------
117132
# Field name mapping between Java-style and library-specific representations

0 commit comments

Comments
 (0)