
Commit 5ff291e

committed
Fix README and help
1 parent fb9953a commit 5ff291e

File tree

2 files changed: +253 −22 lines changed

README.md

Lines changed: 224 additions & 0 deletions
@@ -1,2 +1,226 @@
# kafka-ops

Yet another CLI utility to automate Kafka cluster management

[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://github.com/agapoff/kafka-ops/blob/master/LICENSE)

**Kafka-Ops** is a command-line utility written in Go, largely inspired by the [KafkaSpecs](https://github.com/streamthoughts/kafka-specs) Java tool. It automates Kafka management: you describe resources (topics, ACLs) and their configs in spec files and apply them to a Kafka cluster. Kafka-Ops uses the AdminClient Kafka API to align cluster resources with the spec idempotently. The spec can be written in either YAML or JSON.

## Requirements

* Kafka 2.0+

## Spec File Examples

The YAML and JSON spec formats use the same notation and can be converted into each other. An example in YAML:

kafka-cluster-example1.yaml:
17+
```yaml
---
topics:
  - configs:
      cleanup.policy: compact
      compression.type: producer
      min.insync.replicas: '1'
      retention.ms: 'default'
    name: my-topic1
    partitions: 3
  - name: my-topic2
    partitions: 3
    configs:
      retention.ms: 30000
  - name: my-topic3
    partitions: 1
    replication_factor: 1
    configs:
      min.insync.replicas: 'default'
acls:
  - principal: 'User:test1'
    permissions:
      - resource:
          type: 'topic'
          pattern: 'my-'
          patternType: 'PREFIXED'
        allow_operations: ['READ:*', 'WRITE:*', 'DESCRIBE:*']
      - resource:
          type: 'group'
          pattern: 'my-group'
          patternType: 'LITERAL'
        allow_operations: ['READ:*']
        deny_operations: ['DESCRIBE:*']
```

The format is fairly self-explanatory; just a few remarks:
* The topic config values are always strings
* A topic config value can be set to *default*; this removes the per-topic setting so the topic falls back to the cluster default value
* An ACL operation is described as *OperationType:Host*

kafka-cluster-example2.json:
58+
```json
{
  "topics": [
    {
      "name": "my-topic2",
      "partitions": 3,
      "replication_factor": 1,
      "configs": {
        "retention.ms": "30000",
        "segment.bytes": "1073741824"
      }
    }
  ],
  "acls": [
    {
      "principal": "User:ANONYMOUS",
      "permissions": [
        {
          "resource": {
            "type": "group",
            "pattern": "*",
            "patternType": "LITERAL"
          },
          "allow_operations": [
            "READ:*"
          ]
        },
        {
          "resource": {
            "type": "topic",
            "pattern": "*",
            "patternType": "LITERAL"
          },
          "allow_operations": [
            "ALL:*",
            "WRITE:*",
            "DESCRIBE:*",
            "READ:*",
            "CREATE:*"
          ]
        },
        {
          "resource": {
            "type": "cluster",
            "pattern": "kafka-cluster",
            "patternType": "LITERAL"
          },
          "allow_operations": [
            "ALL:*"
          ]
        }
      ]
    }
  ]
}
```
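
The ACL operations above use the *OperationType:Host* notation described earlier. As an illustration only (the commit does not show kafka-ops's own parsing code, and `parseOperation` is a hypothetical helper name), splitting such a string could look like:

```go
package main

import (
	"fmt"
	"strings"
)

// parseOperation splits an ACL operation in OperationType:Host notation
// (e.g. "READ:*") into its two parts. Hypothetical helper for illustration;
// the real kafka-ops code may differ.
func parseOperation(op string) (operation, host string) {
	parts := strings.SplitN(op, ":", 2)
	operation = parts[0]
	host = "*" // assume "any host" when no host part is given
	if len(parts) == 2 {
		host = parts[1]
	}
	return
}

func main() {
	op, host := parseOperation("READ:*")
	fmt.Println(op, host) // READ *
	op, host = parseOperation("WRITE:10.0.0.1")
	fmt.Println(op, host) // WRITE 10.0.0.1
}
```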

## How to Apply the Spec

```bash
./kafka-ops --apply --broker kafka1.cluster.local:9092,kafka2.cluster.local:9092 --spec kafka-cluster-example1.yaml --yaml
```

output:
```
TASK [TOPIC : Create topic my-topic1 (partitions=3, replicas=1)] ***********************
ok: [cy-selenium.quotix.io:9092]

TASK [TOPIC : Create topic my-topic2 (partitions=3, replicas=1)] ***********************
ok: [cy-selenium.quotix.io:9092]

TASK [TOPIC : Create topic my-topic3 (partitions=1, replicas=1)] ***********************
ok: [cy-selenium.quotix.io:9092]

TASK [ACL : Create ACL (ALLOW User:test1@* to READ topic:PREFIXED:my-)] ****************
changed: [cy-selenium.quotix.io:9092]

TASK [ACL : Create ACL (ALLOW User:test1@* to WRITE topic:PREFIXED:my-)] ***************
changed: [cy-selenium.quotix.io:9092]

TASK [ACL : Create ACL (ALLOW User:test1@* to DESCRIBE topic:PREFIXED:my-)] ************
changed: [cy-selenium.quotix.io:9092]

TASK [ACL : Create ACL (ALLOW User:test1@* to READ group:LITERAL:my-group)] ************
changed: [cy-selenium.quotix.io:9092]

TASK [ACL : Create ACL (DENY User:test1@* to DESCRIBE group:LITERAL:my-group)] *********
changed: [cy-selenium.quotix.io:9092]

SUMMARY ********************************************************************************

ok=3 changed=5 failed=0
```

Some settings can be read from environment variables:
```bash
export KAFKA_BROKER=kafka1.cluster.local:9093
export KAFKA_SPEC_FILE=kafka-cluster-example2.json
export KAFKA_USERNAME=admin
export KAFKA_PASSWORD=admin-secret
./kafka-ops --apply --protocol sasl_ssl --json --verbose --stop-on-error
```

## How to Dump the Current Cluster Config

Kafka-Ops can also export the current topics and ACLs from the cluster. This is useful for editing the spec and applying it back, or for migrating the spec to another cluster.

```bash
./kafka-ops --dump --yaml
```

Note that if no broker is defined, Kafka-Ops tries to connect to *localhost:9092*.

## Full Usage

```
./kafka-ops --help
Manage Kafka cluster resources (topics and ACLs)
Usage: ./kafka-ops <action> [<options>] [<broker connection options>]
----------------
Actions
  --help           Show this help and exit
  --dump           Dump cluster resources and their configs to stdout
                   See also --json and --yaml options
  --apply          Idempotently align cluster resources with the spec manifest
                   See also --spec, --json and --yaml options
----------------
Options
  --spec           A path to manifest (specification file) to be used
                   with --apply action
                   Can be also set by Env variable KAFKA_SPEC_FILE
  --yaml           Spec-file is in YAML format
                   Will try to detect format if none of --yaml or --json is set
  --json           Spec-file is in JSON format
                   Will try to detect format if none of --yaml or --json is set
  --verbose        Verbose output
  --stop-on-error  Exit on first occurred error
----------------
Broker connection options
  --broker         Bootstrap-brokers, comma-separated. Default is localhost:9092
                   Can be also set by Env variable KAFKA_BROKER
  --protocol       Security protocol. Default is plaintext
                   Available options: plaintext, sasl_ssl, sasl_plaintext
  --mechanism      SASL mechanism. Default is scram-sha-256
                   Available options: scram-sha-256, scram-sha-512
  --username       Username for authentication
                   Can be also set by Env variable KAFKA_USERNAME
  --password       Password for authentication
                   Can be also set by Env variable KAFKA_PASSWORD
```

## How to build the binary

You need Go and GNU make installed.

```bash
make build
```

If you have rpm-build installed, you can also build an RPM package:

```bash
make rpm
```

## Contributing

This is an open-source project, so feel free to contribute.

kafka-ops.go

Lines changed: 29 additions & 22 deletions
```diff
@@ -129,7 +129,34 @@ func init() {
 
 func main() {
 	defer handleExit()
-	// Connect to Kafka broker
+
+	//defer fmt.Println("closed")
+	if actionApply {
+		admin := connectToKafkaCluster()
+		defer func() { _ = (*admin).Close() }()
+		err := applySpecFile(admin)
+		if err != nil {
+			if err.Error() != "" {
+				fmt.Println(err.Error())
+			}
+			panic(Exit{2})
+		}
+	} else if actionDump {
+		admin := connectToKafkaCluster()
+		defer func() { _ = (*admin).Close() }()
+		err := dumpSpec(admin)
+		if err != nil {
+			if err.Error() != "" {
+				fmt.Println(err.Error())
+			}
+			panic(Exit{2})
+		}
+	} else if actionHelp {
+		usage()
+	}
+}
+
+func connectToKafkaCluster() *sarama.ClusterAdmin {
 	brokerAddrs := strings.Split(broker, ",")
 	config := sarama.NewConfig()
 	config.Version = sarama.V2_2_0_0
@@ -162,27 +189,7 @@ func main() {
 		fmt.Println("Error while creating cluster admin: " + err.Error())
 		os.Exit(2)
 	}
-	defer func() { _ = admin.Close() }()
-	//defer fmt.Println("closed")
-	if actionApply {
-		err = applySpecFile(&admin)
-		if err != nil {
-			if err.Error() != "" {
-				fmt.Println(err.Error())
-			}
-			panic(Exit{2})
-		}
-	} else if actionDump {
-		err = dumpSpec(&admin)
-		if err != nil {
-			if err.Error() != "" {
-				fmt.Println(err.Error())
-			}
-			panic(Exit{2})
-		}
-	} else if actionHelp {
-		usage()
-	}
+	return &admin
 }
 
 func handleExit() {
```
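
The diff inverts the original structure: instead of connecting first and then dispatching on the action, `main` now dispatches first and each branch that needs a broker connects via `connectToKafkaCluster` and defers `Close` itself. A minimal stdlib sketch of that control flow (the types and names below are stand-ins, not the real sarama API):

```go
package main

import (
	"fmt"
	"io"
)

// stubAdmin stands in for sarama.ClusterAdmin so the sketch stays
// self-contained; only the shape of the control flow matters here.
type stubAdmin struct{}

func (a *stubAdmin) Close() error {
	fmt.Println("admin closed")
	return nil
}

// connect mirrors connectToKafkaCluster: it is called only from
// branches that actually need a broker connection.
func connect() io.Closer { return &stubAdmin{} }

func run(actionApply, actionDump, actionHelp bool) {
	if actionApply {
		admin := connect()
		defer admin.Close() // closed when run returns
		fmt.Println("apply spec")
	} else if actionDump {
		admin := connect()
		defer admin.Close()
		fmt.Println("dump spec")
	} else if actionHelp {
		fmt.Println("usage") // no connection needed for --help
	}
}

func main() {
	run(false, true, false) // prints "dump spec" then "admin closed"
}
```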
