@@ -1216,6 +1216,7 @@ rd_kafka_parse_Metadata_admin(rd_kafka_broker_t *rkb,
12161216 "(admin request)" );
12171217}
12181218
1219+ typedef RD_MAP_TYPE (const char * , const char * ) map_str_str_t ;
12191220
12201221/**
12211222 * @brief Add all topics in current cached full metadata
@@ -1226,7 +1227,7 @@ rd_kafka_parse_Metadata_admin(rd_kafka_broker_t *rkb,
12261227 * an available topic will be added to this list with
12271228 * the appropriate error set.
12281229 *
1229- * @returns the number of topics matched and added to \p list
1230+ * @returns the number of topics matched and added to \p tinfos
12301231 *
12311232 * @locks none
12321233 * @locality any
@@ -1236,13 +1237,16 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk,
12361237 rd_list_t * tinfos ,
12371238 const rd_kafka_topic_partition_list_t * match ,
12381239 rd_kafka_topic_partition_list_t * errored ) {
1239- int ti , i ;
1240+ int i ;
12401241 size_t cnt = 0 ;
12411242 rd_kafka_topic_partition_list_t * unmatched ;
1242- rd_list_t cached_topics ;
1243- const char * topic ;
1243+ const struct rd_kafka_metadata_cache_entry * rkmce ;
1244+ map_str_str_t map ;
12441245
12451246 rd_kafka_rdlock (rk );
1247+ map = (map_str_str_t )RD_MAP_INITIALIZER (
1248+ 0 , rd_map_str_cmp , rd_map_str_hash , NULL /* topic list element */ ,
1249+ NULL /* topic list element */ );
12461250 /* To keep track of which patterns and topics in `match` that
12471251 * did not match any topic (or matched an errored topic), we
12481252 * create a set of all topics to match in `unmatched` and then
@@ -1253,15 +1257,24 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk,
12531257
12541258 /* For each topic in the cluster, scan through the match list
12551259 * to find matching topic. */
1256- rd_list_init (& cached_topics , rk -> rk_metadata_cache .rkmc_cnt , rd_free );
1257- rd_kafka_metadata_cache_topics_to_list (rk , & cached_topics , rd_false );
1258- RD_LIST_FOREACH (topic , & cached_topics , ti ) {
1260+ TAILQ_FOREACH (rkmce , & rk -> rk_metadata_cache .rkmc_expiry , rkmce_link ) {
12591261 const rd_kafka_metadata_topic_internal_t * mdti ;
1260- const rd_kafka_metadata_topic_t * mdt =
1261- rd_kafka_metadata_cache_topic_get (rk , topic , & mdti ,
1262- rd_true /* valid */ );
1263- if (!mdt )
1262+ const rd_kafka_metadata_topic_t * mdt ;
1263+ const char * topic = rkmce -> rkmce_mtopic .topic ;
1264+ rd_bool_t matched = rd_false ;
1265+
1266+ if (!topic )
1267+ continue ;
1268+ if (RD_MAP_GET (& map , topic ))
1269+ /* We could have multiple cache entries
1270+ * with different topic id and same topic name
1271+ * in some cases */
12641272 continue ;
1273+ RD_MAP_SET (& map , topic , topic );
1274+
1275+ mdt = & rkmce -> rkmce_mtopic ;
1276+ mdti = & rkmce -> rkmce_metadata_internal_topic ;
1277+
12651278
12661279 /* Ignore topics in blacklist */
12671280 if (rk -> rk_conf .topic_blacklist &&
@@ -1279,6 +1292,15 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk,
12791292 unmatched , match -> elems [i ].topic ,
12801293 RD_KAFKA_PARTITION_UA );
12811294
1295+ if (matched )
1296+ /*
1297+ * Just remove it from unmatched.
1298+ * Topic was already added to
1299+ * `tinfos` or `errored`.
1300+ */
1301+ continue ;
1302+ matched = rd_true ;
1303+
12821304 if (mdt -> err ) {
12831305 rd_kafka_topic_partition_list_add (
12841306 errored , topic , RD_KAFKA_PARTITION_UA )
@@ -1293,6 +1315,7 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk,
12931315 cnt ++ ;
12941316 }
12951317 }
1318+ RD_MAP_DESTROY (& map );
12961319 rd_kafka_rdunlock (rk );
12971320
12981321 /* Any topics/patterns still in unmatched did not match any
@@ -1306,7 +1329,6 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk,
13061329 }
13071330
13081331 rd_kafka_topic_partition_list_destroy (unmatched );
1309- rd_list_destroy (& cached_topics );
13101332
13111333 return cnt ;
13121334}
0 commit comments