Skip to content

Commit 7d355b1

Browse files
committed
Add ability to read connection options from spec
1 parent dcb8f66 commit 7d355b1

File tree

2 files changed

+41
-12
lines changed

2 files changed

+41
-12
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
.PHONY: rpm clean source
22

3-
KAFKA_OPS_VERSION ?= 1.0.0
3+
KAFKA_OPS_VERSION ?= 1.0.1
44
BUILD_NUMBER ?= 1
55
KAFKA_OPS ?= kafka-ops
66
REPO ?= github.com/agapoff/${KAFKA_OPS}

kafka-ops.go

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,15 @@ type arrFlags []string
4141
type Spec struct {
4242
Topics []Topic `yaml:"topics" json:"topics"`
4343
Acls []Acl `yaml:"acls" json:"acls"`
44+
Connection `yaml:"connection,omitempty" json:"connection,omitempty"`
4445
}
4546

4647
type Topic struct {
4748
Name string `yaml:"name" json:"name"`
4849
Partitions int `yaml:"partitions" json:"partitions"`
4950
ReplicationFactor int `yaml:"replication_factor" json:"replication_factor"`
5051
Configs map[string]string `yaml:"configs" json:"configs"`
51-
State string `yaml:"state" json:"state"`
52+
State string `yaml:"state,omitempty" json:"state,omitempty"`
5253
}
5354

5455
type Acl struct {
@@ -60,7 +61,7 @@ type Permission struct {
6061
Resource `yaml:"resource" json:"resource"`
6162
Allow []string `yaml:"allow_operations,omitempty,flow" json:"allow_operations,omitempty"`
6263
Deny []string `yaml:"deny_operations,omitempty" json:"deny_operations,omitempty"`
63-
State string `yaml:"state" json:"state"`
64+
State string `yaml:"state,omitempty" json:"state,omitempty"`
6465
}
6566

6667
type Resource struct {
@@ -78,6 +79,14 @@ type SingleACL struct {
7879
State string `json:"state"`
7980
}
8081

82+
type Connection struct {
83+
Broker string `yaml:"broker,omitempty" json:"broker,omitempty"`
84+
Protocol string `yaml:"protocol,omitempty" json:"protocol,omitempty"`
85+
Mechanism string `yaml:"mechanism,omitempty" json:"mechanism,omitempty"`
86+
Username string `yaml:"username,omitempty" json:"username,omitempty"`
87+
Password string `yaml:"password,omitempty" json:"password,omitempty"`
88+
}
89+
8190
type Exit struct { Code int }
8291

8392
const (
@@ -123,7 +132,7 @@ func init() {
123132
}
124133
if broker == "" {
125134
broker = loadEnvVar("KAFKA_BROKER")
126-
if broker == "" {
135+
if broker == "" && actionDump {
127136
broker = "localhost:9092"
128137
}
129138
}
@@ -151,19 +160,15 @@ func main() {
151160
//defer fmt.Println("closed")
152161

153162
if actionApply {
154-
admin := connectToKafkaCluster()
155-
defer func() { _ = (*admin).Close() }()
156-
err := applySpecFile(admin)
163+
err := applySpecFile()
157164
if err != nil {
158165
if err.Error() != "" {
159166
fmt.Println(err.Error())
160167
}
161168
panic(Exit{2})
162169
}
163170
} else if actionDump {
164-
admin := connectToKafkaCluster()
165-
defer func() { _ = (*admin).Close() }()
166-
err := dumpSpec(admin)
171+
err := dumpSpec()
167172
if err != nil {
168173
if err.Error() != "" {
169174
fmt.Println(err.Error())
@@ -220,7 +225,9 @@ func handleExit() {
220225
}
221226
}
222227

223-
func dumpSpec(admin *sarama.ClusterAdmin) error {
228+
func dumpSpec() error {
229+
admin := connectToKafkaCluster()
230+
defer func() { _ = (*admin).Close() }()
224231
// Get current topics from broker
225232
currentTopics, err := (*admin).ListTopics()
226233
if err != nil {
@@ -334,14 +341,36 @@ func listAllAcls(admin *sarama.ClusterAdmin) ([]sarama.ResourceAcls,error) {
334341
return currentAcls, nil
335342
}
336343

337-
func applySpecFile(admin *sarama.ClusterAdmin) error {
344+
func applySpecFile() error {
338345
var numOk, numChanged, numError int
339346

340347
spec, err := parseSpecFile()
341348
if err != nil {
342349
return errors.New("Can't parse spec manifest: " + err.Error())
343350
}
344351

352+
if spec.Connection.Broker != "" {
353+
broker = spec.Connection.Broker
354+
}
355+
if spec.Connection.Protocol != "" {
356+
protocol = spec.Connection.Protocol
357+
}
358+
if spec.Connection.Mechanism != "" {
359+
mechanism = spec.Connection.Mechanism
360+
}
361+
if spec.Connection.Username != "" {
362+
username = spec.Connection.Username
363+
}
364+
if spec.Connection.Password != "" {
365+
password = spec.Connection.Password
366+
}
367+
if broker == "" {
368+
broker = "localhost:9092"
369+
}
370+
371+
admin := connectToKafkaCluster()
372+
defer func() { _ = (*admin).Close() }()
373+
345374
// Get number of brokers
346375
brokers, _, err := (*admin).DescribeCluster()
347376
if err != nil {

0 commit comments

Comments
 (0)