
Commit d3c7146

Initial kafka committer

1 parent 7b572b1 commit d3c7146

11 files changed: +851 -4 lines

cmd/root.go
Lines changed: 26 additions & 0 deletions

@@ -124,6 +124,19 @@ func init() {
 	rootCmd.PersistentFlags().Int("storage-staging-postgres-maxIdleConns", 25, "PostgreSQL max idle connections for staging storage")
 	rootCmd.PersistentFlags().Int("storage-staging-postgres-maxConnLifetime", 300, "PostgreSQL max connection lifetime in seconds for staging storage")
 	rootCmd.PersistentFlags().Int("storage-staging-postgres-connectTimeout", 10, "PostgreSQL connection timeout in seconds for staging storage")
+	// Kafka storage flags - only for main storage (where blockchain data is committed)
+	rootCmd.PersistentFlags().Bool("storage-main-kafka-enabled", false, "Enable Kafka storage for main storage")
+	rootCmd.PersistentFlags().String("storage-main-kafka-brokers", "", "Kafka brokers for main storage")
+	rootCmd.PersistentFlags().String("storage-main-kafka-username", "", "Kafka username for main storage")
+	rootCmd.PersistentFlags().String("storage-main-kafka-password", "", "Kafka password for main storage")
+	rootCmd.PersistentFlags().String("storage-main-kafka-postgres-host", "", "PostgreSQL host for Kafka main storage bookkeeping")
+	rootCmd.PersistentFlags().Int("storage-main-kafka-postgres-port", 5432, "PostgreSQL port for Kafka main storage bookkeeping")
+	rootCmd.PersistentFlags().String("storage-main-kafka-postgres-username", "", "PostgreSQL username for Kafka main storage bookkeeping")
+	rootCmd.PersistentFlags().String("storage-main-kafka-postgres-password", "", "PostgreSQL password for Kafka main storage bookkeeping")
+	rootCmd.PersistentFlags().String("storage-main-kafka-postgres-database", "", "PostgreSQL database for Kafka main storage bookkeeping")
+	rootCmd.PersistentFlags().String("storage-main-kafka-postgres-sslMode", "require", "PostgreSQL SSL mode for Kafka main storage bookkeeping")
+	rootCmd.PersistentFlags().Int("storage-main-kafka-postgres-maxOpenConns", 25, "PostgreSQL max open connections for Kafka main storage bookkeeping")
+	rootCmd.PersistentFlags().Int("storage-main-kafka-postgres-maxIdleConns", 10, "PostgreSQL max idle connections for Kafka main storage bookkeeping")
 	rootCmd.PersistentFlags().String("api-host", "localhost:3000", "API host")
 	rootCmd.PersistentFlags().String("api-basicAuth-username", "", "API basic auth username")
 	rootCmd.PersistentFlags().String("api-basicAuth-password", "", "API basic auth password")
@@ -240,6 +253,19 @@ func init() {
 	viper.BindPFlag("storage.staging.postgres.maxIdleConns", rootCmd.PersistentFlags().Lookup("storage-staging-postgres-maxIdleConns"))
 	viper.BindPFlag("storage.staging.postgres.maxConnLifetime", rootCmd.PersistentFlags().Lookup("storage-staging-postgres-maxConnLifetime"))
 	viper.BindPFlag("storage.staging.postgres.connectTimeout", rootCmd.PersistentFlags().Lookup("storage-staging-postgres-connectTimeout"))
+	// Bind Kafka storage flags - only for main storage
+	viper.BindPFlag("storage.main.kafka.enabled", rootCmd.PersistentFlags().Lookup("storage-main-kafka-enabled"))
+	viper.BindPFlag("storage.main.kafka.brokers", rootCmd.PersistentFlags().Lookup("storage-main-kafka-brokers"))
+	viper.BindPFlag("storage.main.kafka.username", rootCmd.PersistentFlags().Lookup("storage-main-kafka-username"))
+	viper.BindPFlag("storage.main.kafka.password", rootCmd.PersistentFlags().Lookup("storage-main-kafka-password"))
+	viper.BindPFlag("storage.main.kafka.postgres.host", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-host"))
+	viper.BindPFlag("storage.main.kafka.postgres.port", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-port"))
+	viper.BindPFlag("storage.main.kafka.postgres.username", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-username"))
+	viper.BindPFlag("storage.main.kafka.postgres.password", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-password"))
+	viper.BindPFlag("storage.main.kafka.postgres.database", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-database"))
+	viper.BindPFlag("storage.main.kafka.postgres.sslMode", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-sslMode"))
+	viper.BindPFlag("storage.main.kafka.postgres.maxOpenConns", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-maxOpenConns"))
+	viper.BindPFlag("storage.main.kafka.postgres.maxIdleConns", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-maxIdleConns"))
 	viper.BindPFlag("api.host", rootCmd.PersistentFlags().Lookup("api-host"))
 	viper.BindPFlag("api.basicAuth.username", rootCmd.PersistentFlags().Lookup("api-basicAuth-username"))
 	viper.BindPFlag("api.basicAuth.password", rootCmd.PersistentFlags().Lookup("api-basicAuth-password"))

configs/config.go
Lines changed: 9 additions & 0 deletions

@@ -62,6 +62,7 @@ const (
 type StorageConnectionConfig struct {
 	Clickhouse *ClickhouseConfig `mapstructure:"clickhouse"`
 	Postgres   *PostgresConfig   `mapstructure:"postgres"`
+	Kafka      *KafkaConfig      `mapstructure:"kafka"`
 }
 
 type TableConfig struct {
@@ -100,6 +101,14 @@ type PostgresConfig struct {
 	ConnectTimeout int `mapstructure:"connectTimeout"`
 }
 
+type KafkaConfig struct {
+	Enabled  bool            `mapstructure:"enabled"`
+	Brokers  string          `mapstructure:"brokers"`
+	Username string          `mapstructure:"username"`
+	Password string          `mapstructure:"password"`
+	Postgres *PostgresConfig `mapstructure:"postgres"`
+}
+
 type RPCBatchRequestConfig struct {
 	BlocksPerRequest int `mapstructure:"blocksPerRequest"`
 	BatchDelay       int `mapstructure:"batchDelay"`
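The new KafkaConfig nests a PostgresConfig for the committer's bookkeeping database. A minimal sketch (assuming the section is unmarshaled with viper and mapstructure tags, as the rest of the config is) of how such a section maps onto the struct; the struct fields and YAML values below are trimmed for illustration:

package main

import (
	"bytes"
	"fmt"

	"github.com/spf13/viper"
)

// Trimmed stand-ins for the repo's config structs; only the fields needed
// for the illustration are included.
type PostgresConfig struct {
	Host     string `mapstructure:"host"`
	Port     int    `mapstructure:"port"`
	Database string `mapstructure:"database"`
	SSLMode  string `mapstructure:"sslMode"`
}

type KafkaConfig struct {
	Enabled  bool            `mapstructure:"enabled"`
	Brokers  string          `mapstructure:"brokers"`
	Username string          `mapstructure:"username"`
	Password string          `mapstructure:"password"`
	Postgres *PostgresConfig `mapstructure:"postgres"`
}

func main() {
	// Hypothetical storage.main.kafka section of a config file.
	yamlSection := []byte(`
enabled: true
brokers: broker-1:9092,broker-2:9092
username: indexer
password: secret
postgres:
  host: localhost
  port: 5432
  database: kafka_bookkeeping
  sslMode: require
`)

	v := viper.New()
	v.SetConfigType("yaml")
	if err := v.ReadConfig(bytes.NewReader(yamlSection)); err != nil {
		panic(err)
	}

	var cfg KafkaConfig
	if err := v.Unmarshal(&cfg); err != nil {
		panic(err)
	}
	fmt.Printf("kafka: %+v\npostgres bookkeeping: %+v\n", cfg, *cfg.Postgres)
}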

internal/common/block.go
Lines changed: 5 additions & 4 deletions

@@ -59,10 +59,11 @@ type BlockModel struct {
 }
 
 type BlockData struct {
-	Block        Block
-	Transactions []Transaction
-	Logs         []Log
-	Traces       []Trace
+	ChainId      uint64        `json:"chain_id"`
+	Block        Block         `json:"block"`
+	Transactions []Transaction `json:"transactions"`
+	Logs         []Log         `json:"logs"`
+	Traces       []Trace       `json:"traces"`
 }
 
 type BlockHeader struct {
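The json tags are new; BlockData was previously only passed around in memory, and the explicit chain_id plus snake_case tags suggest the struct is now serialized as the payload the Kafka committer publishes. A minimal sketch of the resulting envelope, with the nested types reduced to placeholders:

package main

import (
	"encoding/json"
	"fmt"
)

// Placeholder element types; the real Block, Transaction, Log, and Trace
// structs carry the full chain data.
type Block struct {
	Number uint64 `json:"number"`
}

type BlockData struct {
	ChainId      uint64   `json:"chain_id"`
	Block        Block    `json:"block"`
	Transactions []string `json:"transactions"`
	Logs         []string `json:"logs"`
	Traces       []string `json:"traces"`
}

func main() {
	payload, err := json.Marshal(BlockData{ChainId: 1, Block: Block{Number: 100}})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload))
	// Output: {"chain_id":1,"block":{"number":100},"transactions":null,"logs":null,"traces":null}
}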

internal/orchestrator/committer_test.go
Lines changed: 1 addition & 0 deletions

@@ -426,6 +426,7 @@ func TestHandleGap(t *testing.T) {
 	mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
 		Blocks: 5,
 	})
+	mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
 	mockRPC.EXPECT().GetFullBlocks(context.Background(), []*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)}).Return([]rpc.GetFullBlockResult{
 		{BlockNumber: big.NewInt(100), Data: common.BlockData{Block: common.Block{Number: big.NewInt(100)}}},
 		{BlockNumber: big.NewInt(101), Data: common.BlockData{Block: common.Block{Number: big.NewInt(101)}}},

internal/orchestrator/failure_recoverer.go
Lines changed: 1 addition & 0 deletions

@@ -110,6 +110,7 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail
 		})
 	} else {
 		successfulResults = append(successfulResults, common.BlockData{
+			ChainId:      fr.rpc.GetChainID().Uint64(),
 			Block:        result.Data.Block,
 			Logs:         result.Data.Logs,
 			Transactions: result.Data.Transactions,

internal/orchestrator/poller.go
Lines changed: 1 addition & 0 deletions

@@ -262,6 +262,7 @@ func (p *Poller) convertPollResultsToBlockData(results []rpc.GetFullBlockResult)
 	blockData := make([]common.BlockData, 0, len(successfulResults))
 	for _, result := range successfulResults {
 		blockData = append(blockData, common.BlockData{
+			ChainId:      p.rpc.GetChainID().Uint64(),
 			Block:        result.Data.Block,
 			Logs:         result.Data.Logs,
 			Transactions: result.Data.Transactions,

internal/orchestrator/reorg_handler.go
Lines changed: 1 addition & 0 deletions

@@ -274,6 +274,7 @@ func (rh *ReorgHandler) handleReorg(ctx context.Context, reorgedBlockNumbers []*
 			return fmt.Errorf("cannot fix reorg: failed block %s: %w", result.BlockNumber.String(), result.Error)
 		}
 		data = append(data, common.BlockData{
+			ChainId:      rh.rpc.GetChainID().Uint64(),
 			Block:        result.Data.Block,
 			Logs:         result.Data.Logs,
 			Transactions: result.Data.Transactions,
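The same one-line change appears in the failure recoverer, the poller, and the reorg handler above: every BlockData assembled from RPC results is now stamped with the chain ID taken from the RPC client. A reduced sketch of that pattern, with a stand-in interface in place of the real rpc client:

package main

import (
	"fmt"
	"math/big"
)

type Block struct {
	Number *big.Int
}

type BlockData struct {
	ChainId uint64
	Block   Block
}

// chainIDSource stands in for the repo's rpc client, which exposes GetChainID.
type chainIDSource interface {
	GetChainID() *big.Int
}

type staticRPC struct{ id *big.Int }

func (s staticRPC) GetChainID() *big.Int { return s.id }

// toBlockData mirrors the conversion loops above: each block is wrapped in a
// BlockData record that now carries the chain ID alongside the block itself.
func toBlockData(client chainIDSource, blocks []Block) []BlockData {
	out := make([]BlockData, 0, len(blocks))
	for _, b := range blocks {
		out = append(out, BlockData{
			ChainId: client.GetChainID().Uint64(),
			Block:   b,
		})
	}
	return out
}

func main() {
	data := toBlockData(staticRPC{id: big.NewInt(1)}, []Block{{Number: big.NewInt(100)}})
	fmt.Println(data[0].ChainId, data[0].Block.Number)
}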

internal/storage/clickhouse.go
Lines changed: 2 additions & 0 deletions

@@ -1959,6 +1959,7 @@ func (c *ClickHouseConnector) GetValidationBlockData(chainId *big.Int, startBloc
 	for i, block := range blocksResult.blocks {
 		blockNum := block.Number.String()
 		blockData[i] = common.BlockData{
+			ChainId:      chainId.Uint64(),
 			Block:        block,
 			Logs:         logsResult.logMap[blockNum],
 			Transactions: txsResult.txMap[blockNum],
@@ -2138,6 +2139,7 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []
 	for i, block := range blocksResult.blocks {
 		blockNum := block.Number.String()
 		blockData[i] = common.BlockData{
+			ChainId:      chainId.Uint64(),
 			Block:        block,
 			Logs:         logsResult.logMap[blockNum],
 			Transactions: txsResult.txMap[blockNum],

internal/storage/connector.go
Lines changed: 2 additions & 0 deletions

@@ -152,6 +152,8 @@ func NewConnector[T any](cfg *config.StorageConnectionConfig) (T, error) {
 		conn, err = NewPostgresConnector(cfg.Postgres)
 	} else if cfg.Clickhouse != nil {
 		conn, err = NewClickHouseConnector(cfg.Clickhouse)
+	} else if cfg.Kafka != nil {
+		conn, err = NewKafkaPostgresConnector(cfg.Kafka)
 	} else {
 		return *new(T), fmt.Errorf("no storage driver configured")
 	}
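NewConnector dispatches on whichever config section is non-nil, with the new Kafka branch checked after Postgres and ClickHouse. The NewKafkaPostgresConnector implementation itself is not shown in this excerpt; the sketch below only illustrates the dispatch shape, and newKafkaPostgresConnector is a hypothetical stand-in:

package main

import "fmt"

// Reduced stand-ins for the repo's config and connector types.
type PostgresConfig struct{ Host string }
type ClickhouseConfig struct{ Host string }
type KafkaConfig struct{ Brokers string }

type StorageConnectionConfig struct {
	Postgres   *PostgresConfig
	Clickhouse *ClickhouseConfig
	Kafka      *KafkaConfig
}

type Connector interface{ Name() string }

type kafkaPostgresConnector struct{ brokers string }

func (k *kafkaPostgresConnector) Name() string { return "kafka+postgres: " + k.brokers }

// newKafkaPostgresConnector is a hypothetical stand-in for the real
// NewKafkaPostgresConnector added by this commit.
func newKafkaPostgresConnector(cfg *KafkaConfig) (Connector, error) {
	if cfg.Brokers == "" {
		return nil, fmt.Errorf("kafka: no brokers configured")
	}
	return &kafkaPostgresConnector{brokers: cfg.Brokers}, nil
}

// NewConnector mirrors the dispatch in the diff: the first non-nil config
// section selects the storage driver, and Kafka is the newly added branch.
func NewConnector(cfg *StorageConnectionConfig) (Connector, error) {
	switch {
	case cfg.Postgres != nil:
		return nil, fmt.Errorf("postgres connector omitted from this sketch")
	case cfg.Clickhouse != nil:
		return nil, fmt.Errorf("clickhouse connector omitted from this sketch")
	case cfg.Kafka != nil:
		return newKafkaPostgresConnector(cfg.Kafka)
	default:
		return nil, fmt.Errorf("no storage driver configured")
	}
}

func main() {
	conn, err := NewConnector(&StorageConnectionConfig{Kafka: &KafkaConfig{Brokers: "broker-1:9092"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(conn.Name())
}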
