Skip to content

Commit 415213a

Browse files
committed
v.1.0.4, delete objects by pattern
1 parent ac8d347 commit 415213a

9 files changed

+308
-53
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
*.so
66
*.dylib
77
kafka-ops
8-
*.yaml
8+
tmp
99

1010
# RPM
1111
rpm-build

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
.PHONY: rpm clean source test coverage
22

3-
KAFKA_OPS_VERSION ?= 1.0.3
3+
KAFKA_OPS_VERSION ?= 1.0.4
44
BUILD_NUMBER ?= 1
55
KAFKA_OPS ?= kafka-ops
66
REPO ?= github.com/agapoff/${KAFKA_OPS}

README.md

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,51 @@ name: my-product.{{ if .Plant }}{{ .Plant }}{{ else }}default{{ end }}.{{ .Env }
221221
But Kafka-Ops fails if some unresolved template key is encountered. In order to override this behaviour use flag *--missingok*.
222222

223223

224+
## How to delete multiple topics and consumer groups by pattern
225+
226+
Kafka-Ops supports deleting the topics and consumer groups by patterns. Please refer to the Spec-file example showing how to achieve the goal:
227+
228+
kafka-cluster-example4.yaml
229+
```yaml
230+
---
231+
topics:
232+
- name: my_
233+
state: absent
234+
patternType: PREFIXED
235+
- name: topic[1-2]
236+
state: absent
237+
patternType: MATCH
238+
239+
consumer-groups:
240+
- name: my_
241+
state: absent
242+
patternType: PREFIXED
243+
- name: grou[a-z]
244+
state: absent
245+
patternType: MATCH
246+
- name: some.group
247+
state: absent
248+
```
249+
250+
Two pattern types are supported: *PREFIXED* (the object name must start with the string) and *MATCH* (the object name must match the defined regex). The third option is *LITERAL* which is default. Kafka-Ops looks through the list of topics and/or consumer groups and deletes the matched ones.
251+
252+
253+
## Defining broker connection settings via Spec-file
254+
255+
Kafka-Ops can read broker connection settings right from the Spec-file. This can be useful when the Spec is being templated by some third-party tool (e.g. by Helm). The settings can be defined as follows:
256+
257+
kafka-cluster-example5.yaml
258+
```yaml
259+
---
260+
connection:
261+
broker: kafka1.example.local:9093,kafka2.example.local:9093
262+
protocol: SASL_SSL
263+
mechanism: SCRAM-SHA-256
264+
username: admin
265+
password: admin-secret
266+
```
267+
268+
224269
## Full Usage
225270
226271
```

main.go

Lines changed: 169 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ import (
1212
"gopkg.in/yaml.v2"
1313
"io/ioutil"
1414
"os"
15+
"regexp"
1516
"strings"
1617
"text/template"
1718
)
1819

19-
const version string = "1.0.3"
20+
const version string = "1.0.4"
2021

2122
var (
2223
broker string
@@ -42,9 +43,10 @@ type arrFlags []string
4243

4344
// Spec contains the full structure of the manifest
4445
type Spec struct {
45-
Topics []Topic `yaml:"topics" json:"topics"`
46-
Acls []Acl `yaml:"acls" json:"acls"`
47-
Connection Connection `yaml:"connection,omitempty" json:"connection,omitempty"`
46+
Topics []Topic `yaml:"topics" json:"topics"`
47+
Acls []Acl `yaml:"acls" json:"acls"`
48+
ConsumerGroups []ConsumerGroup `yaml:"consumer-groups,omitempty" json:"consumer-groups,omitempty"`
49+
Connection Connection `yaml:"connection,omitempty" json:"connection,omitempty"`
4850
}
4951

5052
// Topic describes single topic
@@ -54,6 +56,16 @@ type Topic struct {
5456
ReplicationFactor int `yaml:"replication_factor" json:"replication_factor"`
5557
Configs map[string]string `yaml:"configs" json:"configs"`
5658
State string `yaml:"state,omitempty" json:"state,omitempty"`
59+
PatternType string `yaml:"patternType,omitempty" json:"patternType,omitempty"`
60+
Matched []string `yaml:"matched,omitempty" json:"matched,omitempty"`
61+
}
62+
63+
// ConsumerGroup describes a consumer group to be deleted
64+
type ConsumerGroup struct {
65+
Name string `yaml:"name" json:"name"`
66+
State string `yaml:"state,omitempty" json:"state,omitempty"`
67+
PatternType string `yaml:"patternType,omitempty" json:"patternType,omitempty"`
68+
Matched []string `yaml:"matched,omitempty" json:"matched,omitempty"`
5769
}
5870

5971
// Acl describes single ACL
@@ -351,11 +363,52 @@ func applySpecFile() error {
351363

352364
// Iterate over topics
353365
for _, topic := range spec.Topics {
366+
354367
if topic.State == "absent" {
355-
fmt.Printf("TASK [TOPIC : Delete topic %s] %s\n", topic.Name, strings.Repeat("*", 52))
368+
topic.PatternType = strings.ToLower(topic.PatternType)
369+
if topic.PatternType == "prefixed" || topic.PatternType == "match" {
370+
// Delete topics by pattern
371+
fmt.Printf("TASK [TOPIC : Delete topics %s by %s] %s\n", topic.PatternType, topic.Name, strings.Repeat("*", 42))
372+
var currentState = Ok
373+
var currentError = ""
374+
for currentTopicName, _ := range currentTopics {
375+
var matched = false
376+
if topic.PatternType == "prefixed" {
377+
matched = strings.HasPrefix(currentTopicName, topic.Name)
378+
} else if topic.PatternType == "match" {
379+
matched, _ = regexp.MatchString(topic.Name, currentTopicName)
380+
}
381+
if matched {
382+
topic.Matched = append(topic.Matched, currentTopicName)
383+
err := deleteTopic(currentTopicName, admin)
384+
if err != nil {
385+
numError++
386+
currentState = Error
387+
currentError = err.Error()
388+
if errorStop {
389+
printResult(Error, broker, err.Error(), topic)
390+
break
391+
}
392+
} else {
393+
numChanged++
394+
if currentState != Error {
395+
currentState = Changed
396+
}
397+
}
398+
}
399+
}
400+
printResult(currentState, broker, currentError, topic)
401+
if currentState == Ok {
402+
numOk++
403+
}
404+
continue
405+
} else {
406+
fmt.Printf("TASK [TOPIC : Delete topic %s] %s\n", topic.Name, strings.Repeat("*", 52))
407+
}
356408
} else {
357409
topic.State = "present"
358410
}
411+
359412
currentTopic, found := currentTopics[topic.Name]
360413

361414
if !found {
@@ -466,65 +519,125 @@ func applySpecFile() error {
466519
}
467520
}
468521

469-
// Get current ACLs from broker
470-
currentAcls, err := listAllAcls(admin)
471-
if err != nil {
472-
return err
473-
}
522+
if len(spec.ConsumerGroups) > 0 {
523+
// Get current consumer-groups from broker
524+
currentGroups, err := (*admin).ListConsumerGroups()
525+
if err != nil {
526+
return errors.New("Can't list consumer-groups: " + err.Error())
527+
}
474528

475-
// Iterate over ACLs
476-
breakLoop := false
477-
for _, acl := range spec.Acls {
478-
principal := acl.Principal
479-
for _, permission := range acl.Permissions {
480-
resource := permission.Resource
481-
for i, rule := range append(permission.Allow, permission.Deny...) {
482-
sacl := SingleACL{
483-
Principal: principal,
484-
Resource: resource,
485-
Operation: getOperation(rule),
486-
Host: getHost(rule),
529+
// Iterate over consumer-groups
530+
for _, group := range spec.ConsumerGroups {
531+
if group.State != "absent" {
532+
return errors.New("Consumer-groups support only state=absent")
533+
}
534+
group.PatternType = strings.ToLower(group.PatternType)
535+
if group.PatternType != "prefixed" && group.PatternType != "match" {
536+
group.PatternType = "literal"
537+
}
538+
539+
// Delete consumer-groups by pattern
540+
fmt.Printf("TASK [CONSUMER-GROUP : Delete consumer-group %s by %s] %s\n", group.PatternType, group.Name, strings.Repeat("*", 25))
541+
var currentState = Ok
542+
var currentError = ""
543+
for currentGroupName, _ := range currentGroups {
544+
var matched = false
545+
if group.PatternType == "prefixed" {
546+
matched = strings.HasPrefix(currentGroupName, group.Name)
547+
} else if group.PatternType == "match" {
548+
matched, _ = regexp.MatchString(group.Name, currentGroupName)
549+
} else if currentGroupName == group.Name {
550+
matched = true
487551
}
488-
if permission.State == "absent" {
489-
sacl.State = "absent"
490-
} else {
491-
sacl.State = "present"
492-
// Host can be unset, we'll treat this as * for creating
493-
if sacl.Host == "" {
494-
sacl.Host = "*"
552+
553+
if matched {
554+
group.Matched = append(group.Matched, currentGroupName)
555+
err := DeleteConsumerGroup(currentGroupName, admin)
556+
if err != nil {
557+
numError++
558+
currentState = Error
559+
currentError = err.Error()
560+
if errorStop {
561+
printResult(Error, broker, err.Error(), group)
562+
break
563+
}
564+
} else {
565+
numChanged++
566+
if currentState != Error {
567+
currentState = Changed
568+
}
495569
}
496570
}
497-
if i < len(permission.Allow) {
498-
sacl.PermissionType = "ALLOW"
499-
} else {
500-
sacl.PermissionType = "DENY"
501-
}
571+
}
572+
printResult(currentState, broker, currentError, group)
573+
if currentState == Ok {
574+
numOk++
575+
}
576+
}
577+
}
502578

503-
result, err := alignAcl(admin, &currentAcls, sacl)
504-
if result == Ok {
505-
printResult(Ok, broker, "", sacl)
506-
numOk++
507-
} else if err != nil {
508-
printResult(Error, broker, err.Error(), sacl)
509-
numError++
510-
if errorStop {
511-
breakLoop = true
512-
break
579+
if len(spec.Acls) > 0 {
580+
581+
// Get current ACLs from broker
582+
currentAcls, err := listAllAcls(admin)
583+
if err != nil {
584+
return err
585+
}
586+
587+
// Iterate over ACLs
588+
breakLoop := false
589+
for _, acl := range spec.Acls {
590+
principal := acl.Principal
591+
for _, permission := range acl.Permissions {
592+
resource := permission.Resource
593+
for i, rule := range append(permission.Allow, permission.Deny...) {
594+
sacl := SingleACL{
595+
Principal: principal,
596+
Resource: resource,
597+
Operation: getOperation(rule),
598+
Host: getHost(rule),
513599
}
514-
} else {
515-
printResult(result, broker, "", sacl)
516-
numChanged++
600+
if permission.State == "absent" {
601+
sacl.State = "absent"
602+
} else {
603+
sacl.State = "present"
604+
// Host can be unset, we'll treat this as * for creating
605+
if sacl.Host == "" {
606+
sacl.Host = "*"
607+
}
608+
}
609+
if i < len(permission.Allow) {
610+
sacl.PermissionType = "ALLOW"
611+
} else {
612+
sacl.PermissionType = "DENY"
613+
}
614+
615+
result, err := alignAcl(admin, &currentAcls, sacl)
616+
if result == Ok {
617+
printResult(Ok, broker, "", sacl)
618+
numOk++
619+
} else if err != nil {
620+
printResult(Error, broker, err.Error(), sacl)
621+
numError++
622+
if errorStop {
623+
breakLoop = true
624+
break
625+
}
626+
} else {
627+
printResult(result, broker, "", sacl)
628+
numChanged++
629+
}
630+
}
631+
if breakLoop {
632+
break
517633
}
518634
}
519635
if breakLoop {
520636
break
521637
}
522-
}
523-
if breakLoop {
524-
break
638+
525639
}
526640
}
527-
528641
printSummary(broker, numOk, numChanged, numError)
529642
if numError > 0 {
530643
return errors.New("")
@@ -704,6 +817,11 @@ func deleteTopic(topic string, admin *sarama.ClusterAdmin) error {
704817
return err
705818
}
706819

820+
func DeleteConsumerGroup(group string, admin *sarama.ClusterAdmin) error {
821+
err := (*admin).DeleteConsumerGroup(group)
822+
return err
823+
}
824+
707825
func printResult(color string, broker string, msg string, debug interface{}) {
708826
var status string
709827
switch color {

0 commit comments

Comments
 (0)