Skip to content

Commit 2834d65

Browse files
committed
Subscribed topics cache
1 parent 90b8595 commit 2834d65

File tree

2 files changed

+74
-55
lines changed

2 files changed

+74
-55
lines changed

src/rdkafka_cgrp.c

Lines changed: 63 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -5415,6 +5415,64 @@ rd_kafka_cgrp_subscription_set(rd_kafka_cgrp_t *rkcg,
54155415
return rd_atomic32_add(&rkcg->rkcg_subscription_version, 1);
54165416
}
54175417

5418+
static rd_bool_t
5419+
rd_kafka_cgrp_stale_subscribed_topics_cache(rd_kafka_cgrp_t *rkcg) {
5420+
rd_bool_t stale = rd_false;
5421+
int32_t current_subscription_version =
5422+
rd_atomic32_get(&rkcg->rkcg_subscription_version);
5423+
rd_ts_t current_ts_metadata = rkcg->rkcg_rk->rk_ts_metadata;
5424+
5425+
stale = rkcg->rkcg_subscribed_topics_cache.rk_ts_metadata !=
5426+
current_ts_metadata ||
5427+
rkcg->rkcg_subscribed_topics_cache.subscription_version !=
5428+
current_subscription_version;
5429+
if (stale) {
5430+
rkcg->rkcg_subscribed_topics_cache.rk_ts_metadata =
5431+
current_ts_metadata;
5432+
rkcg->rkcg_subscribed_topics_cache.subscription_version =
5433+
current_subscription_version;
5434+
}
5435+
return stale;
5436+
}
5437+
5438+
static rd_bool_t
5439+
rd_kafka_cgrp_set_subscribed_topics_from_subscription(rd_kafka_cgrp_t *rkcg) {
5440+
rd_list_t *tinfos;
5441+
rd_kafka_topic_partition_list_t *errored;
5442+
5443+
if (!rd_kafka_cgrp_stale_subscribed_topics_cache(rkcg))
5444+
return rd_false; /* Not stale, no change */
5445+
5446+
/*
5447+
* Unmatched topics will be added to the errored list.
5448+
*/
5449+
errored = rd_kafka_topic_partition_list_new(0);
5450+
5451+
/*
5452+
* Create a list of the topics in metadata that matches our subscription
5453+
*/
5454+
tinfos = rd_list_new(rkcg->rkcg_subscription->cnt,
5455+
rd_kafka_topic_info_destroy_free);
5456+
5457+
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
5458+
rd_kafka_metadata_topic_match(rkcg->rkcg_rk, tinfos,
5459+
rkcg->rkcg_subscription, errored);
5460+
else
5461+
rd_kafka_metadata_topic_filter(
5462+
rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored);
5463+
5464+
/*
5465+
* Propagate consumer errors for any non-existent or errored topics.
5466+
* The function takes ownership of errored.
5467+
*/
5468+
rd_kafka_propagate_consumer_topic_errors(
5469+
rkcg, errored, "Subscribed topic not available");
5470+
5471+
/*
5472+
* Update effective list of topics (takes ownership of \c tinfos)
5473+
*/
5474+
return rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos);
5475+
}
54185476

54195477
/**
54205478
* @brief Handle a new subscription that is modifying an existing subscription
@@ -5427,11 +5485,10 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg,
54275485
rd_kafka_topic_partition_list_t *rktparlist) {
54285486
rd_kafka_topic_partition_list_t *unsubscribing_topics;
54295487
rd_kafka_topic_partition_list_t *revoking;
5430-
rd_list_t *tinfos;
5431-
rd_kafka_topic_partition_list_t *errored;
54325488
int metadata_age;
54335489
int old_cnt = rkcg->rkcg_subscription->cnt;
54345490
int32_t cgrp_subscription_version;
5491+
rd_bool_t changed = rd_false;
54355492

54365493
/* Topics in rkcg_subscribed_topics that don't match any pattern in
54375494
the new subscription. */
@@ -5486,27 +5543,10 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg,
54865543
if (unsubscribing_topics)
54875544
rd_kafka_topic_partition_list_destroy(unsubscribing_topics);
54885545

