Skip to content

Commit 92a35ab

Browse files
committed
Badger & S3
1 parent 3b9f694 commit 92a35ab

File tree

12 files changed

+1880
-78
lines changed

12 files changed

+1880
-78
lines changed

cmd/orchestrator.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,19 @@ func RunOrchestrator(cmd *cobra.Command, args []string) {
3232
if err != nil {
3333
log.Fatal().Err(err).Msg("Failed to create orchestrator")
3434
}
35+
3536
// Start Prometheus metrics server
3637
log.Info().Msg("Starting Metrics Server on port 2112")
3738
go func() {
3839
http.Handle("/metrics", promhttp.Handler())
39-
http.ListenAndServe(":2112", nil)
40+
if err := http.ListenAndServe(":2112", nil); err != nil {
41+
log.Error().Err(err).Msg("Metrics server error")
42+
}
4043
}()
4144

45+
// Start orchestrator (blocks until shutdown)
46+
// The orchestrator handles signals internally and coordinates shutdown
4247
orchestrator.Start()
48+
49+
log.Info().Msg("Shutdown complete")
4350
}

cmd/root.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,29 @@ func init() {
135135
rootCmd.PersistentFlags().Int("storage-main-kafka-redis-port", 6379, "Redis port for Kafka main storage metadata")
136136
rootCmd.PersistentFlags().String("storage-main-kafka-redis-password", "", "Redis password for Kafka main storage metadata")
137137
rootCmd.PersistentFlags().Int("storage-main-kafka-redis-db", 0, "Redis database number for Kafka main storage metadata")
138+
// Storage type selection flags
139+
rootCmd.PersistentFlags().String("storage-staging-type", "auto", "Storage type for staging (auto, clickhouse, postgres, kafka, badger, s3)")
140+
rootCmd.PersistentFlags().String("storage-main-type", "auto", "Storage type for main (auto, clickhouse, postgres, kafka, badger, s3)")
141+
rootCmd.PersistentFlags().String("storage-orchestrator-type", "auto", "Storage type for orchestrator (auto, clickhouse, postgres, badger)")
142+
// BadgerDB flags for staging storage
143+
rootCmd.PersistentFlags().String("storage-staging-badger-path", "", "BadgerDB path for staging storage")
144+
// BadgerDB flags for orchestrator storage
145+
rootCmd.PersistentFlags().String("storage-orchestrator-badger-path", "", "BadgerDB path for orchestrator storage")
146+
// S3 flags for main storage
147+
rootCmd.PersistentFlags().String("storage-main-s3-bucket", "", "S3 bucket for main storage")
148+
rootCmd.PersistentFlags().String("storage-main-s3-region", "", "S3 region for main storage")
149+
rootCmd.PersistentFlags().String("storage-main-s3-prefix", "", "S3 key prefix for main storage")
150+
rootCmd.PersistentFlags().String("storage-main-s3-accessKeyId", "", "S3 access key ID for main storage")
151+
rootCmd.PersistentFlags().String("storage-main-s3-secretAccessKey", "", "S3 secret access key for main storage")
152+
rootCmd.PersistentFlags().String("storage-main-s3-endpoint", "", "S3 endpoint URL for main storage (for S3-compatible services)")
153+
rootCmd.PersistentFlags().String("storage-main-s3-format", "parquet", "S3 storage format for main storage (parquet or json)")
154+
rootCmd.PersistentFlags().Int64("storage-main-s3-bufferSizeMB", 1024, "S3 buffer size in MB before flush for main storage")
155+
rootCmd.PersistentFlags().Int("storage-main-s3-bufferTimeoutSeconds", 300, "S3 buffer timeout in seconds before flush for main storage")
156+
rootCmd.PersistentFlags().Int("storage-main-s3-maxBlocksPerFile", 0, "S3 max blocks per file for main storage (0 = no limit)")
157+
// S3 Parquet configuration
158+
rootCmd.PersistentFlags().String("storage-main-s3-parquet-compression", "snappy", "Parquet compression type for S3 main storage")
159+
rootCmd.PersistentFlags().Int64("storage-main-s3-parquet-rowGroupSize", 256, "Parquet row group size in MB for S3 main storage")
160+
rootCmd.PersistentFlags().Int64("storage-main-s3-parquet-pageSize", 8192, "Parquet page size in KB for S3 main storage")
138161
rootCmd.PersistentFlags().String("api-host", "localhost:3000", "API host")
139162
rootCmd.PersistentFlags().String("api-basicAuth-username", "", "API basic auth username")
140163
rootCmd.PersistentFlags().String("api-basicAuth-password", "", "API basic auth password")
@@ -260,11 +283,29 @@ func init() {
260283
viper.BindPFlag("storage.main.kafka.brokers", rootCmd.PersistentFlags().Lookup("storage-main-kafka-brokers"))
261284
viper.BindPFlag("storage.main.kafka.username", rootCmd.PersistentFlags().Lookup("storage-main-kafka-username"))
262285
viper.BindPFlag("storage.main.kafka.password", rootCmd.PersistentFlags().Lookup("storage-main-kafka-password"))
263-
viper.BindPFlag("storage.main.kafka.enable_tls", rootCmd.PersistentFlags().Lookup("storage-main-kafka-enable-tls"))
286+
viper.BindPFlag("storage.main.kafka.enableTLS", rootCmd.PersistentFlags().Lookup("storage-main-kafka-enable-tls"))
264287
viper.BindPFlag("storage.main.kafka.redis.host", rootCmd.PersistentFlags().Lookup("storage-main-kafka-redis-host"))
265288
viper.BindPFlag("storage.main.kafka.redis.port", rootCmd.PersistentFlags().Lookup("storage-main-kafka-redis-port"))
266289
viper.BindPFlag("storage.main.kafka.redis.password", rootCmd.PersistentFlags().Lookup("storage-main-kafka-redis-password"))
267290
viper.BindPFlag("storage.main.kafka.redis.db", rootCmd.PersistentFlags().Lookup("storage-main-kafka-redis-db"))
291+
viper.BindPFlag("storage.staging.type", rootCmd.PersistentFlags().Lookup("storage-staging-type"))
292+
viper.BindPFlag("storage.main.type", rootCmd.PersistentFlags().Lookup("storage-main-type"))
293+
viper.BindPFlag("storage.orchestrator.type", rootCmd.PersistentFlags().Lookup("storage-orchestrator-type"))
294+
viper.BindPFlag("storage.staging.badger.path", rootCmd.PersistentFlags().Lookup("storage-staging-badger-path"))
295+
viper.BindPFlag("storage.orchestrator.badger.path", rootCmd.PersistentFlags().Lookup("storage-orchestrator-badger-path"))
296+
viper.BindPFlag("storage.main.s3.bucket", rootCmd.PersistentFlags().Lookup("storage-main-s3-bucket"))
297+
viper.BindPFlag("storage.main.s3.region", rootCmd.PersistentFlags().Lookup("storage-main-s3-region"))
298+
viper.BindPFlag("storage.main.s3.prefix", rootCmd.PersistentFlags().Lookup("storage-main-s3-prefix"))
299+
viper.BindPFlag("storage.main.s3.accessKeyId", rootCmd.PersistentFlags().Lookup("storage-main-s3-accessKeyId"))
300+
viper.BindPFlag("storage.main.s3.secretAccessKey", rootCmd.PersistentFlags().Lookup("storage-main-s3-secretAccessKey"))
301+
viper.BindPFlag("storage.main.s3.endpoint", rootCmd.PersistentFlags().Lookup("storage-main-s3-endpoint"))
302+
viper.BindPFlag("storage.main.s3.format", rootCmd.PersistentFlags().Lookup("storage-main-s3-format"))
303+
viper.BindPFlag("storage.main.s3.bufferSizeMB", rootCmd.PersistentFlags().Lookup("storage-main-s3-bufferSizeMB"))
304+
viper.BindPFlag("storage.main.s3.bufferTimeoutSeconds", rootCmd.PersistentFlags().Lookup("storage-main-s3-bufferTimeoutSeconds"))
305+
viper.BindPFlag("storage.main.s3.maxBlocksPerFile", rootCmd.PersistentFlags().Lookup("storage-main-s3-maxBlocksPerFile"))
306+
viper.BindPFlag("storage.main.s3.parquet.compression", rootCmd.PersistentFlags().Lookup("storage-main-s3-parquet-compression"))
307+
viper.BindPFlag("storage.main.s3.parquet.rowGroupSize", rootCmd.PersistentFlags().Lookup("storage-main-s3-parquet-rowGroupSize"))
308+
viper.BindPFlag("storage.main.s3.parquet.pageSize", rootCmd.PersistentFlags().Lookup("storage-main-s3-parquet-pageSize"))
268309
viper.BindPFlag("api.host", rootCmd.PersistentFlags().Lookup("api-host"))
269310
viper.BindPFlag("api.basicAuth.username", rootCmd.PersistentFlags().Lookup("api-basicAuth-username"))
270311
viper.BindPFlag("api.basicAuth.password", rootCmd.PersistentFlags().Lookup("api-basicAuth-password"))
@@ -280,7 +321,7 @@ func init() {
280321
viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers"))
281322
viper.BindPFlag("publisher.username", rootCmd.PersistentFlags().Lookup("publisher-username"))
282323
viper.BindPFlag("publisher.password", rootCmd.PersistentFlags().Lookup("publisher-password"))
283-
viper.BindPFlag("publisher.enable_tls", rootCmd.PersistentFlags().Lookup("publisher-enable-tls"))
324+
viper.BindPFlag("publisher.enableTLS", rootCmd.PersistentFlags().Lookup("publisher-enable-tls"))
284325
viper.BindPFlag("publisher.blocks.enabled", rootCmd.PersistentFlags().Lookup("publisher-blocks-enabled"))
285326
viper.BindPFlag("publisher.blocks.topicName", rootCmd.PersistentFlags().Lookup("publisher-blocks-topicName"))
286327
viper.BindPFlag("publisher.transactions.enabled", rootCmd.PersistentFlags().Lookup("publisher-transactions-enabled"))

configs/config.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,37 @@ const (
6060
)
6161

6262
type StorageConnectionConfig struct {
63+
Type string `mapstructure:"type"` // "auto", "clickhouse", "postgres", "kafka", "badger", "s3"
6364
Clickhouse *ClickhouseConfig `mapstructure:"clickhouse"`
6465
Postgres *PostgresConfig `mapstructure:"postgres"`
6566
Kafka *KafkaConfig `mapstructure:"kafka"`
67+
Badger *BadgerConfig `mapstructure:"badger"`
68+
S3 *S3Config `mapstructure:"s3"`
69+
}
70+
71+
type BadgerConfig struct {
72+
Path string `mapstructure:"path"`
73+
}
74+
75+
type S3Config struct {
76+
Bucket string `mapstructure:"bucket"`
77+
Region string `mapstructure:"region"`
78+
Prefix string `mapstructure:"prefix"`
79+
AccessKeyID string `mapstructure:"accessKeyId"`
80+
SecretAccessKey string `mapstructure:"secretAccessKey"`
81+
Endpoint string `mapstructure:"endpoint"`
82+
Format string `mapstructure:"format"`
83+
Parquet *ParquetConfig `mapstructure:"parquet"`
84+
// Buffering configuration
85+
BufferSize int64 `mapstructure:"bufferSizeMB"` // Target buffer size in MB before flush (default 1024 MB = 1GB)
86+
BufferTimeout int `mapstructure:"bufferTimeoutSeconds"` // Max time in seconds before flush (default 300 = 5 min)
87+
MaxBlocksPerFile int `mapstructure:"maxBlocksPerFile"` // Max blocks per parquet file (0 = no limit, only size/timeout triggers)
88+
}
89+
90+
type ParquetConfig struct {
91+
Compression string `mapstructure:"compression"`
92+
RowGroupSize int64 `mapstructure:"rowGroupSize"`
93+
PageSize int64 `mapstructure:"pageSize"`
6694
}
6795

6896
type TableConfig struct {
@@ -113,7 +141,7 @@ type KafkaConfig struct {
113141
Brokers string `mapstructure:"brokers"`
114142
Username string `mapstructure:"username"`
115143
Password string `mapstructure:"password"`
116-
EnableTLS bool `mapstructure:"enable_tls"`
144+
EnableTLS bool `mapstructure:"enableTLS"`
117145
Redis *RedisConfig `mapstructure:"redis"`
118146
}
119147

@@ -193,7 +221,7 @@ type PublisherConfig struct {
193221
Brokers string `mapstructure:"brokers"`
194222
Username string `mapstructure:"username"`
195223
Password string `mapstructure:"password"`
196-
EnableTLS bool `mapstructure:"enable_tls"`
224+
EnableTLS bool `mapstructure:"enableTLS"`
197225
Blocks BlockPublisherConfig `mapstructure:"blocks"`
198226
Transactions TransactionPublisherConfig `mapstructure:"transactions"`
199227
Traces TracePublisherConfig `mapstructure:"traces"`

go.mod

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,20 @@ go 1.23.0
44

55
require (
66
github.com/ClickHouse/clickhouse-go/v2 v2.36.0
7+
github.com/aws/aws-sdk-go-v2 v1.38.0
8+
github.com/aws/aws-sdk-go-v2/config v1.31.0
9+
github.com/aws/aws-sdk-go-v2/service/s3 v1.87.0
10+
github.com/dgraph-io/badger/v4 v4.8.0
711
github.com/ethereum/go-ethereum v1.15.11
812
github.com/gin-gonic/gin v1.10.0
913
github.com/gorilla/schema v1.4.1
1014
github.com/holiman/uint256 v1.3.2
1115
github.com/lib/pq v1.10.9
16+
github.com/parquet-go/parquet-go v0.25.1
1217
github.com/prometheus/client_golang v1.20.4
18+
github.com/redis/go-redis/v9 v9.12.1
1319
github.com/rs/zerolog v1.33.0
14-
github.com/spf13/cobra v1.8.1
20+
github.com/spf13/cobra v1.9.1
1521
github.com/spf13/viper v1.18.0
1622
github.com/stretchr/testify v1.10.0
1723
github.com/swaggo/files v1.0.1
@@ -25,6 +31,21 @@ require (
2531
github.com/KyleBanks/depth v1.2.1 // indirect
2632
github.com/Microsoft/go-winio v0.6.2 // indirect
2733
github.com/andybalholm/brotli v1.1.1 // indirect
34+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 // indirect
35+
github.com/aws/aws-sdk-go-v2/credentials v1.18.4 // indirect
36+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.3 // indirect
37+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3 // indirect
38+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3 // indirect
39+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
40+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.3 // indirect
41+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 // indirect
42+
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.3 // indirect
43+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.3 // indirect
44+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.3 // indirect
45+
github.com/aws/aws-sdk-go-v2/service/sso v1.28.0 // indirect
46+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.33.0 // indirect
47+
github.com/aws/aws-sdk-go-v2/service/sts v1.37.0 // indirect
48+
github.com/aws/smithy-go v1.22.5 // indirect
2849
github.com/beorn7/perks v1.0.1 // indirect
2950
github.com/bits-and-blooms/bitset v1.20.0 // indirect
3051
github.com/bytedance/sonic v1.12.6 // indirect
@@ -39,14 +60,18 @@ require (
3960
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
4061
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
4162
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
63+
github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect
4264
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
65+
github.com/dustin/go-humanize v1.0.1 // indirect
4366
github.com/ethereum/c-kzg-4844/v2 v2.1.0 // indirect
4467
github.com/ethereum/go-verkle v0.2.2 // indirect
4568
github.com/fsnotify/fsnotify v1.7.0 // indirect
4669
github.com/gabriel-vasile/mimetype v1.4.7 // indirect
4770
github.com/gin-contrib/sse v0.1.0 // indirect
4871
github.com/go-faster/city v1.0.1 // indirect
4972
github.com/go-faster/errors v0.7.1 // indirect
73+
github.com/go-logr/logr v1.4.3 // indirect
74+
github.com/go-logr/stdr v1.2.2 // indirect
5075
github.com/go-ole/go-ole v1.3.0 // indirect
5176
github.com/go-openapi/jsonpointer v0.21.0 // indirect
5277
github.com/go-openapi/jsonreference v0.21.0 // indirect
@@ -58,6 +83,7 @@ require (
5883
github.com/goccy/go-json v0.10.4 // indirect
5984
github.com/gofrs/flock v0.8.1 // indirect
6085
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
86+
github.com/google/flatbuffers v25.2.10+incompatible // indirect
6187
github.com/google/uuid v1.6.0 // indirect
6288
github.com/gorilla/websocket v1.4.2 // indirect
6389
github.com/hashicorp/hcl v1.0.0 // indirect
@@ -86,7 +112,6 @@ require (
86112
github.com/prometheus/client_model v0.6.1 // indirect
87113
github.com/prometheus/common v0.55.0 // indirect
88114
github.com/prometheus/procfs v0.15.1 // indirect
89-
github.com/redis/go-redis/v9 v9.12.1 // indirect
90115
github.com/rivo/uniseg v0.2.0 // indirect
91116
github.com/sagikazarmark/locafero v0.4.0 // indirect
92117
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
@@ -96,7 +121,7 @@ require (
96121
github.com/sourcegraph/conc v0.3.0 // indirect
97122
github.com/spf13/afero v1.11.0 // indirect
98123
github.com/spf13/cast v1.6.0 // indirect
99-
github.com/spf13/pflag v1.0.5 // indirect
124+
github.com/spf13/pflag v1.0.6 // indirect
100125
github.com/stretchr/objx v0.5.2 // indirect
101126
github.com/subosito/gotenv v1.6.0 // indirect
102127
github.com/supranational/blst v0.3.14 // indirect
@@ -106,18 +131,20 @@ require (
106131
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
107132
github.com/ugorji/go/codec v1.2.12 // indirect
108133
github.com/yusufpapurcu/wmi v1.2.4 // indirect
109-
go.opentelemetry.io/otel v1.36.0 // indirect
110-
go.opentelemetry.io/otel/trace v1.36.0 // indirect
134+
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
135+
go.opentelemetry.io/otel v1.37.0 // indirect
136+
go.opentelemetry.io/otel/metric v1.37.0 // indirect
137+
go.opentelemetry.io/otel/trace v1.37.0 // indirect
111138
go.uber.org/multierr v1.11.0 // indirect
112139
golang.org/x/arch v0.12.0 // indirect
113-
golang.org/x/crypto v0.38.0 // indirect
140+
golang.org/x/crypto v0.39.0 // indirect
114141
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
115-
golang.org/x/net v0.40.0 // indirect
116-
golang.org/x/sync v0.14.0 // indirect
117-
golang.org/x/sys v0.33.0 // indirect
118-
golang.org/x/text v0.25.0 // indirect
119-
golang.org/x/tools v0.30.0 // indirect
120-
google.golang.org/protobuf v1.36.1 // indirect
142+
golang.org/x/net v0.41.0 // indirect
143+
golang.org/x/sync v0.15.0 // indirect
144+
golang.org/x/sys v0.34.0 // indirect
145+
golang.org/x/text v0.26.0 // indirect
146+
golang.org/x/tools v0.33.0 // indirect
147+
google.golang.org/protobuf v1.36.6 // indirect
121148
gopkg.in/ini.v1 v1.67.0 // indirect
122149
gopkg.in/yaml.v3 v3.0.1 // indirect
123150
rsc.io/tmplfunc v0.0.3 // indirect

0 commit comments

Comments
 (0)