@@ -419,6 +419,70 @@ func TestBrokerClientAddPartitions(t *testing.T) {
419419 assert .Equal (t , []int {6 , 1 }, topicInfo .Partitions [4 ].Replicas )
420420}
421421
422+ func TestBrokerDeleteTopic (t * testing.T ) {
423+ if ! util .CanTestBrokerAdmin () {
424+ t .Skip ("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set" )
425+ }
426+
427+ ctx := context .Background ()
428+ client , err := NewBrokerAdminClient (
429+ ctx ,
430+ BrokerAdminClientConfig {
431+ ConnectorConfig : ConnectorConfig {
432+ BrokerAddr : util .TestKafkaAddr (),
433+ },
434+ },
435+ )
436+ require .NoError (t , err )
437+
438+ topicName := util .RandomString ("topic-delete-" , 6 )
439+ err = client .CreateTopic (
440+ ctx ,
441+ kafka.TopicConfig {
442+ Topic : topicName ,
443+ NumPartitions : - 1 ,
444+ ReplicationFactor : - 1 ,
445+ ReplicaAssignments : []kafka.ReplicaAssignment {
446+ {
447+ Partition : 0 ,
448+ Replicas : []int {1 , 2 },
449+ },
450+ {
451+ Partition : 1 ,
452+ Replicas : []int {2 , 3 },
453+ },
454+ {
455+ Partition : 2 ,
456+ Replicas : []int {3 , 4 },
457+ },
458+ },
459+ ConfigEntries : []kafka.ConfigEntry {
460+ {
461+ ConfigName : "flush.ms" ,
462+ ConfigValue : "2000" ,
463+ },
464+ {
465+ ConfigName : "retention.ms" ,
466+ ConfigValue : "10000000" ,
467+ },
468+ },
469+ },
470+ )
471+ require .NoError (t , err )
472+ util .RetryUntil (t , 5 * time .Second , func () error {
473+ _ , err := client .GetTopic (ctx , topicName , true )
474+ return err
475+ })
476+
477+ err = client .DeleteTopic (ctx , topicName )
478+ require .NoError (t , err )
479+
480+ time .Sleep (time .Second * 10 )
481+
482+ _ , err = client .GetTopic (ctx , topicName , false )
483+ require .Error (t , err )
484+ }
485+
422486func TestBrokerClientAlterAssignments (t * testing.T ) {
423487 if ! util .CanTestBrokerAdmin () {
424488 t .Skip ("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set" )
0 commit comments