Skip to content

Commit d6b2a50

Browse files
Implement share consumer support for OAUTHBEARER with background callback handling and enhance related tests
1 parent e422294 commit d6b2a50

File tree

7 files changed

+660
-6
lines changed

7 files changed

+660
-6
lines changed

.semaphore/run-sasl-tests.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,20 @@ blocks:
4343
value: "False"
4444
commands:
4545
- if [[ "$TEST_TYPE" != *"plaintext"* ]]; then exit 0; fi
46-
- if [[ "$TEST_SASL" == "OAUTHBEARER" ]]; then (cd tests && python3 sasl_test.py --kraft --suite "OAuth/OIDC" ${TEST_KAFKA_GIT_REF:-4.2.0}) && echo "OIDC tests (0126) PASSED" || { echo "OIDC tests (0126) FAILED"; exit 1; }; fi
4746
- 'export TEST_TRIVUP_PARAMETERS=''--conf ["connections.max.reauth.ms=10000"]'''
4847
- 'if [[ "$TEST_SASL" == "OAUTHBEARER" ]]; then export TEST_TRIVUP_PARAMETERS=''--oidc --conf ["connections.max.reauth.ms=10000"]''; fi'
4948
- ./tests/run-all-tests.sh
49+
- if [[ "$TEST_SASL" == "OAUTHBEARER" ]]; then (cd tests && echo "TESTS=0126 make 2>&1; exit" | python3 interactive_broker_version.py --kraft --sasl OAUTHBEARER --oauthbearer-method OIDC ${TEST_KAFKA_GIT_REF:-4.2.0}) && echo "OIDC tests (0126) PASSED" || { echo "OIDC tests (0126) FAILED"; exit 1; }; fi
5050
- name: "SASL SSL cluster (x86_64)"
5151
env_vars:
5252
- name: TEST_SSL
5353
value: "True"
5454
commands:
5555
- if [[ "$TEST_TYPE" != *"ssl"* ]]; then exit 0; fi
56-
- if [[ "$TEST_SASL" == "OAUTHBEARER" ]]; then (cd tests && python3 sasl_test.py --kraft --suite "OAuth/OIDC" ${TEST_KAFKA_GIT_REF:-4.2.0}) && echo "OIDC tests (0126) PASSED" || { echo "OIDC tests (0126) FAILED"; exit 1; }; fi
5756
- 'export TEST_TRIVUP_PARAMETERS=''--conf ["connections.max.reauth.ms=10000"]'''
5857
- 'if [[ "$TEST_SASL" == "OAUTHBEARER" ]]; then export TEST_TRIVUP_PARAMETERS=''--oidc --conf ["connections.max.reauth.ms=10000"]''; fi'
5958
- ./tests/run-all-tests.sh
59+
- if [[ "$TEST_SASL" == "OAUTHBEARER" ]]; then (cd tests && echo "TESTS=0126 make 2>&1; exit" | python3 interactive_broker_version.py --kraft --sasl OAUTHBEARER --oauthbearer-method OIDC ${TEST_KAFKA_GIT_REF:-4.2.0}) && echo "OIDC tests (0126) PASSED" || { echo "OIDC tests (0126) FAILED"; exit 1; }; fi
6060
- name: "Run SASL tests (aarch64)"
6161
dependencies: []
6262
task:
@@ -73,17 +73,17 @@ blocks:
7373
value: "False"
7474
commands:
7575
- if [[ "$TEST_TYPE" != *"plaintext"* ]]; then exit 0; fi
76-
- if [[ "$TEST_SASL" == "OAUTHBEARER" ]]; then (cd tests && python3 sasl_test.py --kraft --suite "OAuth/OIDC" ${TEST_KAFKA_GIT_REF:-4.2.0}) && echo "OIDC tests (0126) PASSED" || { echo "OIDC tests (0126) FAILED"; exit 1; }; fi
7776
- 'export TEST_TRIVUP_PARAMETERS=''--conf ["connections.max.reauth.ms=10000"]'''
7877
- 'if [[ "$TEST_SASL" == "OAUTHBEARER" ]]; then export TEST_TRIVUP_PARAMETERS=''--oidc --conf ["connections.max.reauth.ms=10000"]''; fi'
7978
- ./tests/run-all-tests.sh
79+
- if [[ "$TEST_SASL" == "OAUTHBEARER" ]]; then (cd tests && echo "TESTS=0126 make 2>&1; exit" | python3 interactive_broker_version.py --kraft --sasl OAUTHBEARER --oauthbearer-method OIDC ${TEST_KAFKA_GIT_REF:-4.2.0}) && echo "OIDC tests (0126) PASSED" || { echo "OIDC tests (0126) FAILED"; exit 1; }; fi
8080
- name: "SASL SSL cluster (aarch64)"
8181
env_vars:
8282
- name: TEST_SSL
8383
value: "True"
8484
commands:
8585
- if [[ "$TEST_TYPE" != *"ssl"* ]]; then exit 0; fi
86-
- if [[ "$TEST_SASL" == "OAUTHBEARER" ]]; then (cd tests && python3 sasl_test.py --kraft --suite "OAuth/OIDC" ${TEST_KAFKA_GIT_REF:-4.2.0}) && echo "OIDC tests (0126) PASSED" || { echo "OIDC tests (0126) FAILED"; exit 1; }; fi
8786
- 'export TEST_TRIVUP_PARAMETERS=''--conf ["connections.max.reauth.ms=10000"]'''
8887
- 'if [[ "$TEST_SASL" == "OAUTHBEARER" ]]; then export TEST_TRIVUP_PARAMETERS=''--oidc --conf ["connections.max.reauth.ms=10000"]''; fi'
8988
- ./tests/run-all-tests.sh
89+
- if [[ "$TEST_SASL" == "OAUTHBEARER" ]]; then (cd tests && echo "TESTS=0126 make 2>&1; exit" | python3 interactive_broker_version.py --kraft --sasl OAUTHBEARER --oauthbearer-method OIDC ${TEST_KAFKA_GIT_REF:-4.2.0}) && echo "OIDC tests (0126) PASSED" || { echo "OIDC tests (0126) FAILED"; exit 1; }; fi

