Skip to content

Commit 109ad3c

Browse files
committed
use correct port in discovery for kerberos. Removed bootstrap changes for non-kerberos.
1 parent 2131479 commit 109ad3c

File tree

4 files changed

+35
-104
lines changed

4 files changed

+35
-104
lines changed

rust/crd/src/listener.rs

Lines changed: 10 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -212,23 +212,7 @@ pub fn get_kafka_listener_config(
212212
}
213213

214214
// BOOTSTRAP
215-
if kafka_security.tls_client_authentication_class().is_some() {
216-
listeners.push(KafkaListener {
217-
name: KafkaListenerName::Bootstrap,
218-
host: LISTENER_LOCAL_ADDRESS.to_string(),
219-
port: kafka_security.bootstrap_port().to_string(),
220-
});
221-
advertised_listeners.push(KafkaListener {
222-
name: KafkaListenerName::Bootstrap,
223-
host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
224-
port: node_port_cmd(
225-
STACKABLE_LISTENER_BROKER_DIR,
226-
kafka_security.client_port_name(),
227-
),
228-
});
229-
listener_security_protocol_map
230-
.insert(KafkaListenerName::Bootstrap, KafkaListenerProtocol::Ssl);
231-
} else if kafka_security.has_kerberos_enabled() {
215+
if kafka_security.has_kerberos_enabled() {
232216
listeners.push(KafkaListener {
233217
name: KafkaListenerName::Bootstrap,
234218
host: LISTENER_LOCAL_ADDRESS.to_string(),
@@ -244,40 +228,6 @@ pub fn get_kafka_listener_config(
244228
});
245229
listener_security_protocol_map
246230
.insert(KafkaListenerName::Bootstrap, KafkaListenerProtocol::SaslSsl);
247-
} else if kafka_security.tls_server_secret_class().is_some() {
248-
listeners.push(KafkaListener {
249-
name: KafkaListenerName::Bootstrap,
250-
host: LISTENER_LOCAL_ADDRESS.to_string(),
251-
port: kafka_security.bootstrap_port().to_string(),
252-
});
253-
advertised_listeners.push(KafkaListener {
254-
name: KafkaListenerName::Bootstrap,
255-
host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
256-
port: node_port_cmd(
257-
STACKABLE_LISTENER_BROKER_DIR,
258-
kafka_security.client_port_name(),
259-
),
260-
});
261-
listener_security_protocol_map
262-
.insert(KafkaListenerName::Bootstrap, KafkaListenerProtocol::Ssl);
263-
} else {
264-
listeners.push(KafkaListener {
265-
name: KafkaListenerName::Bootstrap,
266-
host: LISTENER_LOCAL_ADDRESS.to_string(),
267-
port: KafkaTlsSecurity::BOOTSTRAP_PORT.to_string(),
268-
});
269-
advertised_listeners.push(KafkaListener {
270-
name: KafkaListenerName::Bootstrap,
271-
host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
272-
port: node_port_cmd(
273-
STACKABLE_LISTENER_BROKER_DIR,
274-
kafka_security.client_port_name(),
275-
),
276-
});
277-
listener_security_protocol_map.insert(
278-
KafkaListenerName::Bootstrap,
279-
KafkaListenerProtocol::Plaintext,
280-
);
281231
}
282232

283233
Ok(KafkaListenerConfig {
@@ -372,22 +322,20 @@ mod tests {
372322
assert_eq!(
373323
config.listeners(),
374324
format!(
375-
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{internal_host}:{bootstrap_port}",
325+
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
376326
name = KafkaListenerName::ClientAuth,
377327
host = LISTENER_LOCAL_ADDRESS,
378328
port = kafka_security.client_port(),
379329
internal_name = KafkaListenerName::Internal,
380330
internal_host = LISTENER_LOCAL_ADDRESS,
381331
internal_port = kafka_security.internal_port(),
382-
bootstrap_name = KafkaListenerName::Bootstrap,
383-
bootstrap_port = kafka_security.bootstrap_port(),
384332
)
385333
);
386334

387335
assert_eq!(
388336
config.advertised_listeners(),
389337
format!(
390-
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}",
338+
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
391339
name = KafkaListenerName::ClientAuth,
392340
host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
393341
port = node_port_cmd(
@@ -397,25 +345,17 @@ mod tests {
397345
internal_name = KafkaListenerName::Internal,
398346
internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(),
399347
internal_port = kafka_security.internal_port(),
400-
bootstrap_name = KafkaListenerName::Bootstrap,
401-
bootstrap_host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
402-
bootstrap_port = node_port_cmd(
403-
STACKABLE_LISTENER_BROKER_DIR,
404-
kafka_security.client_port_name()
405-
),
406348
)
407349
);
408350

409351
assert_eq!(
410352
config.listener_security_protocol_map(),
411353
format!(
412-
"{name}:{protocol},{internal_name}:{internal_protocol},{bootstrap_name}:{bootstrap_protocol}",
354+
"{name}:{protocol},{internal_name}:{internal_protocol}",
413355
name = KafkaListenerName::ClientAuth,
414356
protocol = KafkaListenerProtocol::Ssl,
415357
internal_name = KafkaListenerName::Internal,
416358
internal_protocol = KafkaListenerProtocol::Ssl,
417-
bootstrap_name = KafkaListenerName::Bootstrap,
418-
bootstrap_protocol = KafkaListenerProtocol::Ssl
419359
)
420360
);
421361

@@ -430,23 +370,20 @@ mod tests {
430370
assert_eq!(
431371
config.listeners(),
432372
format!(
433-
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}",
373+
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
434374
name = KafkaListenerName::Client,
435375
host = LISTENER_LOCAL_ADDRESS,
436376
port = kafka_security.client_port(),
437377
internal_name = KafkaListenerName::Internal,
438378
internal_host = LISTENER_LOCAL_ADDRESS,
439379
internal_port = kafka_security.internal_port(),
440-
bootstrap_name = KafkaListenerName::Bootstrap,
441-
bootstrap_host = LISTENER_LOCAL_ADDRESS,
442-
bootstrap_port = kafka_security.bootstrap_port(),
443380
)
444381
);
445382

446383
assert_eq!(
447384
config.advertised_listeners(),
448385
format!(
449-
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}",
386+
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
450387
name = KafkaListenerName::Client,
451388
host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
452389
port = node_port_cmd(
@@ -456,25 +393,17 @@ mod tests {
456393
internal_name = KafkaListenerName::Internal,
457394
internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(),
458395
internal_port = kafka_security.internal_port(),
459-
bootstrap_name = KafkaListenerName::Bootstrap,
460-
bootstrap_host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
461-
bootstrap_port = node_port_cmd(
462-
STACKABLE_LISTENER_BROKER_DIR,
463-
kafka_security.client_port_name()
464-
),
465396
)
466397
);
467398

468399
assert_eq!(
469400
config.listener_security_protocol_map(),
470401
format!(
471-
"{name}:{protocol},{internal_name}:{internal_protocol},{bootstrap_name}:{bootstrap_protocol}",
402+
"{name}:{protocol},{internal_name}:{internal_protocol}",
472403
name = KafkaListenerName::Client,
473404
protocol = KafkaListenerProtocol::Ssl,
474405
internal_name = KafkaListenerName::Internal,
475406
internal_protocol = KafkaListenerProtocol::Ssl,
476-
bootstrap_name = KafkaListenerName::Bootstrap,
477-
bootstrap_protocol = KafkaListenerProtocol::Ssl
478407
)
479408
);
480409

@@ -490,23 +419,20 @@ mod tests {
490419
assert_eq!(
491420
config.listeners(),
492421
format!(
493-
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}",
422+
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
494423
name = KafkaListenerName::Client,
495424
host = LISTENER_LOCAL_ADDRESS,
496425
port = kafka_security.client_port(),
497426
internal_name = KafkaListenerName::Internal,
498427
internal_host = LISTENER_LOCAL_ADDRESS,
499428
internal_port = kafka_security.internal_port(),
500-
bootstrap_name = KafkaListenerName::Bootstrap,
501-
bootstrap_host = LISTENER_LOCAL_ADDRESS,
502-
bootstrap_port = kafka_security.bootstrap_port(),
503429
)
504430
);
505431

506432
assert_eq!(
507433
config.advertised_listeners(),
508434
format!(
509-
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}",
435+
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
510436
name = KafkaListenerName::Client,
511437
host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
512438
port = node_port_cmd(
@@ -516,25 +442,17 @@ mod tests {
516442
internal_name = KafkaListenerName::Internal,
517443
internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(),
518444
internal_port = kafka_security.internal_port(),
519-
bootstrap_name = KafkaListenerName::Bootstrap,
520-
bootstrap_host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
521-
bootstrap_port = node_port_cmd(
522-
STACKABLE_LISTENER_BROKER_DIR,
523-
kafka_security.client_port_name()
524-
),
525445
)
526446
);
527447

528448
assert_eq!(
529449
config.listener_security_protocol_map(),
530450
format!(
531-
"{name}:{protocol},{internal_name}:{internal_protocol},{bootstrap_name}:{bootstrap_protocol}",
451+
"{name}:{protocol},{internal_name}:{internal_protocol}",
532452
name = KafkaListenerName::Client,
533453
protocol = KafkaListenerProtocol::Plaintext,
534454
internal_name = KafkaListenerName::Internal,
535455
internal_protocol = KafkaListenerProtocol::Plaintext,
536-
bootstrap_name = KafkaListenerName::Bootstrap,
537-
bootstrap_protocol = KafkaListenerProtocol::Plaintext
538456
)
539457
);
540458
}

rust/crd/src/security.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,14 @@ impl KafkaTlsSecurity {
7373
pub const CLIENT_PORT: u16 = 9092;
7474
pub const SECURE_CLIENT_PORT_NAME: &'static str = "kafka-tls";
7575
pub const SECURE_CLIENT_PORT: u16 = 9093;
76+
// bootstrap: we will have a single named port with different values for
77+
// secure (9095) and insecure (9094). The bootstrap listener is needed to
78+
// be able to expose principals for both the broker and bootstrap in the
79+
// JAAS configuration, so that clients can use both.
7680
pub const BOOTSTRAP_PORT_NAME: &'static str = "bootstrap";
7781
pub const BOOTSTRAP_PORT: u16 = 9094;
7882
pub const SECURE_BOOTSTRAP_PORT: u16 = 9095;
83+
// internal
7984
pub const INTERNAL_PORT: u16 = 19092;
8085
pub const SECURE_INTERNAL_PORT: u16 = 19093;
8186
// - TLS global
@@ -494,9 +499,7 @@ impl KafkaTlsSecurity {
494499
);
495500
}
496501

497-
if self.tls_client_authentication_class().is_some()
498-
|| self.tls_server_secret_class().is_some()
499-
{
502+
if self.has_kerberos_enabled() {
500503
// Bootstrap
501504
config.insert(
502505
Self::BOOTSTRAP_SSL_KEYSTORE_LOCATION.to_string(),

rust/operator-binary/src/discovery.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,11 @@ pub async fn build_discovery_configmaps(
6060
listeners: &[Listener],
6161
) -> Result<Vec<ConfigMap>, Error> {
6262
let name = owner.name_unchecked();
63-
let port_name = kafka_security.client_port_name();
63+
let port_name = if kafka_security.has_kerberos_enabled() {
64+
kafka_security.bootstrap_port_name()
65+
} else {
66+
kafka_security.client_port_name()
67+
};
6468
Ok(vec![
6569
build_discovery_configmap(
6670
kafka,

rust/operator-binary/src/kafka_controller.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1188,7 +1188,7 @@ pub fn error_policy(
11881188

11891189
/// We only expose client HTTP / HTTPS and Metrics ports.
11901190
fn listener_ports(kafka_security: &KafkaTlsSecurity) -> Vec<ListenerPort> {
1191-
vec![
1191+
let mut ports = vec![
11921192
ListenerPort {
11931193
name: METRICS_PORT_NAME.to_string(),
11941194
port: METRICS_PORT.into(),
@@ -1199,17 +1199,20 @@ fn listener_ports(kafka_security: &KafkaTlsSecurity) -> Vec<ListenerPort> {
11991199
port: kafka_security.client_port().into(),
12001200
protocol: Some("TCP".to_string()),
12011201
},
1202-
ListenerPort {
1202+
];
1203+
if kafka_security.has_kerberos_enabled() {
1204+
ports.push(ListenerPort {
12031205
name: kafka_security.bootstrap_port_name().to_string(),
12041206
port: kafka_security.bootstrap_port().into(),
12051207
protocol: Some("TCP".to_string()),
1206-
},
1207-
]
1208+
});
1209+
}
1210+
ports
12081211
}
12091212

12101213
/// We only expose client HTTP / HTTPS and Metrics ports.
12111214
fn container_ports(kafka_security: &KafkaTlsSecurity) -> Vec<ContainerPort> {
1212-
vec![
1215+
let mut ports = vec![
12131216
ContainerPort {
12141217
name: Some(METRICS_PORT_NAME.to_string()),
12151218
container_port: METRICS_PORT.into(),
@@ -1222,11 +1225,14 @@ fn container_ports(kafka_security: &KafkaTlsSecurity) -> Vec<ContainerPort> {
12221225
protocol: Some("TCP".to_string()),
12231226
..ContainerPort::default()
12241227
},
1225-
ContainerPort {
1228+
];
1229+
if kafka_security.has_kerberos_enabled() {
1230+
ports.push(ContainerPort {
12261231
name: Some(kafka_security.bootstrap_port_name().to_string()),
12271232
container_port: kafka_security.bootstrap_port().into(),
12281233
protocol: Some("TCP".to_string()),
12291234
..ContainerPort::default()
1230-
},
1231-
]
1235+
});
1236+
}
1237+
ports
12321238
}

0 commit comments

Comments
 (0)