5489-
/* Create a list of the topics in metadata that matches the new
5490-
* subscription */
5491-
tinfos = rd_list_new(rkcg->rkcg_subscription->cnt,
5492-
rd_kafka_topic_info_destroy_free);
5493-
5494-
/* Unmatched topics will be added to the errored list. */
5495-
errored = rd_kafka_topic_partition_list_new(0);
54965546

5497-
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
5498-
rd_kafka_metadata_topic_match(rkcg->rkcg_rk, tinfos,
5499-
rkcg->rkcg_subscription, errored);
5500-
else
5501-
rd_kafka_metadata_topic_filter(
5502-
rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored);
5503-
5504-
/* Propagate consumer errors for any non-existent or errored topics.
5505-
* The function takes ownership of errored. */
5506-
rd_kafka_propagate_consumer_topic_errors(
5507-
rkcg, errored, "Subscribed topic not available");
5547+
changed = rd_kafka_cgrp_set_subscribed_topics_from_subscription(rkcg);
55085548

5509-
if (rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos) && !revoking) {
5549+
if (changed && !revoking) {
55105550
rd_kafka_cgrp_rejoin(rkcg, "Subscription modified");
55115551
return RD_KAFKA_RESP_ERR_NO_ERROR;
55125552
}
@@ -6900,7 +6940,6 @@ rd_kafka_cgrp_owned_but_not_exist_partitions(rd_kafka_cgrp_t *rkcg) {
69006940
return result;
69016941
}
69026942

6903-
69046943
/**
69056944
* @brief Check if the latest metadata affects the current subscription:
69066945
* - matched topic added
@@ -6912,9 +6951,7 @@ rd_kafka_cgrp_owned_but_not_exist_partitions(rd_kafka_cgrp_t *rkcg) {
69126951
*/
69136952
void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg,
69146953
rd_bool_t do_join) {
6915-
rd_list_t *tinfos;
6916-
rd_kafka_topic_partition_list_t *errored;
6917-
rd_bool_t changed;
6954+
rd_bool_t changed = rd_false;
69186955

69196956
rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));
69206957

@@ -6924,36 +6961,7 @@ void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg,
69246961
if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0)
69256962
return;
69266963

6927-
/*
6928-
* Unmatched topics will be added to the errored list.
6929-
*/
6930-
errored = rd_kafka_topic_partition_list_new(0);
6931-
6932-
/*
6933-
* Create a list of the topics in metadata that matches our subscription
6934-
*/
6935-
tinfos = rd_list_new(rkcg->rkcg_subscription->cnt,
6936-
rd_kafka_topic_info_destroy_free);
6937-
6938-
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
6939-
rd_kafka_metadata_topic_match(rkcg->rkcg_rk, tinfos,
6940-
rkcg->rkcg_subscription, errored);
6941-
else
6942-
rd_kafka_metadata_topic_filter(
6943-
rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored);
6944-
6945-
6946-
/*
6947-
* Propagate consumer errors for any non-existent or errored topics.
6948-
* The function takes ownership of errored.
6949-
*/
6950-
rd_kafka_propagate_consumer_topic_errors(
6951-
rkcg, errored, "Subscribed topic not available");
6952-
6953-
/*
6954-
* Update effective list of topics (takes ownership of \c tinfos)
6955-
*/
6956-
changed = rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos);
6964+
changed = rd_kafka_cgrp_set_subscribed_topics_from_subscription(rkcg);
69576965

69586966
if (!do_join ||
69596967
(!changed &&

src/rdkafka_cgrp.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,17 @@ typedef struct rd_kafka_cgrp_s {
361361

362362
rd_atomic32_t rkcg_subscription_version; /**< Subscription version */
363363

364+
struct {
365+
int32_t subscription_version; /**< Version of
366+
* rkcg_subscription
367+
* at time of
368+
* last change. */
369+
rd_ts_t rk_ts_metadata; /**< Timestamp of
370+
* last metadata
371+
* request at last
372+
* change. */
373+
} rkcg_subscribed_topics_cache;
374+
364375
/* Protected by rd_kafka_*lock() */
365376
struct {
366377
rd_ts_t ts_rebalance; /* Timestamp of

0 commit comments

Comments
 (0)