Skip to content

Commit fefe9cc

Browse files
committed
Milestone: Finished ACL support
1 parent 6c79c26 commit fefe9cc

File tree

1 file changed

+61
-68
lines changed

1 file changed

+61
-68
lines changed

kafka-ops.go

Lines changed: 61 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -157,29 +157,7 @@ func dumpSpec(adminref *sarama.ClusterAdmin) {
157157
panic(err)
158158
}
159159

160-
/*
161-
r := sarama.Resource{ResourceType: sarama.AclResourceTopic, ResourceName: "Hello", ResourcePatternType: sarama.AclPatternLiteral}
162-
a := sarama.Acl{Principal: "User:test", Host: "*", Operation: sarama.AclOperationRead, PermissionType: sarama.AclPermissionAllow}
163-
err = admin.CreateACL(r, a)
164-
if err != nil {
165-
panic(err)
166-
}
167-
*/
168-
169-
/*
170-
filter := sarama.AclFilter{
171-
Version: 1,
172-
ResourceType: sarama.AclResourceAny,
173-
Operation: sarama.AclOperationAny,
174-
ResourcePatternTypeFilter: sarama.AclPatternAny,
175-
PermissionType: sarama.AclPermissionAny,
176-
}
177-
178-
currentAcls, err := admin.ListAcls(filter)
179-
if err != nil {
180-
panic(err)
181-
}
182-
*/
160+
// Get current ACLs from broker
183161
currentAcls := listAllAcls(&admin)
184162

185163
var spec Spec
@@ -255,7 +233,6 @@ func (r Resource) Equals(res Resource) bool {
255233
}
256234

257235
func (o OperationHost) Operation() sarama.AclOperation {
258-
//s := strings.Split(string(o), ":")
259236
return aclOperationFromString(o.OperationName())
260237
}
261238

@@ -346,9 +323,7 @@ func applySpecFile(adminref *sarama.ClusterAdmin) {
346323
}
347324