src/rdkafka.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3699,6 +3699,26 @@ RD_EXPORT
36993699
rd_kafka_error_t *rd_kafka_sasl_background_callbacks_enable(rd_kafka_t *rk);
37003700

37013701

3702+
/**
3703+
* @brief Enable SASL background callbacks for a share consumer.
3704+
*
3705+
* This is a convenience wrapper around
3706+
* rd_kafka_sasl_background_callbacks_enable() for share consumers.
3707+
* It forwards the SASL queue to the background thread so that
3708+
* OAUTHBEARER token refresh callbacks are served automatically.
3709+
*
3710+
* @param rkshare Share consumer instance.
3711+
*
3712+
* @returns NULL on success or an error object on failure.
3713+
*
3714+
* @sa rd_kafka_sasl_background_callbacks_enable()
3715+
* @sa rd_kafka_conf_set_oauthbearer_token_refresh_cb()
3716+
*/
3717+
RD_EXPORT
3718+
rd_kafka_error_t *
3719+
rd_kafka_share_sasl_background_callbacks_enable(rd_kafka_share_t *rkshare);
3720+
3721+
37023722
/**
37033723
* @brief Sets SASL credentials used for SASL PLAIN and SCRAM mechanisms by
37043724
* this Kafka client.

src/rdkafka_sasl.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,12 @@ rd_kafka_error_t *rd_kafka_sasl_background_callbacks_enable(rd_kafka_t *rk) {
476476
}
477477

478478

479+
rd_kafka_error_t *
480+
rd_kafka_share_sasl_background_callbacks_enable(rd_kafka_share_t *rkshare) {
481+
return rd_kafka_sasl_background_callbacks_enable(rkshare->rkshare_rk);
482+
}
483+
484+
479485
/**
480486
* Global SASL termination.
481487
*/

