@@ -3,6 +3,7 @@ package kafka
33import (
44 "encoding/json"
55 "fmt"
6+ "strconv"
67 "strings"
78
89 crd "github.com/RedHatInsights/clowder/apis/cloud.redhat.com/v1alpha1"
@@ -199,14 +200,19 @@ type genericConfig map[string]string
199200
200201func (s mskProvider ) connectConfig (config * apiextensions.JSON ) error {
201202
203+ replicas := 3
204+ if s .Env .Spec .Providers .Kafka .KafkaConnectReplicaCount != 0 {
205+ replicas = s .Env .Spec .Providers .Kafka .KafkaConnectReplicaCount
206+ }
207+
202208 connectConfig := genericConfig {
203- "config.storage.replication.factor" : "3" ,
209+ "config.storage.replication.factor" : strconv . Itoa ( replicas ) ,
204210 "config.storage.topic" : fmt .Sprintf ("%v-connect-cluster-configs" , s .Env .Name ),
205211 "connector.client.config.override.policy" : "All" ,
206212 "group.id" : "connect-cluster" ,
207- "offset.storage.replication.factor" : "3" ,
213+ "offset.storage.replication.factor" : strconv . Itoa ( replicas ) ,
208214 "offset.storage.topic" : fmt .Sprintf ("%v-connect-cluster-offsets" , s .Env .Name ),
209- "status.storage.replication.factor" : "3" ,
215+ "status.storage.replication.factor" : strconv . Itoa ( replicas ) ,
210216 "status.storage.topic" : fmt .Sprintf ("%v-connect-cluster-status" , s .Env .Name ),
211217 }
212218
0 commit comments