Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 58 additions & 7 deletions src/rdkafka_ssl.c
Original file line number Diff line number Diff line change
Expand Up @@ -484,18 +484,47 @@ static int rd_kafka_transport_ssl_set_endpoint_id(rd_kafka_transport_t *rktrans,
RD_KAFKA_SSL_ENDPOINT_ID_NONE)
return 0;

/* Check if connecting to an IP address */
rd_bool_t is_ip = rd_false;
if (/*ipv6*/ (strchr(name, ':') &&
strspn(name, "0123456789abcdefABCDEF:.[]%") ==
strlen(name)) ||
/*ipv4*/ strspn(name, "0123456789.") == strlen(name)) {
is_ip = rd_true;
}

#if OPENSSL_VERSION_NUMBER >= 0x10100000 && !defined(OPENSSL_IS_BORINGSSL)
if (!SSL_set1_host(rktrans->rktrans_ssl, name))
goto fail;
/* OpenSSL 1.1.0+ has SSL_set1_host for hostnames
* but IP addresses should use the IP-specific function */
if (is_ip) {
#if OPENSSL_VERSION_NUMBER >= 0x10100000
/* Use IP-specific function for proper IP matching */
X509_VERIFY_PARAM *param = SSL_get0_param(rktrans->rktrans_ssl);
if (!X509_VERIFY_PARAM_set1_ip_asc(param, name))
goto fail;
#else
if (!SSL_set1_host(rktrans->rktrans_ssl, name))
goto fail;
#endif
} else {
if (!SSL_set1_host(rktrans->rktrans_ssl, name))
goto fail;
}
#elif OPENSSL_VERSION_NUMBER >= 0x1000200fL /* 1.0.2 */
{
X509_VERIFY_PARAM *param;

param = SSL_get0_param(rktrans->rktrans_ssl);

if (!X509_VERIFY_PARAM_set1_host(param, name,
/* Use IP-specific function for IP addresses */
if (is_ip) {
if (!X509_VERIFY_PARAM_set1_ip_asc(param, name))
goto fail;
} else {
if (!X509_VERIFY_PARAM_set1_host(param, name,
strnlen(name, sizeof(name))))
goto fail;
goto fail;
}
}
#else
rd_snprintf(errstr, errstr_size,
Expand All @@ -506,7 +535,8 @@ static int rd_kafka_transport_ssl_set_endpoint_id(rd_kafka_transport_t *rktrans,
#endif

rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "ENDPOINT",
"Enabled endpoint identification using hostname %s", name);
"Enabled endpoint identification using %s %s",
is_ip ? "IP address" : "hostname", name);

return 0;

Expand Down Expand Up @@ -608,10 +638,20 @@ static int rd_kafka_transport_ssl_verify(rd_kafka_transport_t *rktrans) {
}

if ((rl = SSL_get_verify_result(rktrans->rktrans_ssl)) != X509_V_OK) {
char subject[256] = "";
char issuer[256] = "";
if (cert) {
X509_NAME_oneline(X509_get_subject_name(cert), subject,
sizeof(subject));
X509_NAME_oneline(X509_get_issuer_name(cert), issuer,
sizeof(issuer));
}
rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR,
RD_KAFKA_RESP_ERR__SSL,
"Failed to verify broker certificate: %s",
X509_verify_cert_error_string(rl));
"Failed to verify broker certificate: %s "
"(subject=%s, issuer=%s, openssl=0x%lx)",
X509_verify_cert_error_string(rl),
subject, issuer, OPENSSL_VERSION_NUMBER);
return -1;
}

Expand Down Expand Up @@ -1215,6 +1255,11 @@ static int rd_kafka_ssl_set_certs(rd_kafka_t *rk,
return -1;
}

rd_kafka_dbg(rk, SECURITY, "SSL",
"Loaded CA certificates from %s: %s",
is_dir ? "directory" : "file",
rk->rk_conf.ssl.ca_location);

ca_probe = rd_false;
}

Expand Down Expand Up @@ -1910,6 +1955,12 @@ int rd_kafka_ssl_ctx_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) {
}

/* Set up broker certificate verification. */
rd_kafka_dbg(rk, SECURITY, "SSL",
"Setting up verification: enable_verify=%d, "
"cert_verify_cb=%s, security_level=%d",
rk->rk_conf.ssl.enable_verify,
rk->rk_conf.ssl.cert_verify_cb ? "set" : "NULL",
SSL_CTX_get_security_level(ctx));
SSL_CTX_set_verify(ctx,
rk->rk_conf.ssl.enable_verify ? SSL_VERIFY_PEER
: SSL_VERIFY_NONE,
Expand Down