Skip to content

Commit 05c2436

Browse files
author
Clemens Vasters
committed
Allow non-SSL Kafka connections across all projects
- Make SASL/SSL conditional in all 15 projects: only applied when credentials are provided. Plain Kafka works without any SASL config.
- Update parse_connection_string in all projects to support BootstrapServer=host:port for plain Kafka and conditionally add SASL only when SharedAccessKeyName is present.
- Fix NOAA null portscode bug: sanitize API data before marshmallow load.
- Expand Docker E2E Kafka flow tests from 3 to 14 projects.
- Update CI workflow matrix to cover all testable projects.
1 parent 0928759 commit 05c2436

File tree

17 files changed

+488
-190
lines changed

17 files changed

+488
-190
lines changed

.github/workflows/test-docker-e2e.yml

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,20 @@ jobs:
6565
fail-fast: false
6666
matrix:
6767
project:
68-
- { dir: chmi-hydro, topic: test-chmi-hydro, test_class: TestCHMIHydroDockerFlow }
69-
- { dir: imgw-hydro, topic: test-imgw-hydro, test_class: TestIMGWHydroDockerFlow }
70-
- { dir: smhi-hydro, topic: test-smhi-hydro, test_class: TestSMHIHydroDockerFlow }
68+
- { dir: chmi-hydro, test_class: TestCHMIHydroDockerFlow }
69+
- { dir: hubeau-hydrometrie, test_class: TestHubeauDockerFlow }
70+
- { dir: imgw-hydro, test_class: TestIMGWHydroDockerFlow }
71+
- { dir: noaa, test_class: TestNOAADockerFlow }
72+
- { dir: noaa-goes, test_class: TestNOAAGoesDockerFlow }
73+
- { dir: noaa-ndbc, test_class: TestNOAANdbcDockerFlow }
74+
- { dir: noaa-nws, test_class: TestNOAANwsDockerFlow }
75+
- { dir: pegelonline, test_class: TestPegelonlineDockerFlow }
76+
- { dir: rws-waterwebservices, test_class: TestRWSDockerFlow }
77+
- { dir: smhi-hydro, test_class: TestSMHIHydroDockerFlow }
78+
- { dir: uk-ea-flood-monitoring, test_class: TestUKEADockerFlow }
79+
- { dir: usgs-earthquakes, test_class: TestUSGSEarthquakesDockerFlow }
80+
- { dir: usgs-iv, test_class: TestUSGSIVDockerFlow }
81+
- { dir: waterinfo-vmm, test_class: TestWaterinfoVMMDockerFlow }
7182
steps:
7283
- name: Checkout repository
7384
uses: actions/checkout@v6