348325
// Check the partitions count
349-
//fmt.Printf("\nPartitions: %+v\n", currentTopic)
350326
if int32(topic.Partitions) != currentTopic.NumPartitions {
351-
//fmt.Printf("\nPartitions: %v, need %v\n", currentTopic.NumPartitions, topic.Partitions)
352327
err := alterNumPartitions(topic.Name, &admin, topic.Partitions)
353328
if err != nil {
354329
printResult(Error, broker, err.Error(), topic)
@@ -404,69 +379,74 @@ func applySpecFile(adminref *sarama.ClusterAdmin) {
404379
currentAcls := listAllAcls(&admin)
405380

406381
// Iterate over ACLs
407-
/*
408-
r := sarama.Resource{ResourceType: sarama.AclResourceTopic, ResourceName: "Hello", ResourcePatternType: sarama.AclPatternLiteral}
409-
a := sarama.Acl{Principal: "User:test", Host: "*", Operation: sarama.AclOperationRead, PermissionType: sarama.AclPermissionAllow}
410-
err = admin.CreateACL(r, a)
411-
if err != nil {
412-
panic(err)
413-
}
414-
*/
382+
breakLoop := false
415383
for _, acl := range spec.Acls {
416384
principal := acl.Principal
417385
for _, permission := range acl.Permissions {
418386
resource := permission.Resource
419-
for _, allow := range permission.Allow {
420-
oh := OperationHost(allow)
421-
fmt.Printf("TASK [ACL : Create ACL (ALLOW %s@%s to %s %s:%s:%s)] %s\n",
422-
principal, oh.Host(), oh.OperationName(), resource.Type, resource.PatternType, resource.Pattern, strings.Repeat("*", 25))
423-
if aclExists(&currentAcls, sarama.AclPermissionAllow, principal, oh.OperationName(), resource.Type, resource.PatternType, resource.Pattern, oh.Host()) {
424-
printResult(Ok, broker, "", nil)
425-
numOk++
387+
for i, rule := range append(permission.Allow,permission.Deny...) {
388+
var permissionType sarama.AclPermissionType
389+
if i < len(permission.Allow) {
390+
permissionType = sarama.AclPermissionAllow
426391
} else {
427-
r := sarama.Resource{
428-
ResourceType: aclResourceTypeFromString(resource.Type),
429-
ResourceName: resource.Pattern,
430-
ResourcePatternType: aclResourcePatternTypeFromString(resource.PatternType),
431-
}
432-
a := sarama.Acl{
433-
Principal: principal,
434-
Host: oh.Host(),
435-
Operation: aclOperationFromString(oh.OperationName()),
436-
PermissionType: sarama.AclPermissionAllow,
437-
}
438-
err = admin.CreateACL(r, a)
439-
if err != nil {
440-
printResult(Error, broker, err.Error(), acl)
441-
numError++
442-
if errorStop {
443-
break
444-
// TODO : break from all loops
445-
} else {
446-
continue
447-
}
392+
permissionType = sarama.AclPermissionDeny
393+
}
394+
result, err := createAclIfNotExists(&admin, &currentAcls, permissionType, principal, resource, OperationHost(rule))
395+
if result == Ok {
396+
printResult(Ok, broker, "", acl)
397+
numOk++
398+
} else if err != nil {
399+
printResult(Error, broker, err.Error(), acl)
400+
numError++
401+
if errorStop {
402+
breakLoop = true
403+
break
448404
}
449-
printResult(Changed, broker, "", acl)
405+
} else {
406+
printResult(result, broker, "", acl)
450407
numChanged++
451408
}
452409
}
410+
if breakLoop {
411+
break
412+
}
413+
}
414+
if breakLoop {
415+
break
453416
}
454417
}
455418

456-
//for _, resourceAcls := range currentAcls {
457-
// for _, currentAcl := range resourceAcls.Acls {
458-
459419
printSummary(broker, numOk, numChanged, numError)
460420
if numError > 0 {
461421
os.Exit(2)
462422
}
463423
}
464424

425+
func createAclIfNotExists(admin *sarama.ClusterAdmin, acls *[]sarama.ResourceAcls, p sarama.AclPermissionType, principal string, resource Resource, oh OperationHost) (string, error){
426+
fmt.Printf("TASK [ACL : Create ACL (%s %s@%s to %s %s:%s:%s)] %s\n",
427+
aclPermissionTypeToString(p), principal, oh.Host(), oh.OperationName(), resource.Type, resource.PatternType, resource.Pattern, strings.Repeat("*", 25))
428+
if aclExists(acls, p, principal, oh.OperationName(), resource.Type, resource.PatternType, resource.Pattern, oh.Host()) {
429+
return Ok, nil
430+
} else {
431+
r := sarama.Resource{
432+
ResourceType: aclResourceTypeFromString(resource.Type),
433+
ResourceName: resource.Pattern,
434+
ResourcePatternType: aclResourcePatternTypeFromString(resource.PatternType),
435+
}
436+
a := sarama.Acl{
437+
Principal: principal,
438+
Host: oh.Host(),
439+
Operation: aclOperationFromString(oh.OperationName()),
440+
PermissionType: p,
441+
}
442+
err := (*admin).CreateACL(r, a)
443+
return Changed, err
444+
}
445+
}
446+
465447
func aclExists(a *[]sarama.ResourceAcls, p sarama.AclPermissionType, principal string, operation string, resource string, patternType string, pattern string, host string) bool {
466448
for _, resourceAcls := range *a {
467449
for _, currentAcl := range resourceAcls.Acls {
468-
//fmt.Printf("Compare %s=%s %s=%s %s=%s\n", principal, currentAcl.Principal, operation, aclOperationToString(currentAcl.Operation), resource, aclResourceTypeToString(resourceAcls.Resource.ResourceType) )
469-
//fmt.Printf("Compare %s=%s %s=%s %s=%s\n", patternType, aclResourcePatternTypeToString(resourceAcls.Resource.ResourcePatternType), pattern, resourceAcls.Resource.ResourceName, host, currentAcl.Host)
470450
if principal == currentAcl.Principal &&
471451
strings.ToUpper(operation) == aclOperationToString(currentAcl.Operation) &&
472452
strings.ToLower(resource) == aclResourceTypeToString(resourceAcls.Resource.ResourceType) &&
@@ -721,6 +701,19 @@ func aclResourceTypeFromString(resourceType string) sarama.AclResourceType {
721701
}
722702
}
723703

704+
func aclPermissionTypeToString(permissionType sarama.AclPermissionType) string {
705+
switch permissionType {
706+
case sarama.AclPermissionAny:
707+
return "ANY"
708+
case sarama.AclPermissionDeny:
709+
return "DENY"
710+
case sarama.AclPermissionAllow:
711+
return "ALLOW"
712+
default:
713+
return "INVALID"
714+
}
715+
}
716+
724717
func usage() {
725718
fmt.Fprintf(os.Stderr, "Usage: %s -b <bootstrap brokers> -s <specfile> [-v]\n", os.Args[0])
726719
flag.PrintDefaults()

0 commit comments

Comments
 (0)