Skip to content

Commit f34492b

Browse files
committed
Remove performance regression when we're subscribing with a regex and none of many topics in the cluster are matching it
1 parent be00619 commit f34492b

File tree

1 file changed

+34
-12
lines changed

1 file changed

+34
-12
lines changed

src/rdkafka_metadata.c

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1216,6 +1216,7 @@ rd_kafka_parse_Metadata_admin(rd_kafka_broker_t *rkb,
12161216
"(admin request)");
12171217
}
12181218

1219+
typedef RD_MAP_TYPE(const char *, const char *) map_str_str_t;
12191220

12201221
/**
12211222
* @brief Add all topics in current cached full metadata
@@ -1226,7 +1227,7 @@ rd_kafka_parse_Metadata_admin(rd_kafka_broker_t *rkb,
12261227
* an available topic will be added to this list with
12271228
* the appropriate error set.
12281229
*
1229-
* @returns the number of topics matched and added to \p list
1230+
* @returns the number of topics matched and added to \p tinfos
12301231
*
12311232
* @locks none
12321233
* @locality any
@@ -1236,13 +1237,16 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk,
12361237
rd_list_t *tinfos,
12371238
const rd_kafka_topic_partition_list_t *match,
12381239
rd_kafka_topic_partition_list_t *errored) {
1239-
int ti, i;
1240+
int i;
12401241
size_t cnt = 0;
12411242
rd_kafka_topic_partition_list_t *unmatched;
1242-
rd_list_t cached_topics;
1243-
const char *topic;
1243+
const struct rd_kafka_metadata_cache_entry *rkmce;
1244+
map_str_str_t map;
12441245

12451246
rd_kafka_rdlock(rk);
1247+
map = (map_str_str_t)RD_MAP_INITIALIZER(
1248+
0, rd_map_str_cmp, rd_map_str_hash, NULL /* topic list element */,
1249+
NULL /* topic list element */);
12461250
/* To keep track of which patterns and topics in `match` that
12471251
* did not match any topic (or matched an errored topic), we
12481252
* create a set of all topics to match in `unmatched` and then
@@ -1253,15 +1257,24 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk,
12531257

12541258
/* For each topic in the cluster, scan through the match list
12551259
* to find matching topic. */
1256-
rd_list_init(&cached_topics, rk->rk_metadata_cache.rkmc_cnt, rd_free);
1257-
rd_kafka_metadata_cache_topics_to_list(rk, &cached_topics, rd_false);
1258-
RD_LIST_FOREACH(topic, &cached_topics, ti) {
1260+
TAILQ_FOREACH(rkmce, &rk->rk_metadata_cache.rkmc_expiry, rkmce_link) {
12591261
const rd_kafka_metadata_topic_internal_t *mdti;
1260-
const rd_kafka_metadata_topic_t *mdt =
1261-
rd_kafka_metadata_cache_topic_get(rk, topic, &mdti,
1262-
rd_true /* valid */);
1263-
if (!mdt)
1262+
const rd_kafka_metadata_topic_t *mdt;
1263+
const char *topic = rkmce->rkmce_mtopic.topic;
1264+
rd_bool_t matched = rd_false;
1265+
1266+
if (!topic)
1267+
continue;
1268+
if (RD_MAP_GET(&map, topic))
1269+
/* We could have multiple cache entries
1270+
* with different topic id and same topic name
1271+
* in some cases */
12641272
continue;
1273+
RD_MAP_SET(&map, topic, topic);
1274+
1275+
mdt = &rkmce->rkmce_mtopic;
1276+
mdti = &rkmce->rkmce_metadata_internal_topic;
1277+
12651278

12661279
/* Ignore topics in blacklist */
12671280
if (rk->rk_conf.topic_blacklist &&
@@ -1279,6 +1292,15 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk,
12791292
unmatched, match->elems[i].topic,
12801293
RD_KAFKA_PARTITION_UA);
12811294

1295+
if (matched)
1296+
/*
1297+
* Just remove it from unmatched.
1298+
* Topic was already add to
1299+
* `tinfos` or `errored`.
1300+
*/
1301+
continue;
1302+
matched = rd_true;
1303+
12821304
if (mdt->err) {
12831305
rd_kafka_topic_partition_list_add(
12841306
errored, topic, RD_KAFKA_PARTITION_UA)
@@ -1293,6 +1315,7 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk,
12931315
cnt++;
12941316
}
12951317
}
1318+
RD_MAP_DESTROY(&map);
12961319
rd_kafka_rdunlock(rk);
12971320

12981321
/* Any topics/patterns still in unmatched did not match any
@@ -1306,7 +1329,6 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk,
13061329
}
13071330

13081331
rd_kafka_topic_partition_list_destroy(unmatched);
1309-
rd_list_destroy(&cached_topics);
13101332

13111333
return cnt;
13121334
}

0 commit comments

Comments
 (0)