bluesky/bluesky/bluesky.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -115,21 +115,25 @@ def parse_connection_string(connection_string: str) -> Dict[str, str]:
115115
Returns:
116116
Dict with bootstrap.servers and kafka_topic
117117
"""
118-
config_dict = {
119-
'security.protocol': 'SASL_SSL',
120-
'sasl.mechanism': 'PLAIN',
121-
'sasl.username': '$ConnectionString',
122-
'sasl.password': connection_string.strip(),
123-
}
118+
config_dict = {}
124119
try:
125120
for part in connection_string.split(';'):
126121
if 'Endpoint' in part:
127122
config_dict['bootstrap.servers'] = part.split('=')[1].strip(
128123
'"').replace('sb://', '').replace('/', '') + ':9093'
129124
elif 'EntityPath' in part:
130125
config_dict['kafka_topic'] = part.split('=')[1].strip('"')
126+
elif 'SharedAccessKeyName' in part:
127+
config_dict['sasl.username'] = '$ConnectionString'
128+
elif 'SharedAccessKey' in part:
129+
config_dict['sasl.password'] = connection_string.strip()
130+
elif 'BootstrapServer' in part:
131+
config_dict['bootstrap.servers'] = part.split('=', 1)[1].strip()
131132
except IndexError as e:
132133
raise ValueError("Invalid connection string format") from e
134+
if 'sasl.username' in config_dict:
135+
config_dict['security.protocol'] = 'SASL_SSL'
136+
config_dict['sasl.mechanism'] = 'PLAIN'
133137
return config_dict
134138

135139
def send_cloudevent(self, event_type: str, source: str, subject: str, data: dict) -> None:
@@ -562,12 +566,17 @@ def main():
562566
# Build Kafka configuration
563567
kafka_config = {
564568
'bootstrap.servers': args.kafka_bootstrap_servers or os.getenv('KAFKA_BOOTSTRAP_SERVERS'),
565-
'security.protocol': 'SASL_SSL',
566-
'sasl.mechanism': 'PLAIN',
567-
'sasl.username': args.sasl_username or os.getenv('SASL_USERNAME'),
568-
'sasl.password': args.sasl_password or os.getenv('SASL_PASSWORD'),
569569
'client.id': 'bluesky-firehose-producer',
570570
}
571+
sasl_username = args.sasl_username or os.getenv('SASL_USERNAME')
572+
sasl_password = args.sasl_password or os.getenv('SASL_PASSWORD')
573+
if sasl_username and sasl_password:
574+
kafka_config.update({
575+
'security.protocol': 'SASL_SSL',
576+
'sasl.mechanism': 'PLAIN',
577+
'sasl.username': sasl_username,
578+
'sasl.password': sasl_password,
579+
})
571580

572581
kafka_topic = args.kafka_topic or os.getenv('KAFKA_TOPIC')
573582

gtfs/gtfs_rt_bridge/src/gtfs_rt_bridge/gtfs_cli.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1034,23 +1034,22 @@ async def feed_realtime_messages(agency_id: str, kafka_bootstrap_servers: str, k
10341034
raise ValueError("No Kafka bootstrap servers specified")
10351035
if not kafka_topic:
10361036
raise ValueError("No Kafka topic specified")
1037-
if not sasl_username:
1038-
raise ValueError("No SASL username specified")
1039-
if not sasl_password:
1040-
raise ValueError("No SASL password specified")
10411037

10421038
kafka_config = {
10431039
"bootstrap.servers": kafka_bootstrap_servers,
1044-
"sasl.mechanisms": "PLAIN",
1045-
"security.protocol": "SASL_SSL",
1046-
"sasl.username": sasl_username,
1047-
"sasl.password": sasl_password,
10481040
"acks": "all",
10491041
"linger.ms": 100,
10501042
"retries": 5,
10511043
"retry.backoff.ms": 1000,
10521044
"batch.size": (1024*1024)-512
10531045
}
1046+
if sasl_username and sasl_password:
1047+
kafka_config.update({
1048+
"sasl.mechanisms": "PLAIN",
1049+
"security.protocol": "SASL_SSL",
1050+
"sasl.username": sasl_username,
1051+
"sasl.password": sasl_password,
1052+
})
10541053
producer: Producer = Producer(kafka_config, logger=logger)
10551054
gtfs_rt_producer = GeneralTransitFeedRealTimeEventProducer(producer, kafka_topic,cloudevents_mode)
10561055
gtfs_static_producer = GeneralTransitFeedStaticEventProducer(producer, kafka_topic, cloudevents_mode)
@@ -1498,19 +1497,25 @@ def parse_connection_string(connection_string: str) -> Dict[str, str]:
14981497
Returns:
14991498
Dict[str, str]: Extracted connection parameters.
15001499
"""
1501-
config_dict = {
1502-
'sasl.username': '$ConnectionString',
1503-
'sasl.password': connection_string.strip(),
1504-
}
1500+
config_dict = {}
15051501
try:
15061502
for part in connection_string.split(';'):
15071503
if 'Endpoint' in part:
15081504
config_dict['bootstrap.servers'] = part.split('=')[1].strip(
15091505
'"').strip().replace('sb://', '').replace('/', '')+':9093'
15101506
elif 'EntityPath' in part:
15111507
config_dict['kafka_topic'] = part.split('=')[1].strip('"').strip()
1508+
elif 'SharedAccessKeyName' in part:
1509+
config_dict['sasl.username'] = '$ConnectionString'
1510+
elif 'SharedAccessKey' in part:
1511+
config_dict['sasl.password'] = connection_string.strip()
1512+
elif 'BootstrapServer' in part:
1513+
config_dict['bootstrap.servers'] = part.split('=', 1)[1].strip()
15121514
except IndexError as e:
15131515
raise ValueError("Invalid connection string format") from e
1516+
if 'sasl.username' in config_dict:
1517+
config_dict['security.protocol'] = 'SASL_SSL'
1518+
config_dict['sasl.mechanism'] = 'PLAIN'
15141519
return config_dict
15151520

15161521

hubeau-hydrometrie/hubeau_hydrometrie/hubeau_hydrometrie.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -105,19 +105,25 @@ def get_latest_observations(self) -> List[Dict[str, Any]]:
105105

