@@ -51,8 +51,9 @@ type Acl struct {
51
51
52
52
type Permission struct {
53
53
Resource `yaml:"resource" json:"resource"`
54
- Allow []string `yaml:"allow_operations,omitempty,flow" json:"allow_operations,omitempty"`
55
- Deny []string `yaml:"deny_operations,omitempty" json:"deny_operations,omitempty"`
54
+ Allow []string `yaml:"allow_operations,omitempty,flow" json:"allow_operations,omitempty"`
55
+ Deny []string `yaml:"deny_operations,omitempty" json:"deny_operations,omitempty"`
56
+ State string `yaml:"state" json:"state"`
56
57
}
57
58
58
59
type Resource struct {
@@ -61,7 +62,14 @@ type Resource struct {
61
62
PatternType string `yaml:"patternType" json:"patternType"`
62
63
}
63
64
64
- type OperationHost string
65
+ type SingleACL struct {
66
+ PermissionType string `json:"permission_type"`
67
+ Principal string `json:"principal"`
68
+ Resource `json:"resource"`
69
+ Operation string `json:"operation"`
70
+ Host string `json:"host,omitempty"`
71
+ State string `json:"state"`
72
+ }
65
73
66
74
type Exit struct { Code int }
67
75
@@ -288,16 +296,16 @@ func (r Resource) Equals(res Resource) bool {
288
296
return r .Type == res .Type && r .Pattern == res .Pattern && r .PatternType == res .PatternType
289
297
}
290
298
291
- func ( o OperationHost ) Operation () sarama. AclOperation {
292
- return aclOperationFromString ( o . OperationName () )
293
- }
294
-
295
- func ( o OperationHost ) OperationName () string {
296
- return strings . Split ( string ( o ), ":" )[ 0 ]
299
+ func getHost ( s string ) string {
300
+ split := strings . Split ( string ( s ), ":" )
301
+ if len ( split ) > 1 {
302
+ return split [ 1 ]
303
+ }
304
+ return ""
297
305
}
298
306
299
- func ( o OperationHost ) Host ( ) string {
300
- return strings .Split (string (o ), ":" )[1 ]
307
+ func getOperation ( s string ) string {
308
+ return strings .Split (string (s ), ":" )[0 ]
301
309
}
302
310
303
311
func listAllAcls (admin * sarama.ClusterAdmin ) ([]sarama.ResourceAcls ,error ) {
@@ -347,6 +355,7 @@ func applySpecFile(admin *sarama.ClusterAdmin) error {
347
355
if topic .State == "absent" {
348
356
fmt .Printf ("TASK [TOPIC : Delete topic %s] %s\n " , topic .Name , strings .Repeat ("*" , 52 ))
349
357
} else {
358
+ topic .State = "present"
350
359
fmt .Printf ("TASK [TOPIC : Create topic %s (partitions=%d, replicas=%d)] %s\n " , topic .Name , topic .Partitions , topic .ReplicationFactor , strings .Repeat ("*" , 25 ))
351
360
}
352
361
currentTopic , found := currentTopics [topic .Name ]
@@ -446,25 +455,40 @@ func applySpecFile(admin *sarama.ClusterAdmin) error {
446
455
for _ , permission := range acl .Permissions {
447
456
resource := permission .Resource
448
457
for i , rule := range append (permission .Allow ,permission .Deny ... ) {
449
- var permissionType sarama.AclPermissionType
458
+ sacl := SingleACL {
459
+ Principal : principal ,
460
+ Resource : resource ,
461
+ Operation : getOperation (rule ),
462
+ Host : getHost (rule ),
463
+ }
464
+ if permission .State == "absent" {
465
+ sacl .State = "absent"
466
+ } else {
467
+ sacl .State = "present"
468
+ // Host can be unset, we'll treat this as * for creating
469
+ if sacl .Host == "" {
470
+ sacl .Host = "*"
471
+ }
472
+ }
450
473
if i < len (permission .Allow ) {
451
- permissionType = sarama . AclPermissionAllow
474
+ sacl . PermissionType = "ALLOW"
452
475
} else {
453
- permissionType = sarama . AclPermissionDeny
476
+ sacl . PermissionType = "DENY"
454
477
}
455
- result , err := createAclIfNotExists (admin , & currentAcls , permissionType , principal , resource , OperationHost (rule ))
478
+
479
+ result , err := alignAcl (admin , & currentAcls , sacl )
456
480
if result == Ok {
457
- printResult (Ok , broker , "" , acl )
481
+ printResult (Ok , broker , "" , sacl )
458
482
numOk ++
459
483
} else if err != nil {
460
- printResult (Error , broker , err .Error (), acl )
484
+ printResult (Error , broker , err .Error (), sacl )
461
485
numError ++
462
486
if errorStop {
463
487
breakLoop = true
464
488
break
465
489
}
466
490
} else {
467
- printResult (result , broker , "" , acl )
491
+ printResult (result , broker , "" , sacl )
468
492
numChanged ++
469
493
}
470
494
}
@@ -484,38 +508,74 @@ func applySpecFile(admin *sarama.ClusterAdmin) error {
484
508
return nil
485
509
}
486
510
487
- func createAclIfNotExists (admin * sarama.ClusterAdmin , acls * []sarama.ResourceAcls , p sarama.AclPermissionType , principal string , resource Resource , oh OperationHost ) (string , error ){
488
- fmt .Printf ("TASK [ACL : Create ACL (%s %s@%s to %s %s:%s:%s)] %s\n " ,
489
- aclPermissionTypeToString (p ), principal , oh .Host (), oh .OperationName (), resource .Type , resource .PatternType , resource .Pattern , strings .Repeat ("*" , 25 ))
490
- if aclExists (acls , p , principal , oh .OperationName (), resource .Type , resource .PatternType , resource .Pattern , oh .Host ()) {
511
+ func alignAcl (admin * sarama.ClusterAdmin , acls * []sarama.ResourceAcls , acl SingleACL ) (string , error ){
512
+ var action string
513
+ if acl .State == "present" {
514
+ action = "Create"
515
+ } else {
516
+ action = "Remove"
517
+ }
518
+ fmt .Printf ("TASK [ACL : %s ACL (%s %s@%s to %s %s:%s:%s)] %s\n " , action , acl .PermissionType , acl .Principal ,
519
+ acl .Host , acl .Operation , acl .Resource .Type , acl .Resource .PatternType , acl .Resource .Pattern , strings .Repeat ("*" , 25 ))
520
+
521
+ if acl .Principal == "" {
522
+ return Error , errors .New ("Principal not defined" )
523
+ }
524
+
525
+ if acl .State == "absent" {
526
+ // Won't check the presence. We'll just try do delete and see the length of MatchingAcl in response
527
+ filter := sarama.AclFilter {
528
+ ResourceType : aclResourceTypeFromString (acl .Resource .Type ),
529
+ ResourceName : & acl .Resource .Pattern ,
530
+ ResourcePatternTypeFilter : aclResourcePatternTypeFromString (acl .Resource .PatternType ),
531
+ Operation : aclOperationFromString (acl .Operation ),
532
+ PermissionType : aclPermissionTypeFromString (acl .PermissionType ),
533
+ }
534
+ if acl .Host != "" {
535
+ filter .Host = & acl .Host
536
+ }
537
+ if acl .Principal != "*" {
538
+ filter .Principal = & acl .Principal
539
+ }
540
+ mAcls , err := (* admin ).DeleteACL (filter , false )
541
+ if err != nil {
542
+ return Error , err
543
+ }
544
+ if len (mAcls ) > 0 {
545
+ return Changed , nil
546
+ } else {
547
+ return Ok , nil
548
+ }
549
+ }
550
+
551
+ if aclExists (admin , acls , acl ) {
491
552
return Ok , nil
492
553
} else {
493
554
r := sarama.Resource {
494
- ResourceType : aclResourceTypeFromString (resource .Type ),
495
- ResourceName : resource .Pattern ,
496
- ResourcePatternType : aclResourcePatternTypeFromString (resource .PatternType ),
555
+ ResourceType : aclResourceTypeFromString (acl .Resource .Type ),
556
+ ResourcePatternType : aclResourcePatternTypeFromString (acl .Resource .PatternType ),
497
557
}
498
558
a := sarama.Acl {
499
- Principal : principal ,
500
- Host : oh .Host () ,
501
- Operation : aclOperationFromString (oh . OperationName () ),
502
- PermissionType : p ,
559
+ Principal : acl . Principal ,
560
+ Host : acl .Host ,
561
+ Operation : aclOperationFromString (acl . Operation ),
562
+ PermissionType : aclPermissionTypeFromString ( acl . PermissionType ) ,
503
563
}
504
564
err := (* admin ).CreateACL (r , a )
505
565
return Changed , err
506
566
}
507
567
}
508
568
509
- func aclExists (a * [] sarama.ResourceAcls , p sarama.AclPermissionType , principal string , operation string , resource string , patternType string , pattern string , host string ) bool {
510
- for _ , resourceAcls := range * a {
569
+ func aclExists (admin * sarama.ClusterAdmin , acls * [] sarama.ResourceAcls , acl SingleACL ) bool {
570
+ for _ , resourceAcls := range * acls {
511
571
for _ , currentAcl := range resourceAcls .Acls {
512
- if principal == currentAcl .Principal &&
513
- strings .ToUpper (operation ) == aclOperationToString (currentAcl .Operation ) &&
514
- strings .ToLower (resource ) == aclResourceTypeToString (resourceAcls .Resource .ResourceType ) &&
515
- strings .ToUpper (patternType ) == aclResourcePatternTypeToString (resourceAcls .Resource .ResourcePatternType ) &&
516
- pattern == resourceAcls .Resource .ResourceName &&
517
- host == currentAcl .Host &&
518
- p == currentAcl .PermissionType {
572
+ if acl . Principal == currentAcl .Principal &&
573
+ strings .ToUpper (acl . Operation ) == aclOperationToString (currentAcl .Operation ) &&
574
+ strings .ToLower (acl . Resource . Type ) == aclResourceTypeToString (resourceAcls .Resource .ResourceType ) &&
575
+ strings .ToUpper (acl . Resource . PatternType ) == aclResourcePatternTypeToString (resourceAcls .Resource .ResourcePatternType ) &&
576
+ acl . Resource . Pattern == resourceAcls .Resource .ResourceName &&
577
+ acl . Host == currentAcl .Host &&
578
+ acl . PermissionType == aclPermissionTypeToString ( currentAcl .PermissionType ) {
519
579
return true
520
580
}
521
581
}
@@ -771,6 +831,19 @@ func aclPermissionTypeToString(permissionType sarama.AclPermissionType) string {
771
831
}
772
832
}
773
833
834
+ func aclPermissionTypeFromString (permissionType string ) sarama.AclPermissionType {
835
+ switch strings .ToUpper (permissionType ) {
836
+ case "ALLOW" :
837
+ return sarama .AclPermissionAllow
838
+ case "DENY" :
839
+ return sarama .AclPermissionDeny
840
+ case "ANY" :
841
+ return sarama .AclPermissionAny
842
+ default :
843
+ return sarama .AclPermissionUnknown
844
+ }
845
+ }
846
+
774
847
func usage () {
775
848
usage := `Manage Kafka cluster resources (topics and ACLs)
776
849
Usage: %s <action> [<options>] [<broker connection options>]
0 commit comments