Skip to content

Commit e152d5b

Browse files
committed
Option to disable TLS for kafka
1 parent 661b150 commit e152d5b

File tree

4 files changed

+20
-5
lines changed

4 files changed

+20
-5
lines changed

cmd/root.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,10 @@ func init() {
124124
rootCmd.PersistentFlags().Int("storage-staging-postgres-maxIdleConns", 25, "PostgreSQL max idle connections for staging storage")
125125
rootCmd.PersistentFlags().Int("storage-staging-postgres-maxConnLifetime", 300, "PostgreSQL max connection lifetime in seconds for staging storage")
126126
rootCmd.PersistentFlags().Int("storage-staging-postgres-connectTimeout", 10, "PostgreSQL connection timeout in seconds for staging storage")
127-
// Kafka storage flags - only for main storage (where blockchain data is committed)
128127
rootCmd.PersistentFlags().String("storage-main-kafka-brokers", "", "Kafka brokers for main storage")
129128
rootCmd.PersistentFlags().String("storage-main-kafka-username", "", "Kafka username for main storage")
130129
rootCmd.PersistentFlags().String("storage-main-kafka-password", "", "Kafka password for main storage")
130+
rootCmd.PersistentFlags().Bool("storage-main-kafka-enable-tls", true, "Enable TLS for Kafka connection in main storage")
131131
rootCmd.PersistentFlags().String("storage-main-kafka-postgres-host", "", "PostgreSQL host for Kafka main storage bookkeeping")
132132
rootCmd.PersistentFlags().Int("storage-main-kafka-postgres-port", 5432, "PostgreSQL port for Kafka main storage bookkeeping")
133133
rootCmd.PersistentFlags().String("storage-main-kafka-postgres-username", "", "PostgreSQL username for Kafka main storage bookkeeping")
@@ -149,6 +149,9 @@ func init() {
149149
rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher")
150150
rootCmd.PersistentFlags().String("publisher-mode", "default", "Publisher mode: default or parallel")
151151
rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers")
152+
rootCmd.PersistentFlags().String("publisher-username", "", "Kafka username for publisher")
153+
rootCmd.PersistentFlags().String("publisher-password", "", "Kafka password for publisher")
154+
rootCmd.PersistentFlags().Bool("publisher-enable-tls", true, "Enable TLS for Kafka connection in publisher")
152155
rootCmd.PersistentFlags().Bool("publisher-blocks-enabled", false, "Toggle block publisher")
153156
rootCmd.PersistentFlags().String("publisher-blocks-topicName", "", "Kafka topic name for blocks")
154157
rootCmd.PersistentFlags().Bool("publisher-transactions-enabled", false, "Toggle transaction publisher")
@@ -255,6 +258,7 @@ func init() {
255258
viper.BindPFlag("storage.main.kafka.brokers", rootCmd.PersistentFlags().Lookup("storage-main-kafka-brokers"))
256259
viper.BindPFlag("storage.main.kafka.username", rootCmd.PersistentFlags().Lookup("storage-main-kafka-username"))
257260
viper.BindPFlag("storage.main.kafka.password", rootCmd.PersistentFlags().Lookup("storage-main-kafka-password"))
261+
viper.BindPFlag("storage.main.kafka.enable_tls", rootCmd.PersistentFlags().Lookup("storage-main-kafka-enable-tls"))
258262
viper.BindPFlag("storage.main.kafka.postgres.host", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-host"))
259263
viper.BindPFlag("storage.main.kafka.postgres.port", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-port"))
260264
viper.BindPFlag("storage.main.kafka.postgres.username", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-username"))
@@ -276,6 +280,9 @@ func init() {
276280
viper.BindPFlag("publisher.enabled", rootCmd.PersistentFlags().Lookup("publisher-enabled"))
277281
viper.BindPFlag("publisher.mode", rootCmd.PersistentFlags().Lookup("publisher-mode"))
278282
viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers"))
283+
viper.BindPFlag("publisher.username", rootCmd.PersistentFlags().Lookup("publisher-username"))
284+
viper.BindPFlag("publisher.password", rootCmd.PersistentFlags().Lookup("publisher-password"))
285+
viper.BindPFlag("publisher.enable_tls", rootCmd.PersistentFlags().Lookup("publisher-enable-tls"))
279286
viper.BindPFlag("publisher.blocks.enabled", rootCmd.PersistentFlags().Lookup("publisher-blocks-enabled"))
280287
viper.BindPFlag("publisher.blocks.topicName", rootCmd.PersistentFlags().Lookup("publisher-blocks-topicName"))
281288
viper.BindPFlag("publisher.transactions.enabled", rootCmd.PersistentFlags().Lookup("publisher-transactions-enabled"))

configs/config.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,11 @@ type PostgresConfig struct {
102102
}
103103

104104
type KafkaConfig struct {
105-
Brokers string `mapstructure:"brokers"`
106-
Username string `mapstructure:"username"`
107-
Password string `mapstructure:"password"`
108-
Postgres *PostgresConfig `mapstructure:"postgres"`
105+
Brokers string `mapstructure:"brokers"`
106+
Username string `mapstructure:"username"`
107+
Password string `mapstructure:"password"`
108+
EnableTLS bool `mapstructure:"enable_tls"`
109+
Postgres *PostgresConfig `mapstructure:"postgres"`
109110
}
110111

111112
type RPCBatchRequestConfig struct {
@@ -184,6 +185,7 @@ type PublisherConfig struct {
184185
Brokers string `mapstructure:"brokers"`
185186
Username string `mapstructure:"username"`
186187
Password string `mapstructure:"password"`
188+
EnableTLS bool `mapstructure:"enable_tls"`
187189
Blocks BlockPublisherConfig `mapstructure:"blocks"`
188190
Transactions TransactionPublisherConfig `mapstructure:"transactions"`
189191
Traces TracePublisherConfig `mapstructure:"traces"`

internal/publisher/publisher.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ func (p *Publisher) initialize() error {
7676
User: config.Cfg.Publisher.Username,
7777
Pass: config.Cfg.Publisher.Password,
7878
}.AsMechanism()))
79+
}
80+
81+
if config.Cfg.Publisher.EnableTLS {
7982
tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}
8083
opts = append(opts, kgo.Dialer(tlsDialer.DialContext))
8184
}

internal/storage/kafka_publisher.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error) {
4747
User: cfg.Username,
4848
Pass: cfg.Password,
4949
}.AsMechanism()))
50+
}
51+
52+
if cfg.EnableTLS {
5053
tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}
5154
opts = append(opts, kgo.Dialer(tlsDialer.DialContext))
5255
}

0 commit comments

Comments
 (0)