106106
def parse_connection_string(self, connection_string: str) -> Dict[str, str]:
107107
"""Parse an Event Hubs connection string."""
108-
config_dict = {
109-
'sasl.username': '$ConnectionString',
110-
'sasl.password': connection_string.strip(),
111-
}
108+
config_dict = {}
112109
try:
113110
for part in connection_string.split(';'):
114111
if 'Endpoint' in part:
115112
config_dict['bootstrap.servers'] = part.split('=')[1].strip(
116113
'"').strip().replace('sb://', '').replace('/', '')+':9093'
117114
elif 'EntityPath' in part:
118115
config_dict['kafka_topic'] = part.split('=')[1].strip('"').strip()
116+
elif 'SharedAccessKeyName' in part:
117+
config_dict['sasl.username'] = '$ConnectionString'
118+
elif 'SharedAccessKey' in part:
119+
config_dict['sasl.password'] = connection_string.strip()
120+
elif 'BootstrapServer' in part:
121+
config_dict['bootstrap.servers'] = part.split('=', 1)[1].strip()
119122
except IndexError as e:
120123
raise ValueError("Invalid connection string format") from e
124+
if 'sasl.username' in config_dict:
125+
config_dict['security.protocol'] = 'SASL_SSL'
126+
config_dict['sasl.mechanism'] = 'PLAIN'
121127
return config_dict
122128

123129
def feed_stations(self, kafka_config: dict, kafka_topic: str, polling_interval: int, state_file: str = '') -> None:
@@ -275,17 +281,16 @@ def main() -> None:
275281
if not kafka_topic:
276282
print("Error: Kafka topic must be provided.")
277283
sys.exit(1)
278-
if not sasl_username or not sasl_password:
279-
print("Error: SASL username and password must be provided.")
280-
sys.exit(1)
281-
282284
kafka_config = {
283285
'bootstrap.servers': kafka_bootstrap_servers,
284-
'sasl.mechanisms': 'PLAIN',
285-
'security.protocol': 'SASL_SSL',
286-
'sasl.username': sasl_username,
287-
'sasl.password': sasl_password
288286
}
287+
if sasl_username and sasl_password:
288+
kafka_config.update({
289+
'sasl.mechanisms': 'PLAIN',
290+
'security.protocol': 'SASL_SSL',
291+
'sasl.username': sasl_username,
292+
'sasl.password': sasl_password
293+
})
289294
api.feed_stations(kafka_config, kafka_topic, args.polling_interval, args.state_file)
290295
else:
291296
parser.print_help()

mode-s/mode_s_kafka_bridge/mode_s.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -196,10 +196,7 @@ def parse_connection_string(connection_string: str) -> Dict[str, str]:
196196
"""
197197
Parse the connection string and extract bootstrap server, topic name, username, and password.
198198
"""
199-
config_dict = {
200-
'sasl.username': '$ConnectionString',
201-
'sasl.password': connection_string.strip(),
202-
}
199+
config_dict = {}
203200
try:
204201
for part in connection_string.split(';'):
205202
if 'Endpoint' in part:
@@ -210,9 +207,18 @@ def parse_connection_string(connection_string: str) -> Dict[str, str]:
210207
config_dict['bootstrap.servers'] = endpoint
211208
elif 'EntityPath' in part:
212209
config_dict['kafka_topic'] = part.split('=')[1]
210+
elif 'SharedAccessKeyName' in part:
211+
config_dict['sasl.username'] = '$ConnectionString'
212+
elif 'SharedAccessKey' in part:
213+
config_dict['sasl.password'] = connection_string.strip()
214+
elif 'BootstrapServer' in part:
215+
config_dict['bootstrap.servers'] = part.split('=', 1)[1].strip()
213216
except IndexError as e:
214217
logging.error("Connection string parsing error: %s", e)
215218
raise ValueError("Invalid connection string format") from e
219+
if 'sasl.username' in config_dict:
220+
config_dict['security.protocol'] = 'SASL_SSL'
221+
config_dict['sasl.mechanism'] = 'PLAIN'
216222
return config_dict
217223

218224
async def run():
@@ -294,19 +300,19 @@ async def run():
294300
if not kafka_topic:
295301
print("Error: No Kafka topic found.")
296302
return
297-
if not sasl_username or not sasl_password:
298-
print("Error: SASL username and password are required.")
299-
return
300303

301304
# Build Producer
302305
try:
303306
kafka_config = {
304307
'bootstrap.servers': kafka_bootstrap_servers,
305-
'sasl.mechanisms': 'PLAIN',
306-
'security.protocol': 'SASL_SSL',
307-
'sasl.username': sasl_username,
308-
'sasl.password': sasl_password
309308
}
309+
if sasl_username and sasl_password:
310+
kafka_config.update({
311+
'sasl.mechanisms': 'PLAIN',
312+
'security.protocol': 'SASL_SSL',
313+
'sasl.username': sasl_username,
314+
'sasl.password': sasl_password
315+
})
310316
kafka_producer = Producer(kafka_config)
311317
except Exception as producer_err:
312318
print("Error: Could not create Kafka producer.")