tests/0022-consume_batch.c

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,59 @@ static void do_test_consume_batch_oauthbearer_cb(void) {
193193

194194
rd_kafka_destroy(rk);
195195
}
196+
197+
198+
static rd_bool_t share_refresh_called = rd_false;
199+
200+
static void share_refresh_cb(rd_kafka_t *rk,
201+
const char *oauthbearer_config,
202+
void *opaque) {
203+
TEST_SAY("Share consumer refresh callback called\n");
204+
TEST_ASSERT(!share_refresh_called);
205+
share_refresh_called = rd_true;
206+
rd_kafka_oauthbearer_set_token_failure(rk, "Refresh called");
207+
}
208+
209+
/**
210+
* @brief Verify that the oauthbearer_refresh_cb() is triggered
211+
* when using rd_kafka_share_consume_batch() with a share consumer.
212+
*/
213+
static void do_test_share_consume_batch_oauthbearer_cb(void) {
214+
rd_kafka_share_t *rk;
215+
rd_kafka_conf_t *conf;
216+
rd_kafka_message_t *rkms[1];
217+
size_t rcvd = 0;
218+
rd_kafka_error_t *err;
219+
char errstr[512];
220+
221+
SUB_TEST_QUICK();
222+
223+
share_refresh_called = rd_false;
224+
225+
conf = rd_kafka_conf_new();
226+
test_conf_set(conf, "security.protocol", "sasl_plaintext");
227+
test_conf_set(conf, "sasl.mechanism", "OAUTHBEARER");
228+
test_conf_set(conf, "group.id", "share-oauthbearer-cb-test");
229+
rd_kafka_conf_set_oauthbearer_token_refresh_cb(conf,
230+
share_refresh_cb);
231+
232+
/* Create share consumer */
233+
rk = rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr));
234+
TEST_ASSERT(rk, "Failed to create share consumer: %s", errstr);
235+
236+
/* Poll to trigger the token refresh callback */
237+
err = rd_kafka_share_consume_batch(rk, 1000, rkms, &rcvd);
238+
if (err)
239+
rd_kafka_error_destroy(err);
240+
241+
TEST_SAY("share_refresh_called = %d\n", share_refresh_called);
242+
TEST_ASSERT(share_refresh_called,
243+
"Expected refresh callback to have been called "
244+
"for share consumer");
245+
246+
rd_kafka_share_consumer_close(rk);
247+
rd_kafka_share_destroy(rk);
248+
}
196249
#endif
197250

198251

