@@ -158,6 +158,7 @@ _TEST_DECL(0045_subscribe_update_non_exist_and_partchange);
158158_TEST_DECL (0045 _subscribe_update_mock );
159159_TEST_DECL (0045 _subscribe_update_racks_mock );
160160_TEST_DECL (0045 _resubscribe_with_regex );
161+ _TEST_DECL (0045 _subscribe_many_updates );
161162_TEST_DECL (0046 _rkt_cache );
162163_TEST_DECL (0047 _partial_buf_tmout );
163164_TEST_DECL (0048 _partitioner );
@@ -388,6 +389,7 @@ struct test tests[] = {
388389 _TEST (0045 _subscribe_update_mock , TEST_F_LOCAL ),
389390 _TEST (0045 _subscribe_update_racks_mock , TEST_F_LOCAL ),
390391 _TEST (0045 _resubscribe_with_regex , 0 , TEST_BRKVER (0 , 9 , 0 , 0 )),
392+ _TEST (0045 _subscribe_many_updates , 0 , TEST_BRKVER (0 , 10 , 2 , 0 )),
391393 _TEST (0046 _rkt_cache , TEST_F_LOCAL ),
392394 _TEST (0047 _partial_buf_tmout , TEST_F_KNOWN_ISSUE ),
393395 _TEST (0048 _partitioner ,
@@ -3114,6 +3116,106 @@ void test_consumer_verify_assignment0(const char *func,
31143116 rd_kafka_topic_partition_list_destroy (assignment );
31153117}
31163118
3119+ /**
3120+ * @brief Verify that the consumer's assignment matches the expected assignment.
3121+ * passed as a topic partition list in \p expected_assignment .
3122+ */
3123+ rd_bool_t test_consumer_verify_assignment_topic_partition_list0 (
3124+ const char * func ,
3125+ int line ,
3126+ rd_kafka_t * rk ,
3127+ const rd_kafka_topic_partition_list_t * expected_assignment ) {
3128+ rd_kafka_topic_partition_list_t * assignment ,
3129+ * expected_assignment_copy = NULL ;
3130+ rd_kafka_resp_err_t err ;
3131+ int i ;
3132+ rd_bool_t ret = rd_true ;
3133+
3134+ if ((err = rd_kafka_assignment (rk , & assignment )))
3135+ TEST_FAIL ("%s:%d: Failed to get assignment for %s: %s" , func ,
3136+ line , rd_kafka_name (rk ), rd_kafka_err2str (err ));
3137+
3138+ TEST_SAYL (4 , "%s assignment (%d partition(s)):\n" , rd_kafka_name (rk ),
3139+ assignment -> cnt );
3140+ for (i = 0 ; i < assignment -> cnt ; i ++ )
3141+ TEST_SAYL (4 , " %s [%" PRId32 "]\n" , assignment -> elems [i ].topic ,
3142+ assignment -> elems [i ].partition );
3143+
3144+ if (assignment -> cnt != expected_assignment -> cnt ) {
3145+ ret = rd_false ;
3146+ goto done ;
3147+ }
3148+
3149+ expected_assignment_copy =
3150+ rd_kafka_topic_partition_list_copy (expected_assignment );
3151+ rd_kafka_topic_partition_list_sort (assignment , NULL , NULL );
3152+ rd_kafka_topic_partition_list_sort (expected_assignment_copy , NULL ,
3153+ NULL );
3154+
3155+ for (i = 0 ; i < assignment -> cnt ; i ++ ) {
3156+ if (strcmp (assignment -> elems [i ].topic ,
3157+ expected_assignment_copy -> elems [i ].topic ) ||
3158+ assignment -> elems [i ].partition !=
3159+ expected_assignment_copy -> elems [i ].partition ) {
3160+ ret = rd_false ;
3161+ goto done ;
3162+ }
3163+ }
3164+
3165+ done :
3166+ RD_IF_FREE (expected_assignment_copy ,
3167+ rd_kafka_topic_partition_list_destroy );
3168+ rd_kafka_topic_partition_list_destroy (assignment );
3169+ return ret ;
3170+ }
3171+
3172+ /**
3173+ * @brief Wait until the consumer's assignment matches the expected assignment.
3174+ * passed as a topic partition list in \p expected_assignment .
3175+ * Polling if \p do_poll is true, otherwise sleeps.
3176+ * Until \p timeout_ms milliseconds.
3177+ */
3178+ void test_consumer_wait_assignment_topic_partition_list0 (
3179+ const char * func ,
3180+ int line ,
3181+ rd_kafka_t * rk ,
3182+ rd_bool_t do_poll ,
3183+ const rd_kafka_topic_partition_list_t * expected_assignment ,
3184+ int timeout_ms ) {
3185+ int i ;
3186+ rd_ts_t end = test_clock () + timeout_ms * 1000 ;
3187+ rd_bool_t verified = rd_false ;
3188+
3189+ TEST_SAY ("Verifying assignment\n" );
3190+ TEST_SAYL (4 , "%s expected assignment (%d partition(s)):\n" ,
3191+ rd_kafka_name (rk ), expected_assignment -> cnt );
3192+ for (i = 0 ; i < expected_assignment -> cnt ; i ++ )
3193+ TEST_SAYL (4 , " %s [%" PRId32 "]\n" ,
3194+ expected_assignment -> elems [i ].topic ,
3195+ expected_assignment -> elems [i ].partition );
3196+
3197+ do {
3198+ verified =
3199+ test_consumer_verify_assignment_topic_partition_list0 (
3200+ func , line , rk , expected_assignment );
3201+ if (verified )
3202+ break ;
3203+
3204+ if (do_poll )
3205+ test_consumer_poll_once (rk , NULL , 100 );
3206+ else
3207+ rd_usleep (100 * 1000 , NULL );
3208+
3209+ } while (test_clock () < end );
3210+
3211+ if (!verified ) {
3212+ TEST_FAIL (
3213+ "%s:%d: Expected assignment not found in %s's "
3214+ "assignment within timeout %d" ,
3215+ func , line , rd_kafka_name (rk ), timeout_ms );
3216+ }
3217+ TEST_SAY ("Verified assignment\n" );
3218+ }
31173219
31183220
31193221/**
0 commit comments