Skip to content

Commit 7fec122

Browse files
committed
Implement topic deletion
1 parent 3222949 commit 7fec122

File tree

2 files changed

+128
-122
lines changed

2 files changed

+128
-122
lines changed

README.md

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,13 @@ topics:
2727
name: my-topic1
2828
partitions: 3
2929
- name: my-topic2
30-
partitions: 3
31-
configs:
32-
retention.ms: 30000
30+
state: absent
3331
- name: my-topic3
3432
partitions: 1
3533
replication_factor: 1
3634
configs:
3735
min.insync.replicas: 'default'
36+
retention.ms: 30000
3837
acls:
3938
- principal: 'User:test1'
4039
permissions:
@@ -52,9 +51,10 @@ acls:
5251
```
5352
5453
The format is quite evident. Just few remarks:
55-
* The topic config values are always strings
54+
* The topic config values are always strings, while *partitions* and *replication_factor* are always numeric
5655
* The topic config value can be set to *default*. This will remove the per-topic setting and the topic will be using the cluster default value
5756
* *replication_factor* for topic is optional. If utility will need to create the topic and this setting will not be defined then it will be set to 1 on single-node clusters and to 2 on multi-node clusters
57+
* The parameter *state=absent* can be used for deleting topics if they present. Any value other than *absent* is considered as *present*
5858
* The ACL operation is described as *OperationType:Host*
5959
6060
kafka-cluster-example2.json:
@@ -124,28 +124,28 @@ kafka-cluster-example2.json:
124124
output:
125125
```
126126
TASK [TOPIC : Create topic my-topic1 (partitions=3, replicas=1)] ***********************
127-
ok: [cy-selenium.quotix.io:9092]
127+
ok: [kafka1.cluster.local:9092]
128128
129-
TASK [TOPIC : Create topic my-topic2 (partitions=3, replicas=1)] ***********************
130-
ok: [cy-selenium.quotix.io:9092]
129+
TASK [TOPIC : Delete topic my-topic2 ***************************************************
130+
ok: [kafka1.cluster.local:9092]
131131
132132
TASK [TOPIC : Create topic my-topic3 (partitions=1, replicas=1)] ***********************
133-
ok: [cy-selenium.quotix.io:9092]
133+
ok: [kafka1.cluster.local:9092]
134134
135135
TASK [ACL : Create ACL (ALLOW User:test1@* to READ topic:PREFIXED:my-)] ****************
136-
changed: [cy-selenium.quotix.io:9092]
136+
changed: [kafka1.cluster.local:9092]
137137
138138
TASK [ACL : Create ACL (ALLOW User:test1@* to WRITE topic:PREFIXED:my-)] ***************
139-
changed: [cy-selenium.quotix.io:9092]
139+
changed: [kafka1.cluster.local:9092]
140140
141141
TASK [ACL : Create ACL (ALLOW User:test1@* to DESCRIBE topic:PREFIXED:my-)] ************
142-
changed: [cy-selenium.quotix.io:9092]
142+
changed: [kafka1.cluster.local:9092]
143143
144144
TASK [ACL : Create ACL (ALLOW User:test1@* to READ group:LITERAL:my-group)] ************
145-
changed: [cy-selenium.quotix.io:9092]
145+
changed: [kafka1.cluster.local:9092]
146146
147147
TASK [ACL : Create ACL (DENY User:test1@* to DESCRIBE group:LITERAL:my-group)] *********
148-
changed: [cy-selenium.quotix.io:9092]
148+
changed: [kafka1.cluster.local:9092]
149149
150150
SUMMARY ********************************************************************************
151151

kafka-ops.go

Lines changed: 115 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type Topic struct {
4141
Partitions int `yaml:"partitions" json:"partitions"`
4242
ReplicationFactor int `yaml:"replication_factor" json:"replication_factor"`
4343
Configs map[string]string `yaml:"configs" json:"configs"`
44+
State string `yaml:"state" json:"state"`
4445
}
4546

4647
type Acl struct {
@@ -129,8 +130,8 @@ func init() {
129130

130131
func main() {
131132
defer handleExit()
132-
133133
//defer fmt.Println("closed")
134+
134135
if actionApply {
135136
admin := connectToKafkaCluster()
136137
defer func() { _ = (*admin).Close() }()
@@ -343,90 +344,91 @@ func applySpecFile(admin *sarama.ClusterAdmin) error {
343344

344345
// Iterate over topics
345346
for _, topic := range spec.Topics {
346-
if topic.ReplicationFactor < 1 {
347-
topic.ReplicationFactor = autoReplicationFactor
347+
if topic.State == "absent" {
348+
fmt.Printf("TASK [TOPIC : Delete topic %s] %s\n", topic.Name, strings.Repeat("*", 52))
349+
} else {
350+
fmt.Printf("TASK [TOPIC : Create topic %s (partitions=%d, replicas=%d)] %s\n", topic.Name, topic.Partitions, topic.ReplicationFactor, strings.Repeat("*", 25))
348351
}
349-
fmt.Printf("TASK [TOPIC : Create topic %s (partitions=%d, replicas=%d)] %s\n", topic.Name, topic.Partitions, topic.ReplicationFactor, strings.Repeat("*", 25))
350352
currentTopic, found := currentTopics[topic.Name]
351353

352354
if !found {
353-
// Topic doesn't exist - need to create one
354-
err := createTopic(topic, admin)
355-
if err != nil {
356-
printResult(Error, broker, err.Error(), topic)
357-
numError++
358-
if errorStop {
359-
break
360-
} else {
361-
continue
355+
// Topic doesn't exist - need to create one or no need to delete
356+
if topic.State == "absent" {
357+
printResult(Ok, broker, "", topic)
358+
numOk++
359+
} else {
360+
if topic.ReplicationFactor < 1 {
361+
topic.ReplicationFactor = autoReplicationFactor
362+
}
363+
err := createTopic(topic, admin)
364+
if err != nil {
365+
printResult(Error, broker, err.Error(), topic)
366+
numError++
367+
if errorStop { break } else { continue }
362368
}
369+
printResult(Changed, broker, "", topic)
370+
numChanged++
363371
}
364-
printResult(Changed, broker, "", topic)
365-
numChanged++
366372
} else {
367373
// Topic exists
368-
var topicAltered bool = false
369-
var topicConfigAlterNeeded = false
370-
// Check the replication-factor
371-
if int16(topic.ReplicationFactor) != currentTopic.ReplicationFactor {
372-
printResult(Error, broker, "Cannot change replication-factor. Consider doing it manually with kafka-reassign-partitions utility or re-creating the topic", topic)
373-
numError++
374-
if errorStop {
375-
break
376-
} else {
377-
continue
378-
}
379-
}
380-
381-
// Check the partitions count
382-
if int32(topic.Partitions) != currentTopic.NumPartitions {
383-
err := alterNumPartitions(topic.Name, admin, topic.Partitions)
374+
if topic.State == "absent" {
375+
err := deleteTopic(topic.Name, admin)
384376
if err != nil {
385377
printResult(Error, broker, err.Error(), topic)
386378
numError++
387-
if errorStop {
388-
break
389-
} else {
390-
continue
379+
if errorStop { break } else { continue }
380+
}
381+
printResult(Changed, broker, "", topic)
382+
numChanged++
383+
} else {
384+
var topicAltered bool = false
385+
var topicConfigAlterNeeded = false
386+
// Check the replication-factor
387+
if topic.ReplicationFactor > 0 && int16(topic.ReplicationFactor) != currentTopic.ReplicationFactor {
388+
printResult(Error, broker, "Cannot change replication-factor. Consider doing it manually with kafka-reassign-partitions utility or re-creating the topic", topic)
389+
numError++
390+
if errorStop { break } else { continue }
391+
}
392+
// Check the partitions count
393+
if int32(topic.Partitions) != currentTopic.NumPartitions {
394+
err := alterNumPartitions(topic.Name, admin, topic.Partitions)
395+
if err != nil {
396+
printResult(Error, broker, err.Error(), topic)
397+
numError++
398+
if errorStop { break } else { continue }
391399
}
400+
topicAltered = true
392401
}
393-
topicAltered = true
394-
}
395-
// Check the configs
396-
for key, val := range topic.Configs {
397-
currentVal, found := currentTopic.ConfigEntries[key]
398-
if found {
399-
if val != *currentVal {
402+
// Check the configs
403+
for key, val := range topic.Configs {
404+
currentVal, found := currentTopic.ConfigEntries[key]
405+
if found {
406+
if val != *currentVal {
407+
topicConfigAlterNeeded = true
408+
break
409+
}
410+
} else if val != "default" {
400411
topicConfigAlterNeeded = true
401-
//fmt.Printf("Current %s: %#v New: %#v\n", key, *currentVal, val)
402412
break
403413
}
404-
} else if val != "default" {
405-
topicConfigAlterNeeded = true
406-
//fmt.Printf("New %s: %#v\n", key, val)
407-
break
408414
}
409-
}
410-
if topicConfigAlterNeeded {
411-
topic,err = alterTopicConfig(topic, admin, currentTopic.ConfigEntries)
412-
if err != nil {
413-
printResult(Error, broker, err.Error(), topic)
414-
numError++
415-
if errorStop {
416-
break
417-
} else {
418-
continue
415+
if topicConfigAlterNeeded {
416+
topic,err = alterTopicConfig(topic, admin, currentTopic.ConfigEntries)
417+
if err != nil {
418+
printResult(Error, broker, err.Error(), topic)
419+
numError++
420+
if errorStop { break } else { continue }
419421
}
422+
topicAltered = true
420423
}
421-
topicAltered = true
422-
}
423424

424-
if topicAltered {
425-
printResult(Changed, broker, "", topic)
426-
numChanged++
427-
} else {
428-
printResult(Ok, broker, "", topic)
429-
numOk++
425+
if topicAltered {
426+
printResult(Changed, broker, "", topic)
427+
numChanged++
428+
} else {
429+
printResult(Ok, broker, "", topic)
430+
numOk++
431+
}
430432
}
431433
}
432434
}
@@ -484,7 +486,7 @@ func applySpecFile(admin *sarama.ClusterAdmin) error {
484486

485487
func createAclIfNotExists(admin *sarama.ClusterAdmin, acls *[]sarama.ResourceAcls, p sarama.AclPermissionType, principal string, resource Resource, oh OperationHost) (string, error){
486488
fmt.Printf("TASK [ACL : Create ACL (%s %s@%s to %s %s:%s:%s)] %s\n",
487-
aclPermissionTypeToString(p), principal, oh.Host(), oh.OperationName(), resource.Type, resource.PatternType, resource.Pattern, strings.Repeat("*", 25))
489+
aclPermissionTypeToString(p), principal, oh.Host(), oh.OperationName(), resource.Type, resource.PatternType, resource.Pattern, strings.Repeat("*", 25))
488490
if aclExists(acls, p, principal, oh.OperationName(), resource.Type, resource.PatternType, resource.Pattern, oh.Host()) {
489491
return Ok, nil
490492
} else {
@@ -508,14 +510,14 @@ func aclExists(a *[]sarama.ResourceAcls, p sarama.AclPermissionType, principal s
508510
for _, resourceAcls := range *a {
509511
for _, currentAcl := range resourceAcls.Acls {
510512
if principal == currentAcl.Principal &&
511-
strings.ToUpper(operation) == aclOperationToString(currentAcl.Operation) &&
512-
strings.ToLower(resource) == aclResourceTypeToString(resourceAcls.Resource.ResourceType) &&
513-
strings.ToUpper(patternType) == aclResourcePatternTypeToString(resourceAcls.Resource.ResourcePatternType) &&
514-
pattern == resourceAcls.Resource.ResourceName &&
515-
host == currentAcl.Host &&
516-
p == currentAcl.PermissionType {
517-
return true
518-
}
513+
strings.ToUpper(operation) == aclOperationToString(currentAcl.Operation) &&
514+
strings.ToLower(resource) == aclResourceTypeToString(resourceAcls.Resource.ResourceType) &&
515+
strings.ToUpper(patternType) == aclResourcePatternTypeToString(resourceAcls.Resource.ResourcePatternType) &&
516+
pattern == resourceAcls.Resource.ResourceName &&
517+
host == currentAcl.Host &&
518+
p == currentAcl.PermissionType {
519+
return true
520+
}
519521
}
520522
}
521523
return false
@@ -565,22 +567,26 @@ func alterTopicConfig(topic Topic, clusterAdmin *sarama.ClusterAdmin, currentCon
565567
return topic, err
566568
}
567569

568-
func createTopic(topic Topic, clusterAdmin *sarama.ClusterAdmin) error {
569-
admin := *clusterAdmin
570+
func createTopic(topic Topic, admin *sarama.ClusterAdmin) error {
570571
configEntries := make(map[string]*string)
571572
for key, val := range topic.Configs {
572573
if val != "default" {
573574
configEntries[key] = getPtr(topic.Configs[key])
574575
}
575576
}
576-
err := admin.CreateTopic(topic.Name, &sarama.TopicDetail{
577+
err := (*admin).CreateTopic(topic.Name, &sarama.TopicDetail{
577578
NumPartitions: int32(topic.Partitions),
578579
ReplicationFactor: int16(topic.ReplicationFactor),
579580
ConfigEntries: configEntries,
580581
}, false)
581582
return err
582583
}
583584

585+
func deleteTopic(topic string, admin *sarama.ClusterAdmin) error {
586+
err := (*admin).DeleteTopic(topic)
587+
return err
588+
}
589+
584590
func printResult(color string, broker string, msg string, debug interface{}) {
585591
var status string
586592
switch color {
@@ -767,38 +773,38 @@ func aclPermissionTypeToString(permissionType sarama.AclPermissionType) string {
767773

768774
func usage() {
769775
usage := `Manage Kafka cluster resources (topics and ACLs)
770-
Usage: %s <action> [<options>] [<broker connection options>]
771-
----------------
772-
Actions
773-
--help Show this help and exit
774-
--dump Dump cluster resources and their configs to stdout
775-
See also --json and --yaml options
776-
--apply Idempotently align cluster resources with the spec manifest
777-
See also --spec, --json and --yaml options
778-
----------------
779-
Options
780-
--spec A path to manifest (specification file) to be used
781-
with --apply action
782-
Can be also set by Env variable KAFKA_SPEC_FILE
783-
--yaml Spec-file is in YAML format
784-
Will try to detect format if none of --yaml or --json is set
785-
--json Spec-file is in JSON format
786-
Will try to detect format if none of --yaml or --json is set
787-
--verbose Verbose output
788-
--stop-on-error Exit on first occurred error
789-
----------------
790-
Broker connection options
791-
--broker Bootstrap-brokers, comma-separated. Default is localhost:9092
792-
Can be also set by Env variable KAFKA_BROKER
793-
--protocol Security protocol. Default is plaintext
794-
Available options: plaintext, sasl_ssl, sasl_plaintext
795-
--mechanism SASL mechanism. Default is scram-sha-256
796-
Available options: scram-sha-256, scram-sha-512
797-
--username Username for authentication
798-
Can be also set by Env variable KAFKA_USERNAME
799-
--password Password for authentication
800-
Can be also set by Env variable KAFKA_PASSWORD
801-
`
776+
Usage: %s <action> [<options>] [<broker connection options>]
777+
----------------
778+
Actions
779+
--help Show this help and exit
780+
--dump Dump cluster resources and their configs to stdout
781+
See also --json and --yaml options
782+
--apply Idempotently align cluster resources with the spec manifest
783+
See also --spec, --json and --yaml options
784+
----------------
785+
Options
786+
--spec A path to manifest (specification file) to be used
787+
with --apply action
788+
Can be also set by Env variable KAFKA_SPEC_FILE
789+
--yaml Spec-file is in YAML format
790+
Will try to detect format if none of --yaml or --json is set
791+
--json Spec-file is in JSON format
792+
Will try to detect format if none of --yaml or --json is set
793+
--verbose Verbose output
794+
--stop-on-error Exit on first occurred error
795+
----------------
796+
Broker connection options
797+
--broker Bootstrap-brokers, comma-separated. Default is localhost:9092
798+
Can be also set by Env variable KAFKA_BROKER
799+
--protocol Security protocol. Default is plaintext
800+
Available options: plaintext, sasl_ssl, sasl_plaintext
801+
--mechanism SASL mechanism. Default is scram-sha-256
802+
Available options: scram-sha-256, scram-sha-512
803+
--username Username for authentication
804+
Can be also set by Env variable KAFKA_USERNAME
805+
--password Password for authentication
806+
Can be also set by Env variable KAFKA_PASSWORD
807+
`
802808

803809
fmt.Fprintf(os.Stderr, usage, os.Args[0])
804810
//flag.PrintDefaults()

0 commit comments

Comments
 (0)