Merged

Commits (44)
d3c7146  Initial kafka committer (jakeloo, Aug 13, 2025)
cbbea07  Update config (jakeloo, Aug 13, 2025)
4775d57  Error on uninitialize brokers (jakeloo, Aug 13, 2025)
661b150  Update queries (jakeloo, Aug 14, 2025)
e152d5b  Option to disable TLS for kafka (jakeloo, Aug 14, 2025)
e61fae7  Add projection mode in blocks (jakeloo, Aug 14, 2025)
fc2ae64  Fix publish parallel mode (jakeloo, Aug 14, 2025)
191298b  Gofmt (jakeloo, Aug 14, 2025)
e45907a  Update schema (jakeloo, Aug 15, 2025)
ceeac3b  Fix backfill table (jakeloo, Aug 15, 2025)
64aaec5  Update kafka storage producer (jakeloo, Aug 15, 2025)
20dc471  Kafka + Redis (jakeloo, Aug 15, 2025)
0bf3097  Update schema payload (jakeloo, Aug 18, 2025)
6f2b72d  Merge branch 'main' into jl/commit-kafka (jakeloo, Aug 19, 2025)
cd434a2  Update kafka-postgres -> kafka-redis config (jakeloo, Aug 19, 2025)
4fd141d  Update schema to use replacing merge tree (jakeloo, Aug 20, 2025)
3b9f694  Fix schema (jakeloo, Aug 20, 2025)
92a35ab  Badger & S3 (jakeloo, Aug 22, 2025)
eea71f4  Until block for committer (jakeloo, Aug 22, 2025)
68087a0  terminate when poller or committer exit (jakeloo, Aug 25, 2025)
c0ba962  Fix commit until block (jakeloo, Aug 26, 2025)
51f1398  Don't cancel active tasks in poller (jakeloo, Aug 26, 2025)
debc231  migrate with destination storage (jakeloo, Aug 26, 2025)
bddbf54  Remove RPC batch config in migrate (jakeloo, Aug 26, 2025)
43dad9c  Cleanup (jakeloo, Aug 26, 2025)
da31422  Add from_address, to_address to schema (jakeloo, Aug 26, 2025)
b698c18  Retry with RPC batch size reduction (jakeloo, Aug 26, 2025)
96dc60b  Shuffle Orchestrator and Staging interface (jakeloo, Aug 26, 2025)
41cc98d  store poller in committer (jakeloo, Aug 26, 2025)
70ea871  Simplified storage. Split kafka and redis (jakeloo, Aug 26, 2025)
f920a71  kafka requires orchestrator (jakeloo, Aug 26, 2025)
32eece5  Fix orchestrator flag (jakeloo, Aug 26, 2025)
e35ff76  Fix badger keys (jakeloo, Aug 26, 2025)
6233232  Fix backfill missing blocks in staging (jakeloo, Aug 26, 2025)
31d923f  Revert "Fix backfill missing blocks in staging" (jakeloo, Aug 26, 2025)
7cb6ff1  block buffer (jakeloo, Aug 27, 2025)
59aad94  Poller S3 support (jakeloo, Aug 27, 2025)
884a3aa  Fix boundaries for migration (jakeloo, Aug 27, 2025)
86f3d68  Badger for caching in s3 connector (jakeloo, Aug 27, 2025)
4595fa6  optimize s3 insertion (jakeloo, Aug 27, 2025)
136a346  redis tls. erc1155 batch mv (jakeloo, Aug 28, 2025)
b9828af  fix projections, use _part_offset projections (jakeloo, Aug 28, 2025)
69f5f78  gofmt (jakeloo, Aug 28, 2025)
e551c17  Fix test (jakeloo, Aug 28, 2025)
Files changed
30 changes: 30 additions & 0 deletions cmd/root.go
@@ -124,6 +124,18 @@ func init() {
rootCmd.PersistentFlags().Int("storage-staging-postgres-maxIdleConns", 25, "PostgreSQL max idle connections for staging storage")
rootCmd.PersistentFlags().Int("storage-staging-postgres-maxConnLifetime", 300, "PostgreSQL max connection lifetime in seconds for staging storage")
rootCmd.PersistentFlags().Int("storage-staging-postgres-connectTimeout", 10, "PostgreSQL connection timeout in seconds for staging storage")
rootCmd.PersistentFlags().String("storage-main-kafka-brokers", "", "Kafka brokers for main storage")
rootCmd.PersistentFlags().String("storage-main-kafka-username", "", "Kafka username for main storage")
rootCmd.PersistentFlags().String("storage-main-kafka-password", "", "Kafka password for main storage")
rootCmd.PersistentFlags().Bool("storage-main-kafka-enable-tls", true, "Enable TLS for Kafka connection in main storage")
rootCmd.PersistentFlags().String("storage-main-kafka-postgres-host", "", "PostgreSQL host for Kafka main storage bookkeeping")
rootCmd.PersistentFlags().Int("storage-main-kafka-postgres-port", 5432, "PostgreSQL port for Kafka main storage bookkeeping")
rootCmd.PersistentFlags().String("storage-main-kafka-postgres-username", "", "PostgreSQL username for Kafka main storage bookkeeping")
rootCmd.PersistentFlags().String("storage-main-kafka-postgres-password", "", "PostgreSQL password for Kafka main storage bookkeeping")
rootCmd.PersistentFlags().String("storage-main-kafka-postgres-database", "", "PostgreSQL database for Kafka main storage bookkeeping")
rootCmd.PersistentFlags().String("storage-main-kafka-postgres-sslMode", "require", "PostgreSQL SSL mode for Kafka main storage bookkeeping")
rootCmd.PersistentFlags().Int("storage-main-kafka-postgres-maxOpenConns", 25, "PostgreSQL max open connections for Kafka main storage bookkeeping")
rootCmd.PersistentFlags().Int("storage-main-kafka-postgres-maxIdleConns", 10, "PostgreSQL max idle connections for Kafka main storage bookkeeping")
rootCmd.PersistentFlags().String("api-host", "localhost:3000", "API host")
rootCmd.PersistentFlags().String("api-basicAuth-username", "", "API basic auth username")
rootCmd.PersistentFlags().String("api-basicAuth-password", "", "API basic auth password")
@@ -137,6 +149,9 @@ func init() {
rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher")
rootCmd.PersistentFlags().String("publisher-mode", "default", "Publisher mode: default or parallel")
rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers")
rootCmd.PersistentFlags().String("publisher-username", "", "Kafka username for publisher")
rootCmd.PersistentFlags().String("publisher-password", "", "Kafka password for publisher")
rootCmd.PersistentFlags().Bool("publisher-enable-tls", true, "Enable TLS for Kafka connection in publisher")
rootCmd.PersistentFlags().Bool("publisher-blocks-enabled", false, "Toggle block publisher")
rootCmd.PersistentFlags().String("publisher-blocks-topicName", "", "Kafka topic name for blocks")
rootCmd.PersistentFlags().Bool("publisher-transactions-enabled", false, "Toggle transaction publisher")
@@ -240,6 +255,18 @@ func init() {
viper.BindPFlag("storage.staging.postgres.maxIdleConns", rootCmd.PersistentFlags().Lookup("storage-staging-postgres-maxIdleConns"))
viper.BindPFlag("storage.staging.postgres.maxConnLifetime", rootCmd.PersistentFlags().Lookup("storage-staging-postgres-maxConnLifetime"))
viper.BindPFlag("storage.staging.postgres.connectTimeout", rootCmd.PersistentFlags().Lookup("storage-staging-postgres-connectTimeout"))
viper.BindPFlag("storage.main.kafka.brokers", rootCmd.PersistentFlags().Lookup("storage-main-kafka-brokers"))
viper.BindPFlag("storage.main.kafka.username", rootCmd.PersistentFlags().Lookup("storage-main-kafka-username"))
viper.BindPFlag("storage.main.kafka.password", rootCmd.PersistentFlags().Lookup("storage-main-kafka-password"))
viper.BindPFlag("storage.main.kafka.enable_tls", rootCmd.PersistentFlags().Lookup("storage-main-kafka-enable-tls"))
viper.BindPFlag("storage.main.kafka.postgres.host", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-host"))
viper.BindPFlag("storage.main.kafka.postgres.port", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-port"))
viper.BindPFlag("storage.main.kafka.postgres.username", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-username"))
viper.BindPFlag("storage.main.kafka.postgres.password", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-password"))
viper.BindPFlag("storage.main.kafka.postgres.database", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-database"))
viper.BindPFlag("storage.main.kafka.postgres.sslMode", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-sslMode"))
viper.BindPFlag("storage.main.kafka.postgres.maxOpenConns", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-maxOpenConns"))
viper.BindPFlag("storage.main.kafka.postgres.maxIdleConns", rootCmd.PersistentFlags().Lookup("storage-main-kafka-postgres-maxIdleConns"))
viper.BindPFlag("api.host", rootCmd.PersistentFlags().Lookup("api-host"))
viper.BindPFlag("api.basicAuth.username", rootCmd.PersistentFlags().Lookup("api-basicAuth-username"))
viper.BindPFlag("api.basicAuth.password", rootCmd.PersistentFlags().Lookup("api-basicAuth-password"))
@@ -253,6 +280,9 @@ func init() {
viper.BindPFlag("publisher.enabled", rootCmd.PersistentFlags().Lookup("publisher-enabled"))
viper.BindPFlag("publisher.mode", rootCmd.PersistentFlags().Lookup("publisher-mode"))
viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers"))
viper.BindPFlag("publisher.username", rootCmd.PersistentFlags().Lookup("publisher-username"))
viper.BindPFlag("publisher.password", rootCmd.PersistentFlags().Lookup("publisher-password"))
viper.BindPFlag("publisher.enable_tls", rootCmd.PersistentFlags().Lookup("publisher-enable-tls"))
viper.BindPFlag("publisher.blocks.enabled", rootCmd.PersistentFlags().Lookup("publisher-blocks-enabled"))
viper.BindPFlag("publisher.blocks.topicName", rootCmd.PersistentFlags().Lookup("publisher-blocks-topicName"))
viper.BindPFlag("publisher.transactions.enabled", rootCmd.PersistentFlags().Lookup("publisher-transactions-enabled"))
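For orientation, here is a minimal sketch of how the new bindings surface at runtime once flags are parsed. The viper key names are taken verbatim from the BindPFlag calls above; the helper name and where it would be called from are hypothetical.

// Hypothetical helper, not part of this PR. Assumes the bindings above have
// already run; imports: fmt, github.com/spf13/viper.
func debugKafkaStorageFlags() {
	brokers := viper.GetString("storage.main.kafka.brokers")
	enableTLS := viper.GetBool("storage.main.kafka.enable_tls")
	fmt.Printf("kafka main storage: brokers=%q tls=%v\n", brokers, enableTLS)
}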
10 changes: 10 additions & 0 deletions configs/config.go
@@ -62,6 +62,7 @@ const (
type StorageConnectionConfig struct {
Clickhouse *ClickhouseConfig `mapstructure:"clickhouse"`
Postgres *PostgresConfig `mapstructure:"postgres"`
Kafka *KafkaConfig `mapstructure:"kafka"`
}

type TableConfig struct {
@@ -100,6 +101,14 @@ type PostgresConfig struct {
ConnectTimeout int `mapstructure:"connectTimeout"`
}

type KafkaConfig struct {
Brokers string `mapstructure:"brokers"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
EnableTLS bool `mapstructure:"enable_tls"`
Postgres *PostgresConfig `mapstructure:"postgres"`
}

type RPCBatchRequestConfig struct {
BlocksPerRequest int `mapstructure:"blocksPerRequest"`
BatchDelay int `mapstructure:"batchDelay"`
@@ -176,6 +185,7 @@ type PublisherConfig struct {
Brokers string `mapstructure:"brokers"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
EnableTLS bool `mapstructure:"enable_tls"`
Blocks BlockPublisherConfig `mapstructure:"blocks"`
Transactions TransactionPublisherConfig `mapstructure:"transactions"`
Traces TracePublisherConfig `mapstructure:"traces"`
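As a reading aid, a hedged sketch of how the new KafkaConfig could be populated directly in Go. The field names match the struct above exactly; the broker list format (comma-separated, mirroring the single string flag) and all values are placeholders.

// Hypothetical wiring; values are placeholders.
kafkaCfg := &config.KafkaConfig{
	Brokers:   "broker-1:9092,broker-2:9092", // single string flag; comma-separated list assumed
	Username:  "indexer",
	Password:  "change-me",
	EnableTLS: true,
	// Bookkeeping database used by the Kafka main-storage connector
	// (see NewKafkaPostgresConnector later in this PR).
	Postgres: &config.PostgresConfig{ /* host, port, credentials, sslMode */ },
}
_ = kafkaCfg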
9 changes: 5 additions & 4 deletions internal/common/block.go
@@ -59,10 +59,11 @@ type BlockModel struct {
}

type BlockData struct {
Block Block
Transactions []Transaction
Logs []Log
Traces []Trace
ChainId uint64 `json:"chain_id"`
Block Block `json:"block"`
Transactions []Transaction `json:"transactions"`
Logs []Log `json:"logs"`
Traces []Trace `json:"traces"`
}

type BlockHeader struct {
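Because the struct now carries explicit json tags plus a ChainId, a short sketch of the resulting payload shape can help when reasoning about what gets published to Kafka. The function below is hypothetical and assumed to sit next to the type in internal/common; the example values are arbitrary.

// Hypothetical sketch; imports: encoding/json, fmt, math/big.
func sketchBlockDataJSON() {
	bd := BlockData{
		ChainId: 1,
		Block:   Block{Number: big.NewInt(100)},
	}
	out, _ := json.Marshal(bd)
	fmt.Println(string(out))
	// chain_id is emitted explicitly; the nil slices marshal as null, e.g.
	// {"chain_id":1,"block":{...},"transactions":null,"logs":null,"traces":null}
}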
1 change: 1 addition & 0 deletions internal/orchestrator/committer_test.go
@@ -426,6 +426,7 @@ func TestHandleGap(t *testing.T) {
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
Blocks: 5,
})
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
mockRPC.EXPECT().GetFullBlocks(context.Background(), []*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)}).Return([]rpc.GetFullBlockResult{
{BlockNumber: big.NewInt(100), Data: common.BlockData{Block: common.Block{Number: big.NewInt(100)}}},
{BlockNumber: big.NewInt(101), Data: common.BlockData{Block: common.Block{Number: big.NewInt(101)}}},
1 change: 1 addition & 0 deletions internal/orchestrator/failure_recoverer.go
@@ -110,6 +110,7 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail
})
} else {
successfulResults = append(successfulResults, common.BlockData{
ChainId: fr.rpc.GetChainID().Uint64(),
Block: result.Data.Block,
Logs: result.Data.Logs,
Transactions: result.Data.Transactions,
1 change: 1 addition & 0 deletions internal/orchestrator/poller.go
@@ -262,6 +262,7 @@ func (p *Poller) convertPollResultsToBlockData(results []rpc.GetFullBlockResult)
blockData := make([]common.BlockData, 0, len(successfulResults))
for _, result := range successfulResults {
blockData = append(blockData, common.BlockData{
ChainId: p.rpc.GetChainID().Uint64(),
Block: result.Data.Block,
Logs: result.Data.Logs,
Transactions: result.Data.Transactions,
1 change: 1 addition & 0 deletions internal/orchestrator/reorg_handler.go
@@ -274,6 +274,7 @@ func (rh *ReorgHandler) handleReorg(ctx context.Context, reorgedBlockNumbers []*
return fmt.Errorf("cannot fix reorg: failed block %s: %w", result.BlockNumber.String(), result.Error)
}
data = append(data, common.BlockData{
ChainId: rh.rpc.GetChainID().Uint64(),
Block: result.Data.Block,
Logs: result.Data.Logs,
Transactions: result.Data.Transactions,
3 changes: 3 additions & 0 deletions internal/publisher/publisher.go
@@ -76,6 +76,9 @@ func (p *Publisher) initialize() error {
User: config.Cfg.Publisher.Username,
Pass: config.Cfg.Publisher.Password,
}.AsMechanism()))
}

if config.Cfg.Publisher.EnableTLS {
tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}
opts = append(opts, kgo.Dialer(tlsDialer.DialContext))
}
Comment on lines +81 to 84

🛠️ Refactor suggestion

Prefer kgo.DialTLSConfig with an explicit tls.Config

Use franz-go’s TLS option directly and set a sane minimum TLS version. The current net.Dialer path provides less control.

- if config.Cfg.Publisher.EnableTLS {
-   tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}
-   opts = append(opts, kgo.Dialer(tlsDialer.DialContext))
- }
+ if config.Cfg.Publisher.EnableTLS {
+   tlsConf := &tls.Config{MinVersion: tls.VersionTLS12}
+   opts = append(opts, kgo.DialTLSConfig(tlsConf))
+ }
🤖 Prompt for AI Agents
internal/publisher/publisher.go around lines 81 to 84: replace the current use
of a raw net.Dialer option with franz-go's TLS dialer option by using
kgo.DialTLSConfig and an explicit tls.Config; construct a tls.Config with a sane
minimum version (e.g. MinVersion: tls.VersionTLS12), set any needed ServerName
or certificate fields from your config, and perform the actual connection using
a net.Dialer with the existing 10s timeout inside the DialTLSConfig callback so
you retain the timeout while giving you full control over TLS settings.
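
For completeness, a hedged sketch of how the SASL mechanism shown above and the suggested kgo.DialTLSConfig option combine. The kgo and plain identifiers are franz-go's public API; the SeedBrokers line and the comma-split of the broker string describe code outside this hunk and are assumptions.

// Sketch only; assumes config.Cfg.Publisher mirrors the PublisherConfig fields.
// (The PR applies SASL only when credentials are set; that guard is omitted here.)
opts := []kgo.Opt{
	kgo.SeedBrokers(strings.Split(config.Cfg.Publisher.Brokers, ",")...),
	kgo.SASL(plain.Auth{
		User: config.Cfg.Publisher.Username,
		Pass: config.Cfg.Publisher.Password,
	}.AsMechanism()),
}
if config.Cfg.Publisher.EnableTLS {
	opts = append(opts, kgo.DialTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12}))
}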

2 changes: 2 additions & 0 deletions internal/storage/clickhouse.go
@@ -1959,6 +1959,7 @@ func (c *ClickHouseConnector) GetValidationBlockData(chainId *big.Int, startBloc
for i, block := range blocksResult.blocks {
blockNum := block.Number.String()
blockData[i] = common.BlockData{
ChainId: chainId.Uint64(),
Block: block,
Logs: logsResult.logMap[blockNum],
Transactions: txsResult.txMap[blockNum],
@@ -2138,6 +2139,7 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []
for i, block := range blocksResult.blocks {
blockNum := block.Number.String()
blockData[i] = common.BlockData{
ChainId: chainId.Uint64(),
Block: block,
Logs: logsResult.logMap[blockNum],
Transactions: txsResult.txMap[blockNum],
4 changes: 3 additions & 1 deletion internal/storage/connector.go
@@ -148,7 +148,9 @@ func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) {
func NewConnector[T any](cfg *config.StorageConnectionConfig) (T, error) {
var conn interface{}
var err error
if cfg.Postgres != nil {
if cfg.Kafka != nil {
conn, err = NewKafkaPostgresConnector(cfg.Kafka)
} else if cfg.Postgres != nil {
conn, err = NewPostgresConnector(cfg.Postgres)
} else if cfg.Clickhouse != nil {
conn, err = NewClickHouseConnector(cfg.Clickhouse)
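One behavioral note on the branch ordering above: if a storage section configures kafka alongside another backend, the Kafka connector is selected because it is checked first. A hedged usage sketch follows; the IMainStorage type argument and the cfg.Main field are inferred from context and not confirmed by this diff.

// Hypothetical call site illustrating the precedence: kafka, then postgres,
// then clickhouse.
mainStorage, err := NewConnector[IMainStorage](&cfg.Main)
if err != nil {
	log.Fatal(err) // placeholder error handling
}
_ = mainStorage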