@@ -523,13 +523,27 @@ func alignAcl(admin *sarama.ClusterAdmin, acls *[]sarama.ResourceAcls, acl Singl
523
523
} else {
524
524
action = "Remove"
525
525
}
526
+
527
+ if acl .Resource .Type == "cluster" {
528
+ if acl .Resource .Pattern == "" {
529
+ acl .Resource .Pattern = "kafka-cluster"
530
+ }
531
+ }
532
+ if acl .Resource .PatternType == "" {
533
+ acl .Resource .PatternType = "LITERAL"
534
+ }
535
+
526
536
fmt .Printf ("TASK [ACL : %s ACL (%s %s@%s to %s %s:%s:%s)] %s\n " , action , acl .PermissionType , acl .Principal ,
527
537
acl .Host , acl .Operation , acl .Resource .Type , acl .Resource .PatternType , acl .Resource .Pattern , strings .Repeat ("*" , 25 ))
528
538
529
539
if acl .Principal == "" {
530
540
return Error , errors .New ("Principal not defined" )
531
541
}
532
542
543
+ if aclResourceTypeFromString (acl .Resource .Type ) == sarama .AclResourceUnknown {
544
+ return Error , errors .New ("Wrong resource type: " + acl .Resource .Type )
545
+ }
546
+
533
547
if acl .State == "absent" {
534
548
// Won't check the presence. We'll just try do delete and see the length of MatchingAcl in response
535
549
filter := sarama.AclFilter {
@@ -561,6 +575,7 @@ func alignAcl(admin *sarama.ClusterAdmin, acls *[]sarama.ResourceAcls, acl Singl
561
575
} else {
562
576
r := sarama.Resource {
563
577
ResourceType : aclResourceTypeFromString (acl .Resource .Type ),
578
+ ResourceName : acl .Resource .Pattern ,
564
579
ResourcePatternType : aclResourcePatternTypeFromString (acl .Resource .PatternType ),
565
580
}
566
581
a := sarama.Acl {
@@ -740,29 +755,29 @@ func (f *arrFlags) String() string {
740
755
}
741
756
742
757
func validateFlags () {
743
- flag .StringVar (& broker , "broker" , "" , "Bootstrap-brokers, default is localhost:9092 (can be also set by Env variable KAFKA_BROKER)" )
744
- flag .StringVar (& specfile , "spec" , "" , "Spec-file (can be set by Env variable KAFKA_SPEC_FILE)" )
745
- flag .StringVar (& protocol , "protocol" , "plaintext" , "Security protocol. Available options: plaintext, sasl_ssl, sasl_plaintext (default: plaintext)" )
746
- flag .StringVar (& mechanism , "mechanism" , "scram-sha-256" , "SASL mechanism. Available options: scram-sha-256, scram-sha-512 (default: scram-sha-256)" )
747
- flag .StringVar (& username , "username" , "" , "Username for authentication (can be also set by Env variable KAFKA_USERNAME" )
748
- flag .StringVar (& password , "password" , "" , "Password for authentication (can be also set by Env variable KAFKA_PASSWORD" )
749
- flag .BoolVar (& actionApply , "apply" , false , "Apply spec-file to the broker, create all entities that do not exist there; this is the default action" )
750
- flag .BoolVar (& actionDump , "dump" , false , "Dump broker entities in YAML (default) or JSON format to stdout or to a file if --spec option is defined" )
751
- flag .BoolVar (& actionHelp , "help" , false , "Print usage" )
752
- flag .BoolVar (& isYAML , "yaml" , false , "Spec-file is in YAML format (will try to detect format if none of --yaml or --json is set)" )
753
- flag .BoolVar (& isJSON , "json" , false , "Spec-file is in JSON format (will try to detect format if none of --yaml or --json is set)" )
754
- flag .BoolVar (& errorStop , "stop-on-error" , false , "Exit on first occurred error" )
755
- flag .BoolVar (& isTemplate , "template" , false , "Spec-file is a template" )
756
- flag .BoolVar (& missingOk , "missingok" , false , "Ignore missing template keys" )
757
- flag .BoolVar (& verbose , "verbose" , false , "Verbose output" )
758
- flag .Var (& varFlags , "var" , "Variable for templating" )
759
- flag .Usage = func () {
760
- usage ()
761
- }
762
- flag .Parse ()
763
-
764
- protocol = strings .ToLower (protocol )
765
- mechanism = strings .ToLower (mechanism )
758
+ flag .StringVar (& broker , "broker" , "" , "Bootstrap-brokers, default is localhost:9092 (can be also set by Env variable KAFKA_BROKER)" )
759
+ flag .StringVar (& specfile , "spec" , "" , "Spec-file (can be set by Env variable KAFKA_SPEC_FILE)" )
760
+ flag .StringVar (& protocol , "protocol" , "plaintext" , "Security protocol. Available options: plaintext, sasl_ssl, sasl_plaintext (default: plaintext)" )
761
+ flag .StringVar (& mechanism , "mechanism" , "scram-sha-256" , "SASL mechanism. Available options: scram-sha-256, scram-sha-512 (default: scram-sha-256)" )
762
+ flag .StringVar (& username , "username" , "" , "Username for authentication (can be also set by Env variable KAFKA_USERNAME" )
763
+ flag .StringVar (& password , "password" , "" , "Password for authentication (can be also set by Env variable KAFKA_PASSWORD" )
764
+ flag .BoolVar (& actionApply , "apply" , false , "Apply spec-file to the broker, create all entities that do not exist there; this is the default action" )
765
+ flag .BoolVar (& actionDump , "dump" , false , "Dump broker entities in YAML (default) or JSON format to stdout or to a file if --spec option is defined" )
766
+ flag .BoolVar (& actionHelp , "help" , false , "Print usage" )
767
+ flag .BoolVar (& isYAML , "yaml" , false , "Spec-file is in YAML format (will try to detect format if none of --yaml or --json is set)" )
768
+ flag .BoolVar (& isJSON , "json" , false , "Spec-file is in JSON format (will try to detect format if none of --yaml or --json is set)" )
769
+ flag .BoolVar (& errorStop , "stop-on-error" , false , "Exit on first occurred error" )
770
+ flag .BoolVar (& isTemplate , "template" , false , "Spec-file is a template" )
771
+ flag .BoolVar (& missingOk , "missingok" , false , "Ignore missing template keys" )
772
+ flag .BoolVar (& verbose , "verbose" , false , "Verbose output" )
773
+ flag .Var (& varFlags , "var" , "Variable for templating" )
774
+ flag .Usage = func () {
775
+ usage ()
776
+ }
777
+ flag .Parse ()
778
+
779
+ protocol = strings .ToLower (protocol )
780
+ mechanism = strings .ToLower (mechanism )
766
781
767
782
if ! actionApply && ! actionDump && ! actionHelp {
768
783
fmt .Println ("Please define one of the actions: --dump, --apply, --help" )
0 commit comments