Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
d3c7146
Initial kafka committer
jakeloo Aug 13, 2025
cbbea07
Update config
jakeloo Aug 13, 2025
4775d57
Error on uninitialize brokers
jakeloo Aug 13, 2025
661b150
Update queries
jakeloo Aug 14, 2025
e152d5b
Option to disable TLS for kafka
jakeloo Aug 14, 2025
e61fae7
Add projection mode in blocks
jakeloo Aug 14, 2025
fc2ae64
Fix publish parallel mode
jakeloo Aug 14, 2025
191298b
Gofmt
jakeloo Aug 14, 2025
e45907a
Update schema
jakeloo Aug 15, 2025
ceeac3b
Fix backfill table
jakeloo Aug 15, 2025
64aaec5
Update kafka storage producer
jakeloo Aug 15, 2025
20dc471
Kafka + Redis
jakeloo Aug 15, 2025
0bf3097
Update schema payload
jakeloo Aug 18, 2025
6f2b72d
Merge branch 'main' into jl/commit-kafka
jakeloo Aug 19, 2025
cd434a2
Update kafka-postgres -> kafka-redis config
jakeloo Aug 19, 2025
4fd141d
Update schema to use replacing merge tree
jakeloo Aug 20, 2025
3b9f694
Fix schema
jakeloo Aug 20, 2025
92a35ab
Badger & S3
jakeloo Aug 22, 2025
eea71f4
Until block for committer
jakeloo Aug 22, 2025
68087a0
terminate when poller or committer exit
jakeloo Aug 25, 2025
c0ba962
Fix commit until block
jakeloo Aug 26, 2025
51f1398
Don't cancel active tasks in poller
jakeloo Aug 26, 2025
debc231
migrate with destination storage
jakeloo Aug 26, 2025
bddbf54
Remove RPC batch config in migrate
jakeloo Aug 26, 2025
43dad9c
Cleanup
jakeloo Aug 26, 2025
da31422
Add from_address, to_address to schema
jakeloo Aug 26, 2025
b698c18
Retry with RPC batch size reduction
jakeloo Aug 26, 2025
96dc60b
Shuffle Orchestrator and Staging interface
jakeloo Aug 26, 2025
41cc98d
store poller in committer
jakeloo Aug 26, 2025
70ea871
Simplified storage. Split kafka and redis
jakeloo Aug 26, 2025
f920a71
kafka requires orchestrator
jakeloo Aug 26, 2025
32eece5
Fix orchestrator flag
jakeloo Aug 26, 2025
e35ff76
Fix badger keys
jakeloo Aug 26, 2025
6233232
Fix backfill missing blocks in staging
jakeloo Aug 26, 2025
31d923f
Revert "Fix backfill missing blocks in staging"
jakeloo Aug 26, 2025
7cb6ff1
block buffer
jakeloo Aug 27, 2025
59aad94
Poller S3 support
jakeloo Aug 27, 2025
884a3aa
Fix boundaries for migration
jakeloo Aug 27, 2025
86f3d68
Badger for caching in s3 connector
jakeloo Aug 27, 2025
4595fa6
optimize s3 insertion
jakeloo Aug 27, 2025
136a346
redis tls. erc1155 batch mv
jakeloo Aug 28, 2025
b9828af
fix projections, use _part_offset projections
jakeloo Aug 28, 2025
69f5f78
gofmt
jakeloo Aug 28, 2025
e551c17
Fix test
jakeloo Aug 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
317 changes: 198 additions & 119 deletions cmd/migrate_valid.go

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion cmd/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,19 @@ func RunOrchestrator(cmd *cobra.Command, args []string) {
if err != nil {
log.Fatal().Err(err).Msg("Failed to create orchestrator")
}

// Start Prometheus metrics server
log.Info().Msg("Starting Metrics Server on port 2112")
go func() {
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(":2112", nil)
if err := http.ListenAndServe(":2112", nil); err != nil {
log.Error().Err(err).Msg("Metrics server error")
}
}()

// Start orchestrator (blocks until shutdown)
// The orchestrator handles signals internally and coordinates shutdown
orchestrator.Start()

log.Info().Msg("Shutdown complete")
}
144 changes: 144 additions & 0 deletions cmd/root.go

