Skip to content

Commit 0745eba

Browse files
authored
fix: Kafka source/sink - pass client certs for mTLS when insecureSkipVerify is false (#3059)
1 parent e37e981 commit 0745eba

File tree

1 file changed

+33
-42
lines changed

1 file changed

+33
-42
lines changed

rust/numaflow-core/src/config/components.rs

Lines changed: 33 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -195,59 +195,50 @@ fn parse_kafka_auth_config(
195195
};
196196

197197
let tls = if let Some(tls_config) = tls_config {
198-
let tls_skip_verify = tls_config.insecure_skip_verify.unwrap_or(false);
199-
if tls_skip_verify {
200-
Some(numaflow_kafka::TlsConfig {
201-
insecure_skip_verify: true,
202-
ca_cert: None,
203-
client_auth: None,
198+
let ca_cert = tls_config
199+
.ca_cert_secret
200+
.map(|ca_cert_secret| {
201+
match get_secret_from_volume(&ca_cert_secret.name, &ca_cert_secret.key) {
202+
Ok(secret) => Ok(secret),
203+
Err(e) => Err(Error::Config(format!(
204+
"Failed to get CA cert secret: {e:?}"
205+
))),
206+
}
204207
})
205-
} else {
206-
let ca_cert = tls_config
207-
.ca_cert_secret
208-
.map(|ca_cert_secret| {
209-
match get_secret_from_volume(&ca_cert_secret.name, &ca_cert_secret.key) {
210-
Ok(secret) => Ok(secret),
211-
Err(e) => Err(Error::Config(format!(
212-
"Failed to get CA cert secret: {e:?}"
213-
))),
214-
}
215-
})
216-
.transpose()?;
208+
.transpose()?;
217209

218-
let tls_client_auth_certs = match tls_config.cert_secret {
219-
Some(client_cert_secret) => {
220-
let client_cert =
221-
get_secret_from_volume(&client_cert_secret.name, &client_cert_secret.key)
222-
.map_err(|e| {
210+
let tls_client_auth_certs = match tls_config.cert_secret {
211+
Some(client_cert_secret) => {
212+
let client_cert =
213+
get_secret_from_volume(&client_cert_secret.name, &client_cert_secret.key)
214+
.map_err(|e| {
223215
Error::Config(format!("Failed to get client cert secret: {e:?}"))
224216
})?;
225217

226-
let Some(private_key_secret) = tls_config.key_secret else {
227-
return Err(Error::Config("Client cert is specified for TLS authentication, but private key is not specified".into()));
228-
};
218+
let Some(private_key_secret) = tls_config.key_secret else {
219+
return Err(Error::Config("Client cert is specified for TLS authentication, but private key is not specified".into()));
220+
};
229221

230-
let client_cert_private_key =
231-
get_secret_from_volume(&private_key_secret.name, &private_key_secret.key)
232-
.map_err(|e| {
222+
let client_cert_private_key =
223+
get_secret_from_volume(&private_key_secret.name, &private_key_secret.key)
224+
.map_err(|e| {
233225
Error::Config(format!(
234226
"Failed to get client cert private key secret: {e:?}"
235227
))
236228
})?;
237-
Some(numaflow_kafka::TlsClientAuthCerts {
238-
client_cert,
239-
client_cert_private_key,
240-
})
241-
}
242-
None => None,
243-
};
229+
Some(numaflow_kafka::TlsClientAuthCerts {
230+
client_cert,
231+
client_cert_private_key,
232+
})
233+
}
234+
None => None,
235+
};
244236

245-
Some(numaflow_kafka::TlsConfig {
246-
insecure_skip_verify: tls_config.insecure_skip_verify.unwrap_or(false),
247-
ca_cert,
248-
client_auth: tls_client_auth_certs,
249-
})
250-
}
237+
Some(numaflow_kafka::TlsConfig {
238+
insecure_skip_verify: tls_config.insecure_skip_verify.unwrap_or(false),
239+
ca_cert,
240+
client_auth: tls_client_auth_certs,
241+
})
251242
} else {
252243
None
253244
};

0 commit comments

Comments
 (0)