@@ -272,6 +325,7 @@ int main_0022_consume_batch(int argc, char **argv) {
272325
int main_0022_consume_batch_local(int argc, char **argv) {
273326
#if WITH_SASL_OAUTHBEARER
274327
do_test_consume_batch_oauthbearer_cb();
328+
do_test_share_consume_batch_oauthbearer_cb();
275329
#else
276330
TEST_SKIP("No OAUTHBEARER support\n");
277331
#endif

tests/0126-oauthbearer_oidc.c

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,95 @@ do_test_produce_consumer_with_OIDC(const char *test_name,
9494
}
9595

9696

97+
/**
98+
* @brief After config OIDC, make sure the share consumer
99+
* can work successfully.
100+
*/
101+
static void
102+
do_test_produce_share_consumer_with_OIDC(const char *test_name,
103+
const rd_kafka_conf_t *base_conf) {
104+
const char *topic;
105+
rd_kafka_t *p1;
106+
rd_kafka_share_t *sc1;
107+
rd_kafka_conf_t *conf;
108+
rd_kafka_topic_partition_list_t *subs;
109+
rd_kafka_message_t *batch[500];
110+
const char *grp_conf[] = {"share.auto.offset.reset", "SET", "earliest"};
111+
int consumed = 0, attempts;
112+
const int msg_cnt = 10;
113+
114+
const char *url = test_getenv("VALID_OIDC_URL", NULL);
115+
116+
SUB_TEST(
117+
"Test share consumer with oidc configuration: %s", test_name);
118+
119+
if (!url) {
120+
SUB_TEST_SKIP(
121+
"VALID_OIDC_URL environment variable is not set\n");
122+
return;
123+
}
124+
125+
conf = rd_kafka_conf_dup(base_conf);
126+
test_conf_set(conf, "sasl.oauthbearer.token.endpoint.url", url);
127+
128+
rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
129+
130+
p1 = test_create_handle(RD_KAFKA_PRODUCER, conf);
131+
132+
topic = test_mk_topic_name("0126-oauthbearer_oidc_share", 1);
133+
test_create_topic_wait_exists(p1, topic, 1, 3, 5000);
134+
TEST_SAY("Topic: %s is created\n", topic);
135+
136+
test_produce_msgs_easy(topic, 0, 0, msg_cnt);
137+
138+
/* Create share consumer (picks up SASL config from test_conf_init) */
139+
sc1 = test_create_share_consumer("oidc-share-group");
140+
141+
/* Set group config for earliest offset */
142+
test_IncrementalAlterConfigs_simple(
143+
p1, RD_KAFKA_RESOURCE_GROUP, "oidc-share-group", grp_conf, 1);
144+
145+
/* Subscribe */
146+
subs = rd_kafka_topic_partition_list_new(1);
147+
rd_kafka_topic_partition_list_add(subs, topic, RD_KAFKA_PARTITION_UA);
148+
rd_kafka_share_subscribe(sc1, subs);
149+
rd_kafka_topic_partition_list_destroy(subs);
150+
151+
/* Give it some time to trigger the token refresh. */
152+
rd_usleep(5 * 1000 * 1000, NULL);
153+
154+
/* Consume messages */
155+
attempts = 50;
156+
while (consumed < msg_cnt && attempts-- > 0) {
157+
size_t rcvd = 0;
158+
size_t m;
159+
rd_kafka_error_t *err;
160+
161+
err = rd_kafka_share_consume_batch(sc1, 3000, batch, &rcvd);
162+
if (err) {
163+
rd_kafka_error_destroy(err);
164+
continue;
165+
}
166+
167+
for (m = 0; m < rcvd; m++) {
168+
if (!batch[m]->err)
169+
consumed++;
170+
rd_kafka_message_destroy(batch[m]);
171+
}
172+
}
173+
174+
TEST_ASSERT(consumed == msg_cnt,
175+
"Expected %d messages, consumed %d", msg_cnt, consumed);
176+
TEST_SAY("Share consumer consumed %d/%d messages with OIDC\n",
177+
consumed, msg_cnt);
178+
179+
rd_kafka_share_consumer_close(sc1);
180+
rd_kafka_share_destroy(sc1);
181+
rd_kafka_destroy(p1);
182+
SUB_TEST_PASS();
183+
}
184+
185+
97186
static void
98187
auth_error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
99188
if (err == RD_KAFKA_RESP_ERR__AUTHENTICATION ||
@@ -149,6 +238,59 @@ static void do_test_produce_consumer_with_OIDC_expired_token_should_fail(
149238
}
150239

151240

241+
/**
242+
* @brief After config OIDC with expired token, make sure the share consumer
243+
* authentication fails as expected.
244+
*/
245+
static void
246+
do_test_produce_share_consumer_with_OIDC_expired_token_should_fail(
247+
const rd_kafka_conf_t *base_conf) {
248+
rd_kafka_share_t *sc1;
249+
rd_kafka_conf_t *conf;
250+
rd_kafka_message_t *batch[10];
251+
size_t rcvd = 0;
252+
rd_kafka_error_t *err;
253+
char errstr[512];
254+
255+
const char *expired_url = test_getenv("EXPIRED_TOKEN_OIDC_URL", NULL);
256+
257+
SUB_TEST("Test OAUTHBEARER/OIDC share consumer failing with "
258+
"expired JWT");
259+
260+
if (!expired_url) {
261+
SUB_TEST_SKIP(
262+
"EXPIRED_TOKEN_OIDC_URL environment variable is not "
263+
"set\n");
264+
return;
265+
}
266+
267+
conf = rd_kafka_conf_dup(base_conf);
268+
269+
error_seen = rd_false;
270+
test_conf_set(conf, "sasl.oauthbearer.token.endpoint.url",
271+
expired_url);
272+
test_conf_set(conf, "group.id", "oidc-share-fail");
273+
274+
rd_kafka_conf_set_error_cb(conf, auth_error_cb);
275+
276+
sc1 = rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr));
277+
TEST_ASSERT(sc1, "Failed to create share consumer: %s", errstr);
278+
279+
/* Poll — should trigger auth error callback, no messages expected */
280+
err = rd_kafka_share_consume_batch(sc1, 10 * 1000, batch, &rcvd);
281+
if (err)
282+
rd_kafka_error_destroy(err);
283+
284+
TEST_ASSERT(error_seen,
285+
"Expected authentication error for share consumer "
286+
"with expired token");
287+
288+
rd_kafka_share_consumer_close(sc1);
289+
rd_kafka_share_destroy(sc1);
290+
SUB_TEST_PASS();
291+
}
292+
293+
152294
/**
153295
* @brief After configiguring OIDC, make sure the
154296
* authentication fails as expected.
@@ -233,6 +375,58 @@ do_test_produce_consumer_with_OIDC_should_fail_invalid_token_endpoint(
233375
SUB_TEST_PASS();
234376
}
235377

378+
379+
/**
380+
* @brief After config OIDC with invalid token endpoint, make sure the
381+
* share consumer authentication fails as expected.
382+
*/
383+
static void
384+
do_test_produce_share_consumer_with_OIDC_should_fail_invalid_token_endpoint(
385+
const rd_kafka_conf_t *base_conf) {
386+
rd_kafka_share_t *sc1;
387+
rd_kafka_conf_t *conf;
388+
rd_kafka_message_t *batch[10];
389+
size_t rcvd = 0;
390+
rd_kafka_error_t *err;
391+
char errstr[512];
392+
393+
const char *invalid_url = test_getenv("INVALID_OIDC_URL", NULL);
394+
395+
SUB_TEST("Test OAUTHBEARER/OIDC share consumer failing with "
396+
"invalid JWT");
397+
398+
if (!invalid_url) {
399+
SUB_TEST_SKIP(
400+
"INVALID_OIDC_URL environment variable is not set\n");
401+
return;
402+
}
403+
404+
conf = rd_kafka_conf_dup(base_conf);
405+
406+
error_seen = rd_false;
407+
test_conf_set(conf, "sasl.oauthbearer.token.endpoint.url",
408+
invalid_url);
409+
test_conf_set(conf, "group.id", "oidc-share-fail-invalid");
410+
411+
rd_kafka_conf_set_error_cb(conf, auth_error_cb);
412+
413+
sc1 = rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr));
414+
TEST_ASSERT(sc1, "Failed to create share consumer: %s", errstr);
415+
416+
/* Poll — should trigger auth error callback, no messages expected */
417+
err = rd_kafka_share_consume_batch(sc1, 10 * 1000, batch, &rcvd);
418+
if (err)
419+
rd_kafka_error_destroy(err);
420+
421+
TEST_ASSERT(error_seen,
422+
"Expected authentication error for share consumer "
423+
"with invalid token endpoint");
424+
425+
rd_kafka_share_consumer_close(sc1);
426+
rd_kafka_share_destroy(sc1);
427+
SUB_TEST_PASS();
428+
}
429+
236430
typedef enum oidc_configuration_jwt_bearer_variation_t {
237431
/** Use a private key file. */
238432
OIDC_CONFIGURATION_JWT_BEARER_VARIATION_PRIVATE_KEY_FILE,
@@ -516,9 +710,13 @@ int main_0126_oauthbearer_oidc(int argc, char **argv) {
516710
}
517711

518712
do_test_produce_consumer_with_OIDC("client_credentials", conf);
713+
do_test_produce_share_consumer_with_OIDC("client_credentials", conf);
519714
do_test_produce_consumer_with_OIDC_should_fail_invalid_token_endpoint(
520715
conf);
716+
do_test_produce_share_consumer_with_OIDC_should_fail_invalid_token_endpoint(
717+
conf);
521718
do_test_produce_consumer_with_OIDC_expired_token_should_fail(conf);
719+
do_test_produce_share_consumer_with_OIDC_expired_token_should_fail(conf);
522720
do_test_produce_consumer_with_OIDC_jwt_bearer(conf);
523721
do_test_produce_consumer_with_OIDC_metadata_authentication(conf);
524722

0 commit comments

Comments
 (0)