Commit 91a1a66

Add pass-through Kafka producer config via --kafka-config
- Add --kafka-config for inline JSON config
- Add --kafka-config-file for JSON file config
- KafkaLoader passes extra config to KafkaProducer
- Supports all kafka-python options (compression, SSL, SASL, etc.)
Parent: ffa520e
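The commit message shows the inline --kafka-config form; --kafka-config-file takes the same options from a JSON file. A minimal sketch of writing such a file from Python (the file name and option values are illustrative, not part of this commit; keys use kafka-python's underscore naming):

```python
# Produce a config file usable with --kafka-config-file.
# Every key must be a valid kafka-python KafkaProducer option.
import json
from pathlib import Path

producer_overrides = {
    'compression_type': 'lz4',        # compress record batches
    'linger_ms': 5,                   # wait up to 5 ms to fill a batch
    'security_protocol': 'SASL_SSL',  # encrypted, authenticated transport
    'sasl_mechanism': 'PLAIN',
}
Path('kafka-producer.json').write_text(json.dumps(producer_overrides, indent=2))
```

Per the CLI changes below, inline --kafka-config is applied after the file, so it can override individual keys from the file for one-off runs.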

2 files changed: +46 -17


apps/kafka_streaming_loader.py

Lines changed: 30 additions & 9 deletions
```diff
@@ -2,6 +2,7 @@
 """Stream data to Kafka with resume watermark support."""
 
 import argparse
+import json
 import logging
 import os
 import time
@@ -82,6 +83,7 @@ def main(
     auth_token: str = None,
     max_retries: int = 5,
     retry_delay: float = 1.0,
+    kafka_config: dict = None,
 ):
     def connect():
         return Client(amp_server, auth=auth, auth_token=auth_token)
@@ -98,15 +100,14 @@ def connect():
     else:
         label_config = None
 
-    client.configure_connection(
-        'kafka',
-        'kafka',
-        {
-            'bootstrap_servers': kafka_brokers,
-            'client_id': 'amp-kafka-loader',
-            'state': {'enabled': True, 'storage': 'lmdb', 'data_dir': state_dir},
-        },
-    )
+    connection_config = {
+        'bootstrap_servers': kafka_brokers,
+        'client_id': 'amp-kafka-loader',
+        'state': {'enabled': True, 'storage': 'lmdb', 'data_dir': state_dir},
+    }
+    if kafka_config:
+        connection_config.update(kafka_config)
+    client.configure_connection('kafka', 'kafka', connection_config)
 
     with open(query_file) as f:
         query = f.read()
@@ -154,13 +155,32 @@ def stream_batches():
 parser.add_argument('--auth-token', help='Explicit auth token (works independently, does not require --auth)')
 parser.add_argument('--max-retries', type=int, default=5, help='Max retries for connection failures (default: 5)')
 parser.add_argument('--retry-delay', type=float, default=1.0, help='Initial retry delay in seconds (default: 1.0)')
+parser.add_argument(
+    '--kafka-config',
+    type=str,
+    help='Extra Kafka producer config as JSON. Uses kafka-python naming (underscores). '
+    'Example: \'{"compression_type": "lz4", "linger_ms": 5}\'. '
+    'See: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html',
+)
+parser.add_argument(
+    '--kafka-config-file',
+    type=Path,
+    help='Path to JSON file with extra Kafka producer config',
+)
 parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'])
 args = parser.parse_args()
 
 logging.basicConfig(level=logging.WARNING, format='%(asctime)s [%(name)s] %(levelname)s: %(message)s')
 log_level = getattr(logging, args.log_level) if args.log_level else logging.INFO
 logging.getLogger('amp').setLevel(log_level)
 
+kafka_config = {}
+if args.kafka_config_file:
+    kafka_config = json.loads(args.kafka_config_file.read_text())
+    logger.info(f'Loaded Kafka config from {args.kafka_config_file}')
+if args.kafka_config:
+    kafka_config.update(json.loads(args.kafka_config))
+
 try:
     main(
         amp_server=args.amp_server,
@@ -176,6 +196,7 @@ def stream_batches():
         auth_token=args.auth_token,
         max_retries=args.max_retries,
         retry_delay=args.retry_delay,
+        kafka_config=kafka_config or None,
     )
 except KeyboardInterrupt:
     logger.info('Stopped by user')
```
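One consequence of the update()-based merge in the connection-config hunk is worth noting: user-supplied keys override the built-in defaults, not just add new producer options. A minimal sketch with plain dicts (broker address and values are placeholders):

```python
# Merge order from the diff: defaults first, then user-supplied kafka_config.
# dict.update() overwrites on key collision, so pass-through keys win.
connection_config = {
    'bootstrap_servers': 'broker:9092',
    'client_id': 'amp-kafka-loader',
    'state': {'enabled': True, 'storage': 'lmdb', 'data_dir': '/tmp/state'},
}
kafka_config = {'compression_type': 'lz4', 'client_id': 'etl-east-1'}
if kafka_config:
    connection_config.update(kafka_config)

assert connection_config['client_id'] == 'etl-east-1'  # override wins
assert connection_config['compression_type'] == 'lz4'  # new key passed through
```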

src/amp/loaders/implementations/kafka_loader.py

Lines changed: 16 additions & 8 deletions
```diff
@@ -16,8 +16,9 @@ class KafkaConfig:
     client_id: str = 'amp-kafka-loader'
     key_field: Optional[str] = 'id'
 
-    def __post_init__(self):
-        pass
+
+KAFKA_CONFIG_FIELDS = {'bootstrap_servers', 'client_id', 'key_field'}
+RESERVED_CONFIG_FIELDS = {'resilience', 'state', 'checkpoint', 'idempotency'}
 
 
 class KafkaLoader(DataLoader[KafkaConfig]):
@@ -26,6 +27,9 @@ class KafkaLoader(DataLoader[KafkaConfig]):
     SUPPORTS_TRANSACTIONS = True
 
     def __init__(self, config: Dict[str, Any], label_manager=None) -> None:
+        self._extra_producer_config = {
+            k: v for k, v in config.items() if k not in KAFKA_CONFIG_FIELDS and k not in RESERVED_CONFIG_FIELDS
+        }
         super().__init__(config, label_manager)
         self._producer = None
 
@@ -34,12 +38,16 @@ def _get_required_config_fields(self) -> list[str]:
 
     def connect(self) -> None:
         try:
-            self._producer = KafkaProducer(
-                bootstrap_servers=self.config.bootstrap_servers,
-                client_id=self.config.client_id,
-                value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'),
-                transactional_id=f'{self.config.client_id}-txn',
-            )
+            producer_config = {
+                'bootstrap_servers': self.config.bootstrap_servers,
+                'client_id': self.config.client_id,
+                'value_serializer': lambda x: json.dumps(x, default=str).encode('utf-8'),
+                'transactional_id': f'{self.config.client_id}-txn',
+                **self._extra_producer_config,
+            }
+            if self._extra_producer_config:
+                self.logger.info(f'Extra Kafka config: {list(self._extra_producer_config.keys())}')
+            self._producer = KafkaProducer(**producer_config)
 
             self._producer.init_transactions()
 
```
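The loader-side split can be exercised without a broker: any config key that is neither a declared KafkaConfig field nor a reserved framework key becomes a KafkaProducer kwarg. A minimal sketch of that filter (the two constants are copied from the diff; the sample config is made up):

```python
KAFKA_CONFIG_FIELDS = {'bootstrap_servers', 'client_id', 'key_field'}
RESERVED_CONFIG_FIELDS = {'resilience', 'state', 'checkpoint', 'idempotency'}

config = {
    'bootstrap_servers': 'broker:9092',  # consumed by KafkaConfig
    'state': {'enabled': True},          # reserved for the framework
    'compression_type': 'zstd',          # passed through to KafkaProducer
    'retries': 10,                       # passed through to KafkaProducer
}

extra_producer_config = {
    k: v for k, v in config.items()
    if k not in KAFKA_CONFIG_FIELDS and k not in RESERVED_CONFIG_FIELDS
}
assert extra_producer_config == {'compression_type': 'zstd', 'retries': 10}
```

Because **self._extra_producer_config is spread after the loader's defaults in connect(), a pass-through key such as transactional_id or value_serializer would silently replace the loader's own setting.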