noaa-goes/noaa_goes/noaa_goes.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -241,19 +241,25 @@ def parse_connection_string(connection_string: str) -> Dict[str, str]:
241241
Returns:
242242
Dict with bootstrap.servers, kafka_topic, sasl.username, sasl.password.
243243
"""
244-
config_dict = {
245-
'sasl.username': '$ConnectionString',
246-
'sasl.password': connection_string.strip(),
247-
}
244+
config_dict = {}
248245
try:
249246
for part in connection_string.split(';'):
250247
if 'Endpoint' in part:
251248
config_dict['bootstrap.servers'] = part.split('=')[1].strip(
252249
'"').replace('sb://', '').replace('/', '') + ':9093'
253250
elif 'EntityPath' in part:
254251
config_dict['kafka_topic'] = part.split('=')[1].strip('"')
252+
elif 'SharedAccessKeyName' in part:
253+
config_dict['sasl.username'] = '$ConnectionString'
254+
elif 'SharedAccessKey' in part:
255+
config_dict['sasl.password'] = connection_string.strip()
256+
elif 'BootstrapServer' in part:
257+
config_dict['bootstrap.servers'] = part.split('=', 1)[1].strip()
255258
except IndexError as e:
256259
raise ValueError("Invalid connection string format") from e
260+
if 'sasl.username' in config_dict:
261+
config_dict['security.protocol'] = 'SASL_SSL'
262+
config_dict['sasl.mechanism'] = 'PLAIN'
257263
return config_dict
258264

259265

@@ -302,17 +308,16 @@ def main():
302308
if not kafka_topic:
303309
print("Error: Kafka topic must be provided either through the command line or connection string.")
304310
sys.exit(1)
305-
if not sasl_username or not sasl_password:
306-
print("Error: SASL username and password must be provided either through the command line or connection string.")
307-
sys.exit(1)
308-
309311
kafka_config = {
310312
'bootstrap.servers': kafka_bootstrap_servers,
311-
'sasl.mechanisms': 'PLAIN',
312-
'security.protocol': 'SASL_SSL',
313-
'sasl.username': sasl_username,
314-
'sasl.password': sasl_password
315313
}
314+
if sasl_username and sasl_password:
315+
kafka_config.update({
316+
'sasl.mechanisms': 'PLAIN',
317+
'security.protocol': 'SASL_SSL',
318+
'sasl.username': sasl_username,
319+
'sasl.password': sasl_password
320+
})
316321

317322
poller = SWPCPoller(
318323
kafka_config=kafka_config,

noaa-ndbc/noaa_ndbc/noaa_ndbc.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -336,19 +336,25 @@ def parse_connection_string(connection_string: str) -> Dict[str, str]:
336336
Returns:
337337
Dict with bootstrap.servers, kafka_topic, sasl.username, sasl.password.
338338
"""
339-
config_dict = {
340-
'sasl.username': '$ConnectionString',
341-
'sasl.password': connection_string.strip(),
342-
}
339+
config_dict = {}
343340
try:
344341
for part in connection_string.split(';'):
345342
if 'Endpoint' in part:
346343
config_dict['bootstrap.servers'] = part.split('=')[1].strip(
347344
'"').replace('sb://', '').replace('/', '') + ':9093'
348345
elif 'EntityPath' in part:
349346
config_dict['kafka_topic'] = part.split('=')[1].strip('"')
347+
elif 'SharedAccessKeyName' in part:
348+
config_dict['sasl.username'] = '$ConnectionString'
349+
elif 'SharedAccessKey' in part:
350+
config_dict['sasl.password'] = connection_string.strip()
351+
elif 'BootstrapServer' in part:
352+
config_dict['bootstrap.servers'] = part.split('=', 1)[1].strip()
350353
except IndexError as e:
351354
raise ValueError("Invalid connection string format") from e
355+
if 'sasl.username' in config_dict:
356+
config_dict['security.protocol'] = 'SASL_SSL'
357+
config_dict['sasl.mechanism'] = 'PLAIN'
352358
return config_dict
353359

354360

@@ -397,17 +403,16 @@ def main():
397403
if not kafka_topic:
398404
print("Error: Kafka topic must be provided either through the command line or connection string.")
399405
sys.exit(1)
400-
if not sasl_username or not sasl_password:
401-
print("Error: SASL username and password must be provided either through the command line or connection string.")
402-
sys.exit(1)
403-
404406
kafka_config = {
405407
'bootstrap.servers': kafka_bootstrap_servers,
406-
'sasl.mechanisms': 'PLAIN',
407-
'security.protocol': 'SASL_SSL',
408-
'sasl.username': sasl_username,
409-
'sasl.password': sasl_password
410408
}
409+
if sasl_username and sasl_password:
410+
kafka_config.update({
411+
'sasl.mechanisms': 'PLAIN',
412+
'security.protocol': 'SASL_SSL',
413+
'sasl.username': sasl_username,
414+
'sasl.password': sasl_password
415+
})
411416

412417
poller = NDBCBuoyPoller(
413418
kafka_config=kafka_config,

0 commit comments

Comments
 (0)