@@ -11,6 +11,7 @@ import (
11
11
"io/ioutil"
12
12
"gopkg.in/yaml.v2"
13
13
"encoding/json"
14
+ "errors"
14
15
)
15
16
16
17
var (
25
26
isJSON bool
26
27
actionApply bool
27
28
actionDump bool
29
+ actionHelp bool
28
30
errorStop bool
29
31
)
30
32
@@ -59,6 +61,8 @@ type Resource struct {
59
61
60
62
type OperationHost string
61
63
64
+ type Exit struct { Code int }
65
+
62
66
const (
63
67
Ok = "\033 [0;32m"
64
68
Changed = "\033 [0;33m"
@@ -75,20 +79,27 @@ func init() {
75
79
flag .StringVar (& password , "password" , "" , "Password for authentication (can be also set by Env variable KAFKA_PASSWORD" )
76
80
flag .BoolVar (& actionApply , "apply" , false , "Apply spec-file to the broker, create all entities that do not exist there; this is the default action" )
77
81
flag .BoolVar (& actionDump , "dump" , false , "Dump broker entities in YAML (default) or JSON format to stdout or to a file if --spec option is defined" )
82
+ flag .BoolVar (& actionHelp , "help" , false , "Print usage" )
78
83
flag .BoolVar (& isYAML , "yaml" , false , "Spec-file is in YAML format (will try to detect format if none of --yaml or --json is set)" )
79
84
flag .BoolVar (& isJSON , "json" , false , "Spec-file is in JSON format (will try to detect format if none of --yaml or --json is set)" )
80
85
flag .BoolVar (& errorStop , "stop-on-error" , false , "Exit on first occurred error" )
81
86
flag .BoolVar (& verbose , "verbose" , false , "Verbose output" )
87
+ flag .Usage = func () {
88
+ usage ()
89
+ }
82
90
flag .Parse ()
83
91
84
- if ! actionApply && ! actionDump {
85
- actionApply = true
92
+ if ! actionApply && ! actionDump && ! actionHelp {
93
+ fmt .Println ("Please define one of the actions: --dump, --apply, --help" )
94
+ os .Exit (1 )
86
95
}
87
96
if actionApply && actionDump {
88
- panic ("Please define one of the actions: --dump, --apply" )
97
+ fmt .Println ("Please define one of the actions: --dump, --apply. Refer to kafka-ops --help for details" )
98
+ os .Exit (1 )
89
99
}
90
100
if isJSON && isYAML {
91
- panic ("Please define one of the formats: --json, --yaml" )
101
+ fmt .Println ("Please define one of the formats: --json, --yaml" )
102
+ os .Exit (1 )
92
103
}
93
104
if broker == "" {
94
105
broker = loadEnvVar ("KAFKA_BROKER" )
@@ -99,7 +110,8 @@ func init() {
99
110
if specfile == "" {
100
111
specfile = loadEnvVar ("KAFKA_SPEC_FILE" )
101
112
if specfile == "" && actionApply {
102
- panic ("Please define spec file with --spec option or with KAFKA_SPEC_FILE env variable" )
113
+ fmt .Println ("Please define spec file with --spec option or with KAFKA_SPEC_FILE env variable" )
114
+ os .Exit (1 )
103
115
}
104
116
}
105
117
protocol = strings .ToLower (protocol )
@@ -115,6 +127,7 @@ func init() {
115
127
}
116
128
117
129
func main () {
130
+ defer handleExit ()
118
131
// Connect to Kafka broker
119
132
brokerAddrs := strings .Split (broker , "," )
120
133
config := sarama .NewConfig ()
@@ -131,34 +144,60 @@ func main() {
131
144
config .Net .SASL .Mechanism = sarama .SASLTypeSCRAMSHA512
132
145
config .Net .SASL .SCRAMClientGeneratorFunc = func () sarama.SCRAMClient { return & client.XDGSCRAMClient {HashGeneratorFcn : client .SHA512 } }
133
146
} else {
134
- panic ("The only supported SASL mechanisms: scram-sha-256, scram-sha-512" )
147
+ fmt .Println ("The only supported SASL mechanisms: scram-sha-256, scram-sha-512" )
148
+ os .Exit (1 )
135
149
}
136
150
}
137
151
138
152
admin , err := sarama .NewClusterAdmin (brokerAddrs , config )
139
153
if err != nil {
140
- panic ("Error while creating cluster admin: " + err .Error ())
154
+ fmt .Println ("Error while creating cluster admin: " + err .Error ())
155
+ os .Exit (2 )
141
156
}
142
157
defer func () { _ = admin .Close () }()
143
-
158
+ //defer fmt.Println("closed")
144
159
if actionApply {
145
- applySpecFile (& admin )
160
+ err = applySpecFile (& admin )
161
+ if err != nil {
162
+ if err .Error () != "" {
163
+ fmt .Println (err .Error ())
164
+ }
165
+ panic (Exit {2 })
166
+ }
146
167
} else if actionDump {
147
- dumpSpec (& admin )
168
+ err = dumpSpec (& admin )
169
+ if err != nil {
170
+ if err .Error () != "" {
171
+ fmt .Println (err .Error ())
172
+ }
173
+ panic (Exit {2 })
174
+ }
175
+ } else if actionHelp {
176
+ usage ()
148
177
}
149
178
}
150
179
151
- func dumpSpec (adminref * sarama.ClusterAdmin ) {
152
- admin := * adminref
180
+ func handleExit () {
181
+ if e := recover (); e != nil {
182
+ if exit , ok := e .(Exit ); ok == true {
183
+ os .Exit (exit .Code )
184
+ }
185
+ panic (e )
186
+ }
187
+ }
153
188
189
+ func dumpSpec (admin * sarama.ClusterAdmin ) error {
154
190
// Get current topics from broker
155
- currentTopics , err := admin .ListTopics ()
191
+ currentTopics , err := ( * admin ) .ListTopics ()
156
192
if err != nil {
157
- panic ( err )
193
+ return err
158
194
}
159
195
160
196
// Get current ACLs from broker
161
- currentAcls := listAllAcls (& admin )
197
+ currentAcls ,err := listAllAcls (admin )
198
+ if err != nil {
199
+ return err
200
+ }
162
201
163
202
var spec Spec
164
203
@@ -209,6 +248,7 @@ func dumpSpec(adminref *sarama.ClusterAdmin) {
209
248
yamlTopic ,_ := yaml .Marshal (spec )
210
249
fmt .Printf (string (yamlTopic ))
211
250
}
251
+ return nil
212
252
}
213
253
214
254
func (s * Spec ) AddAcl (acl Acl ) {
@@ -244,7 +284,7 @@ func (o OperationHost) Host() string {
244
284
return strings .Split (string (o ), ":" )[1 ]
245
285
}
246
286
247
- func listAllAcls (adminref * sarama.ClusterAdmin ) []sarama.ResourceAcls {
287
+ func listAllAcls (admin * sarama.ClusterAdmin ) ( []sarama.ResourceAcls , error ) {
248
288
filter := sarama.AclFilter {
249
289
Version : 1 ,
250
290
ResourceType : sarama .AclResourceAny ,
@@ -253,24 +293,25 @@ func listAllAcls(adminref *sarama.ClusterAdmin) []sarama.ResourceAcls {
253
293
PermissionType : sarama .AclPermissionAny ,
254
294
}
255
295
256
- currentAcls , err := (* adminref ).ListAcls (filter )
296
+ currentAcls , err := (* admin ).ListAcls (filter )
257
297
if err != nil {
258
- panic ( err )
298
+ return nil , err
259
299
}
260
- return currentAcls
300
+ return currentAcls , nil
261
301
}
262
302
263
- func applySpecFile (adminref * sarama.ClusterAdmin ) {
264
- admin := * adminref
265
-
303
+ func applySpecFile (admin * sarama.ClusterAdmin ) error {
266
304
var numOk , numChanged , numError int
267
305
268
- spec := parseSpecFile ()
306
+ spec , err := parseSpecFile ()
307
+ if err != nil {
308
+ return errors .New ("Can't parse spec manifest: " + err .Error ())
309
+ }
269
310
270
311
// Get number of brokers
271
- brokers , _ , err := admin .DescribeCluster ()
312
+ brokers , _ , err := ( * admin ) .DescribeCluster ()
272
313
if err != nil {
273
- panic ( err )
314
+ return errors . New ( "Can't get number of brokers: " + err . Error () )
274
315
}
275
316
var autoReplicationFactor int
276
317
if len (brokers ) > 1 {
@@ -280,9 +321,9 @@ func applySpecFile(adminref *sarama.ClusterAdmin) {
280
321
}
281
322
282
323
// Get current topics from broker
283
- currentTopics , err := admin .ListTopics ()
324
+ currentTopics , err := ( * admin ) .ListTopics ()
284
325
if err != nil {
285
- panic ( err )
326
+ return errors . New ( "Can't list topics: " + err . Error () )
286
327
}
287
328
288
329
// Iterate over topics
@@ -295,7 +336,7 @@ func applySpecFile(adminref *sarama.ClusterAdmin) {
295
336
296
337
if ! found {
297
338
// Topic doesn't exist - need to create one
298
- err := createTopic (topic , & admin )
339
+ err := createTopic (topic , admin )
299
340
if err != nil {
300
341
printResult (Error , broker , err .Error (), topic )
301
342
numError ++
@@ -324,7 +365,7 @@ func applySpecFile(adminref *sarama.ClusterAdmin) {
324
365
325
366
// Check the partitions count
326
367
if int32 (topic .Partitions ) != currentTopic .NumPartitions {
327
- err := alterNumPartitions (topic .Name , & admin , topic .Partitions )
368
+ err := alterNumPartitions (topic .Name , admin , topic .Partitions )
328
369
if err != nil {
329
370
printResult (Error , broker , err .Error (), topic )
330
371
numError ++
@@ -352,7 +393,7 @@ func applySpecFile(adminref *sarama.ClusterAdmin) {
352
393
}
353
394
}
354
395
if topicConfigAlterNeeded {
355
- topic ,err = alterTopicConfig (topic , & admin , currentTopic .ConfigEntries )
396
+ topic ,err = alterTopicConfig (topic , admin , currentTopic .ConfigEntries )
356
397
if err != nil {
357
398
printResult (Error , broker , err .Error (), topic )
358
399
numError ++
@@ -376,7 +417,10 @@ func applySpecFile(adminref *sarama.ClusterAdmin) {
376
417
}
377
418
378
419
// Get current ACLs from broker
379
- currentAcls := listAllAcls (& admin )
420
+ currentAcls ,err := listAllAcls (admin )
421
+ if err != nil {
422
+ return err
423
+ }
380
424
381
425
// Iterate over ACLs
382
426
breakLoop := false
@@ -391,7 +435,7 @@ func applySpecFile(adminref *sarama.ClusterAdmin) {
391
435
} else {
392
436
permissionType = sarama .AclPermissionDeny
393
437
}
394
- result , err := createAclIfNotExists (& admin , & currentAcls , permissionType , principal , resource , OperationHost (rule ))
438
+ result , err := createAclIfNotExists (admin , & currentAcls , permissionType , principal , resource , OperationHost (rule ))
395
439
if result == Ok {
396
440
printResult (Ok , broker , "" , acl )
397
441
numOk ++
@@ -418,8 +462,9 @@ func applySpecFile(adminref *sarama.ClusterAdmin) {
418
462
419
463
printSummary (broker , numOk , numChanged , numError )
420
464
if numError > 0 {
421
- os . Exit ( 2 )
465
+ return errors . New ( "" )
422
466
}
467
+ return nil
423
468
}
424
469
425
470
func createAclIfNotExists (admin * sarama.ClusterAdmin , acls * []sarama.ResourceAcls , p sarama.AclPermissionType , principal string , resource Resource , oh OperationHost ) (string , error ){
@@ -461,33 +506,24 @@ func aclExists(a *[]sarama.ResourceAcls, p sarama.AclPermissionType, principal s
461
506
return false
462
507
}
463
508
464
- func parseSpecFile () Spec {
509
+ func parseSpecFile () (Spec ,error ) {
510
+ var spec Spec
465
511
specFile , err := ioutil .ReadFile (specfile )
466
512
if err != nil {
467
- panic ( err )
513
+ return spec , err
468
514
}
469
515
470
- var spec Spec
471
516
if (isYAML ) {
472
517
err = yaml .Unmarshal (specFile , & spec )
473
- if err != nil {
474
- panic (err )
475
- }
476
518
} else if (isJSON ) {
477
519
err = json .Unmarshal (specFile , & spec )
478
- if err != nil {
479
- panic (err )
480
- }
481
520
} else {
482
521
err = yaml .Unmarshal (specFile , & spec )
483
522
if err != nil {
484
523
err = json .Unmarshal (specFile , & spec )
485
- if err != nil {
486
- panic (err )
487
- }
488
524
}
489
525
}
490
- return spec
526
+ return spec , err
491
527
}
492
528
493
529
func alterNumPartitions (topic string , clusterAdmin * sarama.ClusterAdmin , count int ) error {
@@ -715,7 +751,41 @@ func aclPermissionTypeToString(permissionType sarama.AclPermissionType) string {
715
751
}
716
752
717
753
func usage () {
718
- fmt .Fprintf (os .Stderr , "Usage: %s -b <bootstrap brokers> -s <specfile> [-v]\n " , os .Args [0 ])
719
- flag .PrintDefaults ()
754
+ usage := `Manage Kafka cluster resources (topics and ACLs)
755
+ Usage: %s <action> [<options>] [<broker connection options>]
756
+ ----------------
757
+ Actions
758
+ --help Show this help and exit
759
+ --dump Dump cluster resources and their configs to stdout
760
+ See also --json and --yaml options
761
+ --apply Idempotently align cluster resources with the spec manifest
762
+ See also --spec, --json and --yaml options
763
+ ----------------
764
+ Options
765
+ --spec A path to manifest (specification file) to be used
766
+ with --apply action
767
+ Can be also set by Env variable KAFKA_SPEC_FILE
768
+ --yaml Spec-file is in YAML format
769
+ Will try to detect format if none of --yaml or --json is set
770
+ --json Spec-file is in JSON format
771
+ Will try to detect format if none of --yaml or --json is set
772
+ --verbose Verbose output
773
+ --stop-on-error Exit on first occurred error
774
+ ----------------
775
+ Broker connection options
776
+ --broker Bootstrap-brokers, comma-separated. Default is localhost:9092
777
+ Can be also set by Env variable KAFKA_BROKER
778
+ --protocol Security protocol. Default is plaintext
779
+ Available options: plaintext, sasl_ssl, sasl_plaintext
780
+ --mechanism SASL mechanism. Default is scram-sha-256
781
+ Available options: scram-sha-256, scram-sha-512
782
+ --username Username for authentication
783
+ Can be also set by Env variable KAFKA_USERNAME
784
+ --password Password for authentication
785
+ Can be also set by Env variable KAFKA_PASSWORD
786
+ `
787
+
788
+ fmt .Fprintf (os .Stderr , usage , os .Args [0 ])
789
+ //flag.PrintDefaults()
720
790
os .Exit (1 )
721
791
}
0 commit comments