Large diffs are not rendered by default.

62 changes: 55 additions & 7 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type CommitterConfig struct {
Interval int `mapstructure:"interval"`
BlocksPerCommit int `mapstructure:"blocksPerCommit"`
FromBlock int `mapstructure:"fromBlock"`
UntilBlock int `mapstructure:"untilBlock"`
}

type ReorgHandlerConfig struct {
Expand All @@ -51,17 +52,39 @@ type StorageConfig struct {
Main StorageConnectionConfig `mapstructure:"main"`
Orchestrator StorageConnectionConfig `mapstructure:"orchestrator"`
}
type StorageType string

const (
StorageTypeMain StorageType = "main"
StorageTypeStaging StorageType = "staging"
StorageTypeOrchestrator StorageType = "orchestrator"
)

type StorageConnectionConfig struct {
Type string `mapstructure:"type"` // "auto", "clickhouse", "postgres", "kafka", "badger", "s3"
Clickhouse *ClickhouseConfig `mapstructure:"clickhouse"`
Postgres *PostgresConfig `mapstructure:"postgres"`
Kafka *KafkaConfig `mapstructure:"kafka"`
Badger *BadgerConfig `mapstructure:"badger"`
S3 *S3Config `mapstructure:"s3"`
}

type BadgerConfig struct {
Path string `mapstructure:"path"`
}

type S3Config struct {
Bucket string `mapstructure:"bucket"`
Region string `mapstructure:"region"`
Prefix string `mapstructure:"prefix"`
AccessKeyID string `mapstructure:"accessKeyId"`
SecretAccessKey string `mapstructure:"secretAccessKey"`
Endpoint string `mapstructure:"endpoint"`
Format string `mapstructure:"format"`
Parquet *ParquetConfig `mapstructure:"parquet"`
// Buffering configuration
BufferSize int64 `mapstructure:"bufferSizeMB"` // Target buffer size in MB before flush (default 1024 MB = 1GB)
BufferTimeout int `mapstructure:"bufferTimeoutSeconds"` // Max time in seconds before flush (default 300 = 5 min)
MaxBlocksPerFile int `mapstructure:"maxBlocksPerFile"` // Max blocks per parquet file (0 = no limit, only size/timeout triggers)
}

type ParquetConfig struct {
Compression string `mapstructure:"compression"`
RowGroupSize int64 `mapstructure:"rowGroupSize"`
PageSize int64 `mapstructure:"pageSize"`
}

type TableConfig struct {
Expand All @@ -86,6 +109,7 @@ type ClickhouseConfig struct {
EnableParallelViewProcessing bool `mapstructure:"enableParallelViewProcessing"`
MaxQueryTime int `mapstructure:"maxQueryTime"`
MaxMemoryUsage int `mapstructure:"maxMemoryUsage"`
EnableCompression bool `mapstructure:"enableCompression"`
}

type PostgresConfig struct {
Expand All @@ -101,6 +125,21 @@ type PostgresConfig struct {
ConnectTimeout int `mapstructure:"connectTimeout"`
}

type RedisConfig struct {
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
Password string `mapstructure:"password"`
DB int `mapstructure:"db"`
}

type KafkaConfig struct {
Brokers string `mapstructure:"brokers"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
EnableTLS bool `mapstructure:"enableTLS"`
Redis *RedisConfig `mapstructure:"redis"`
}

type RPCBatchRequestConfig struct {
BlocksPerRequest int `mapstructure:"blocksPerRequest"`
BatchDelay int `mapstructure:"batchDelay"`
Expand Down Expand Up @@ -177,6 +216,7 @@ type PublisherConfig struct {
Brokers string `mapstructure:"brokers"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
EnableTLS bool `mapstructure:"enableTLS"`
Blocks BlockPublisherConfig `mapstructure:"blocks"`
Transactions TransactionPublisherConfig `mapstructure:"transactions"`
Traces TracePublisherConfig `mapstructure:"traces"`
Expand All @@ -192,6 +232,13 @@ type ValidationConfig struct {
Mode string `mapstructure:"mode"` // "disabled", "minimal", "strict"
}

type MigratorConfig struct {
Destination StorageConnectionConfig `mapstructure:"destination"`
StartBlock uint `mapstructure:"startBlock"`
EndBlock uint `mapstructure:"endBlock"`
BatchSize uint `mapstructure:"batchSize"`
}

type Config struct {
RPC RPCConfig `mapstructure:"rpc"`
Log LogConfig `mapstructure:"log"`
Expand All @@ -204,6 +251,7 @@ type Config struct {
Publisher PublisherConfig `mapstructure:"publisher"`
WorkMode WorkModeConfig `mapstructure:"workMode"`
Validation ValidationConfig `mapstructure:"validation"`
Migrator MigratorConfig `mapstructure:"migrator"`
}

var Cfg Config
Expand Down
51 changes: 40 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@ go 1.23.0

require (
github.com/ClickHouse/clickhouse-go/v2 v2.36.0
github.com/aws/aws-sdk-go-v2 v1.38.0
github.com/aws/aws-sdk-go-v2/config v1.31.0
github.com/aws/aws-sdk-go-v2/service/s3 v1.87.0
github.com/dgraph-io/badger/v4 v4.8.0
github.com/ethereum/go-ethereum v1.15.11
github.com/gin-gonic/gin v1.10.0
github.com/gorilla/schema v1.4.1
github.com/holiman/uint256 v1.3.2
github.com/lib/pq v1.10.9
github.com/parquet-go/parquet-go v0.25.1
github.com/prometheus/client_golang v1.20.4
github.com/redis/go-redis/v9 v9.12.1
github.com/rs/zerolog v1.33.0
github.com/spf13/cobra v1.8.1
github.com/spf13/cobra v1.9.1
github.com/spf13/viper v1.18.0
github.com/stretchr/testify v1.10.0
github.com/swaggo/files v1.0.1
Expand All @@ -25,6 +31,21 @@ require (
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.18.4 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.28.0 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.33.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.37.0 // indirect
github.com/aws/smithy-go v1.22.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.20.0 // indirect
github.com/bytedance/sonic v1.12.6 // indirect
Expand All @@ -39,13 +60,18 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/ethereum/c-kzg-4844/v2 v2.1.0 // indirect
github.com/ethereum/go-verkle v0.2.2 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.7 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
Expand All @@ -57,6 +83,7 @@ require (
github.com/goccy/go-json v0.10.4 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/flatbuffers v25.2.10+incompatible // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand Down Expand Up @@ -94,7 +121,7 @@ require (
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/supranational/blst v0.3.14 // indirect
Expand All @@ -104,18 +131,20 @@ require (
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/otel v1.36.0 // indirect
go.opentelemetry.io/otel/trace v1.36.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel v1.37.0 // indirect
go.opentelemetry.io/otel/metric v1.37.0 // indirect
go.opentelemetry.io/otel/trace v1.37.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.12.0 // indirect
golang.org/x/crypto v0.38.0 // indirect
golang.org/x/crypto v0.39.0 // indirect
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
golang.org/x/net v0.40.0 // indirect
golang.org/x/sync v0.14.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.25.0 // indirect
golang.org/x/tools v0.30.0 // indirect
google.golang.org/protobuf v1.36.1 // indirect
golang.org/x/net v0.41.0 // indirect
golang.org/x/sync v0.15.0 // indirect
golang.org/x/sys v0.34.0 // indirect
golang.org/x/text v0.26.0 // indirect
golang.org/x/tools v0.33.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
Expand Down
Loading
Loading