Skip to content

Commit 7ef6c44

Browse files
SavonitarAHeise
authored andcommitted
[hotfix] Wait for leader election to prevent OUT_OF_ORDER issues
1 parent cdbd635 commit 7ef6c44

File tree

1 file changed

+10
-1
lines changed

1 file changed

+10
-1
lines changed

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.kafka.clients.admin.TopicListing;
2929
import org.apache.kafka.clients.consumer.KafkaConsumer;
3030
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
31+
import org.apache.kafka.common.Node;
3132
import org.apache.kafka.common.TopicPartition;
3233
import org.slf4j.Logger;
3334
import org.slf4j.LoggerFactory;
@@ -164,7 +165,15 @@ public static void createNewTopic(
164165
return false;
165166
}
166167
TopicDescription topicDescription = topicDescriptions.get(topic);
167-
return topicDescription.partitions().size() == numberOfPartitions;
168+
if (topicDescription.partitions().size() != numberOfPartitions) {
169+
return false;
170+
}
171+
// Ensure all partitions have a leader elected.
172+
return topicDescription.partitions().stream()
173+
.allMatch(
174+
p ->
175+
p.leader() != null
176+
&& p.leader().id() != Node.noNode().id());
168177
},
169178
Duration.ofSeconds(30),
170179
String.format("New topic \"%s\" is not ready within timeout", topicObj));

0 commit comments

Comments
 (0)