Skip to content

Commit f21766f

Browse files
Add support for different sub claims for authentication (#5336)
* Add documentation for new sub claim config * Add new config in rdkafka_conf * Modify logic based on subclaim name value * Add unit tests * Fix style format errors for the modified files * Add integration tests for sub claim name * Fix style check for 0126-oauthbearer_oidc * Add new trivup version 0.15.0 and update the dependency in requirements.txt * Remove redundant comments * Update Change log * Remove the link for PR as GH adds it automatically * Add validation for subclaim string configuration * Remove tests as preconditions will already be validated before * Function should fail agnostic of reason. Corrected the comment * Modify integration test to fail at configuration finalization * Fix comment for configuration.md consistency * Add back the default value for sub_claim_name configuration * Rebuilt CONFIGURATION.md and fix style check * Fix style check * Add unit tests for rd_kafka_conf_validate_str * Fix style check
1 parent 901af7c commit f21766f

File tree

9 files changed

+317
-17
lines changed

9 files changed

+317
-17
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
# librdkafka v2.14.0
2+
3+
librdkafka v2.14.0 is a feature release:
4+
5+
* [KIP-768](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575#KIP768:ExtendSASL/OAUTHBEARERwithSupportforOIDC-ClientConfiguration) Extend SASL/OAUTHBEARER to support OIDC claim mapping beyond the default `sub` claim (#5336).
6+
17
# librdkafka v2.13.2
28

39
librdkafka v2.13.2 is a maintenance release:

CONFIGURATION.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ sasl.oauthbearer.client.secret | * | |
108108
sasl.oauthbearer.scope | * | | | low | Client use this to specify the scope of the access request to the broker. Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
109109
sasl.oauthbearer.extensions | * | | | low | Allow additional information to be provided to the broker. Comma-separated list of key=value pairs. E.g., "supportFeatureX=true,organizationId=sales-emea".Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
110110
sasl.oauthbearer.token.endpoint.url | * | | | low | OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token. Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
111+
sasl.oauthbearer.sub.claim.name | * | | sub | low | JWT claim name to use as the subject (principal) when validating OIDC access tokens. Must be present in the JWT payload with a non-empty value. Should match the broker's `sasl.oauthbearer.sub.claim.name` configuration for consistent authentication. Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
111112
sasl.oauthbearer.grant.type | * | client_credentials, urn:ietf:params:oauth:grant-type:jwt-bearer | client_credentials | low | OAuth grant type to use when communicating with the identity provider. <br>*Type: enum value*
112113
sasl.oauthbearer.assertion.algorithm | * | RS256, ES256 | RS256 | low | Algorithm the client should use to sign the assertion sent to the identity provider and in the OAuth alg header in the JWT assertion. <br>*Type: enum value*
113114
sasl.oauthbearer.assertion.private.key.file | * | | | low | Path to client's private key (PEM) used for authentication when using the JWT assertion. <br>*Type: string*

src/rdkafka_conf.c

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,20 @@ rd_kafka_conf_validate_partitioner(const struct rd_kafka_property *prop,
353353
!strcmp(val, "fnv1a_random");
354354
}
355355

356+
/**
357+
* @brief Validate that a string is non-null, non-empty, and not
358+
* whitespace-only.
359+
*/
360+
static rd_bool_t rd_kafka_conf_validate_str(const char *value) {
361+
const char *p;
362+
if (!value || !*value)
363+
return rd_false;
364+
for (p = value; *p; p++) {
365+
if (!isspace((int)*p))
366+
return rd_true;
367+
}
368+
return rd_false;
369+
}
356370

357371
/**
358372
* librdkafka configuration property definitions.
@@ -1121,6 +1135,15 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
11211135
"OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token. "
11221136
"Only used when `sasl.oauthbearer.method` is set to \"oidc\".",
11231137
_UNSUPPORTED_OIDC},
1138+
{_RK_GLOBAL, "sasl.oauthbearer.sub.claim.name", _RK_C_STR,
1139+
_RK(sasl.oauthbearer.sub_claim_name),
1140+
"JWT claim name to use as the subject (principal) when validating "
1141+
"OIDC access tokens. Must be present in the JWT payload with a "
1142+
"non-empty value. Should match the broker's "
1143+
"`sasl.oauthbearer.sub.claim.name` configuration for consistent "
1144+
"authentication. "
1145+
"Only used when `sasl.oauthbearer.method` is set to \"oidc\".",
1146+
.sdef = "sub", _UNSUPPORTED_OIDC},
11241147
{
11251148
_RK_GLOBAL,
11261149
"sasl.oauthbearer.grant.type",
@@ -4133,6 +4156,13 @@ const char *rd_kafka_conf_finalize_oauthbearer_oidc(rd_kafka_conf_t *conf) {
41334156
conf->enabled_events |= RD_KAFKA_EVENT_BACKGROUND;
41344157
conf->sasl.enable_callback_queue = 1;
41354158
}
4159+
4160+
if (rd_kafka_conf_is_modified(conf,
4161+
"sasl.oauthbearer.sub.claim.name") &&
4162+
!rd_kafka_conf_validate_str(conf->sasl.oauthbearer.sub_claim_name))
4163+
return "`sasl.oauthbearer.sub.claim.name` must be "
4164+
"non-empty and not contain only whitespace";
4165+
41364166
return NULL;
41374167
}
41384168

@@ -4879,6 +4909,19 @@ int unittest_conf(void) {
48794909

48804910
rd_kafka_conf_destroy(conf);
48814911

4912+
/* Verify rd_kafka_conf_validate_str */
4913+
RD_UT_ASSERT(!rd_kafka_conf_validate_str(NULL), "NULL must be invalid");
4914+
RD_UT_ASSERT(!rd_kafka_conf_validate_str(""),
4915+
"empty string must be invalid");
4916+
RD_UT_ASSERT(!rd_kafka_conf_validate_str(" "),
4917+
"whitespace-only string must be invalid");
4918+
RD_UT_ASSERT(!rd_kafka_conf_validate_str("\t\n"),
4919+
"tab/newline-only string must be invalid");
4920+
RD_UT_ASSERT(rd_kafka_conf_validate_str("sub"),
4921+
"\"sub\" must be valid");
4922+
RD_UT_ASSERT(rd_kafka_conf_validate_str(" sub "),
4923+
"\" sub \" must be valid");
4924+
48824925
RD_UT_PASS();
48834926
}
48844927

src/rdkafka_conf.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@ struct rd_kafka_conf_s {
376376

377377

378378
char *extensions_str;
379+
char *sub_claim_name;
379380
rd_bool_t builtin_token_refresh_cb;
380381
/* SASL/OAUTHBEARER token refresh event callback */
381382
void (*token_refresh_cb)(rd_kafka_t *rk,

src/rdkafka_sasl_oauthbearer_oidc.c

Lines changed: 151 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,7 @@ static char *rd_kafka_oidc_assertion_read_from_file(const char *file_path) {
674674
*/
675675
static char *rd_kafka_oidc_token_try_validate(cJSON *json,
676676
const char *field,
677+
const char *sub_claim_name,
677678
char **sub,
678679
double *exp,
679680
char *errstr,
@@ -729,19 +730,26 @@ static char *rd_kafka_oidc_token_try_validate(cJSON *json,
729730
goto fail;
730731
}
731732

732-
jwt_sub = cJSON_GetObjectItem(payloads, "sub");
733+
rd_dassert(sub_claim_name && *sub_claim_name);
734+
735+
jwt_sub = cJSON_GetObjectItem(payloads, sub_claim_name);
733736
if (jwt_sub == NULL) {
734737
rd_snprintf(errstr, errstr_size,
735738
"Expected JSON JWT response with "
736-
"\"sub\" field");
739+
"\"%s\" field",
740+
sub_claim_name);
737741
goto fail;
738742
}
739743

740744
*sub = cJSON_GetStringValue(jwt_sub);
741-
if (*sub == NULL) {
745+
if (*sub == NULL || **sub == '\0') {
746+
/* Reset to NULL to prevent a dangling pointer to cJSON
747+
* internal memory after cJSON_Delete(payloads) */
748+
*sub = NULL;
742749
rd_snprintf(errstr, errstr_size,
743750
"Expected JSON JWT response with "
744-
"valid \"sub\" field");
751+
"valid \"%s\" field (non-empty value required)",
752+
sub_claim_name);
745753
goto fail;
746754
}
747755
*sub = rd_strdup(*sub);
@@ -857,13 +865,14 @@ void rd_kafka_oidc_token_jwt_bearer_refresh_cb(rd_kafka_t *rk,
857865
* This function will try to validate the `access_token` and then the
858866
* `id_token`.
859867
*/
860-
jwt_token = rd_kafka_oidc_token_try_validate(json, "access_token", &sub,
861-
&exp, validate_errstr,
862-
sizeof(validate_errstr));
868+
jwt_token = rd_kafka_oidc_token_try_validate(
869+
json, "access_token", rk->rk_conf.sasl.oauthbearer.sub_claim_name,
870+
&sub, &exp, validate_errstr, sizeof(validate_errstr));
863871
if (!jwt_token)
864872
jwt_token = rd_kafka_oidc_token_try_validate(
865-
json, "id_token", &sub, &exp, validate_errstr,
866-
sizeof(validate_errstr));
873+
json, "id_token",
874+
rk->rk_conf.sasl.oauthbearer.sub_claim_name, &sub, &exp,
875+
validate_errstr, sizeof(validate_errstr));
867876

868877
if (!jwt_token) {
869878
rd_kafka_oauthbearer_set_token_failure(rk, validate_errstr);
@@ -965,9 +974,9 @@ void rd_kafka_oidc_token_client_credentials_refresh_cb(
965974
goto done;
966975
}
967976

968-
jwt_token = rd_kafka_oidc_token_try_validate(json, "access_token", &sub,
969-
&exp, set_token_errstr,
970-
sizeof(set_token_errstr));
977+
jwt_token = rd_kafka_oidc_token_try_validate(
978+
json, "access_token", rk->rk_conf.sasl.oauthbearer.sub_claim_name,
979+
&sub, &exp, set_token_errstr, sizeof(set_token_errstr));
971980
if (!jwt_token) {
972981
rd_kafka_oauthbearer_set_token_failure(rk, set_token_errstr);
973982
goto done;
@@ -1083,9 +1092,9 @@ void rd_kafka_oidc_token_metadata_azure_imds_refresh_cb(
10831092
goto done;
10841093
}
10851094

1086-
jwt_token = rd_kafka_oidc_token_try_validate(json, "access_token", &sub,
1087-
&exp, set_token_errstr,
1088-
sizeof(set_token_errstr));
1095+
jwt_token = rd_kafka_oidc_token_try_validate(
1096+
json, "access_token", rk->rk_conf.sasl.oauthbearer.sub_claim_name,
1097+
&sub, &exp, set_token_errstr, sizeof(set_token_errstr));
10891098
if (!jwt_token) {
10901099
rd_kafka_oauthbearer_set_token_failure(rk, set_token_errstr);
10911100
goto done;
@@ -1312,6 +1321,132 @@ static int ut_sasl_oauthbearer_oidc_post_fields_with_empty_scope(void) {
13121321
RD_UT_PASS();
13131322
}
13141323

1324+
/** All JWTs use a fake signature since token_try_validate() only
1325+
* decodes and inspects the payload; signature verification is done
1326+
* separately by the broker.
1327+
*
1328+
* JWT payloads (decoded):
1329+
* JWT_SUB_ONLY:
1330+
* {"exp":9999999999,"iat":1000000000,"sub":"subject"}
1331+
*
1332+
* JWT_MULTI_CLAIMS:
1333+
* {"exp":9999999999,"iat":1000000000,"sub":"subject",
1334+
* "client_id":"client_id_123","azp":"azp_123"}
1335+
*
1336+
* JWT_EMPTY_SUB:
1337+
* {"exp":9999999999,"iat":1000000000,"sub":""}
1338+
*
1339+
* JWT_MISSING_SUB:
1340+
* {"exp":9999999999,"iat":1000000000,"client_id":"client_id_123"}
1341+
*/
1342+
/* payload: {"exp":9999999999,"iat":1000000000,"sub":"subject"} */
1343+
#define UT_JWT_SUB_ONLY \
1344+
"eyJhbGciOiJSUzI1NiIsImtpZCI6ImFiY2RlZmcifQ" \
1345+
"." \
1346+
"eyJleHAiOjk5OTk5OTk5OTksImlhdCI6MTAwMDAwMDAwMCwic" \
1347+
"3ViIjoic3ViamVjdCJ9" \
1348+
"." \
1349+
"fakesignature"
1350+
/* payload: {"exp":9999999999,"iat":1000000000,"sub":"subject",
1351+
* "client_id":"client_id_123","azp":"azp_123"} */
1352+
#define UT_JWT_MULTI_CLAIMS \
1353+
"eyJhbGciOiJSUzI1NiIsImtpZCI6ImFiY2RlZmcifQ" \
1354+
"." \
1355+
"eyJleHAiOjk5OTk5OTk5OTksImlhdCI6MTAwMDAwMDAwMCwic3ViIjoic3ViamVj" \
1356+
"dCIsImNsaWVudF9pZCI6ImNsaWVudF9pZF8xMjMiLCJhenAiOiJhenBfMTIzIn0" \
1357+
"." \
1358+
"fakesignature"
1359+
/* payload: {"exp":9999999999,"iat":1000000000,"sub":""} */
1360+
#define UT_JWT_EMPTY_SUB \
1361+
"eyJhbGciOiJSUzI1NiIsImtpZCI6ImFiY2RlZmcifQ" \
1362+
"." \
1363+
"eyJleHAiOjk5OTk5OTk5OTksImlhdCI6MTAwMDAwMDAwMCwic3ViIjoiIn0" \
1364+
"." \
1365+
"fakesignature"
1366+
/* payload: {"exp":9999999999,"iat":1000000000,"client_id":"client_id_123"} */
1367+
#define UT_JWT_MISSING_SUB \
1368+
"eyJhbGciOiJSUzI1NiIsImtpZCI6ImFiY2RlZmcifQ" \
1369+
"." \
1370+
"eyJleHAiOjk5OTk5OTk5OTksImlhdCI6MTAwMDAwMDAwMCwiY2xpZW50X2lkIjoi" \
1371+
"Y2xpZW50X2lkXzEyMyJ9" \
1372+
"." \
1373+
"fakesignature"
1374+
1375+
/**
1376+
* @brief Verifies the extraction logic of the subject from the configured JWT
1377+
* claim name, falls back to "sub" when unconfigured, and rejects missing or
1378+
* empty claim values.
1379+
*/
1380+
static int ut_sasl_oauthbearer_oidc_sub_claim_name(void) {
1381+
1382+
const struct {
1383+
const char *test_name;
1384+
const char *jwt;
1385+
const char *sub_claim_name;
1386+
rd_bool_t expect_success;
1387+
const char *expected_sub;
1388+
} tests[] = {
1389+
{"Explicit 'sub' claim name", UT_JWT_SUB_ONLY, "sub", rd_true,
1390+
"subject"},
1391+
{"Custom 'client_id' claim", UT_JWT_MULTI_CLAIMS, "client_id",
1392+
rd_true, "client_id_123"},
1393+
{"Custom 'azp' claim", UT_JWT_MULTI_CLAIMS, "azp", rd_true,
1394+
"azp_123"},
1395+
{"Custom 'client_id' claim succeeds without sub",
1396+
UT_JWT_MISSING_SUB, "client_id", rd_true, "client_id_123"},
1397+
{"Missing 'sub' claim fails", UT_JWT_MISSING_SUB, "sub", rd_false,
1398+
NULL},
1399+
{"Empty 'sub' value fails", UT_JWT_EMPTY_SUB, "sub", rd_false,
1400+
NULL},
1401+
{"Nonexistent claim name fails", UT_JWT_SUB_ONLY, "nonexistent",
1402+
rd_false, NULL},
1403+
};
1404+
1405+
unsigned int i;
1406+
1407+
RD_UT_BEGIN();
1408+
1409+
for (i = 0; i < RD_ARRAYSIZE(tests); i++) {
1410+
char *sub = NULL;
1411+
double exp_v = 0;
1412+
char errstr[256];
1413+
char *result;
1414+
cJSON *json;
1415+
char access_token_json[2048];
1416+
1417+
rd_snprintf(access_token_json, sizeof(access_token_json),
1418+
"{\"access_token\":\"%s\"}", tests[i].jwt);
1419+
json = cJSON_Parse(access_token_json);
1420+
RD_UT_ASSERT(json != NULL, "[%s] Failed to build test JSON",
1421+
tests[i].test_name);
1422+
1423+
result = rd_kafka_oidc_token_try_validate(
1424+
json, "access_token", tests[i].sub_claim_name, &sub, &exp_v,
1425+
errstr, sizeof(errstr));
1426+
1427+
if (tests[i].expect_success) {
1428+
RD_UT_ASSERT(result != NULL,
1429+
"[%s] Expected success but got error: %s",
1430+
tests[i].test_name, errstr);
1431+
RD_UT_ASSERT(sub != NULL, "[%s] Expected non-NULL sub",
1432+
tests[i].test_name);
1433+
RD_UT_ASSERT(!strcmp(sub, tests[i].expected_sub),
1434+
"[%s] Expected sub '%s', got '%s'",
1435+
tests[i].test_name, tests[i].expected_sub,
1436+
sub);
1437+
} else {
1438+
RD_UT_ASSERT(result == NULL,
1439+
"[%s] Expected failure but got sub '%s'",
1440+
tests[i].test_name, sub ? sub : "(null)");
1441+
}
1442+
1443+
RD_IF_FREE(sub, rd_free);
1444+
cJSON_Delete(json);
1445+
}
1446+
1447+
RD_UT_PASS();
1448+
}
1449+
13151450

13161451
/**
13171452
* @brief make sure the jwt is able to be extracted from HTTP(S) requests
@@ -1323,6 +1458,7 @@ int unittest_sasl_oauthbearer_oidc(void) {
13231458
fails += ut_sasl_oauthbearer_oidc_with_empty_key();
13241459
fails += ut_sasl_oauthbearer_oidc_post_fields();
13251460
fails += ut_sasl_oauthbearer_oidc_post_fields_with_empty_scope();
1461+
fails += ut_sasl_oauthbearer_oidc_sub_claim_name();
13261462
return fails;
13271463
}
13281464

0 commit comments

Comments
 (0)