Skip to content

Commit 59aad94

Browse files
committed
Poller S3 support
1 parent 7cb6ff1 commit 59aad94

File tree

8 files changed

+1496
-100
lines changed

8 files changed

+1496
-100
lines changed

cmd/root.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,19 @@ func init() {
5656
rootCmd.PersistentFlags().Bool("poller-force-from-block", false, "Force the poller to start from the block specified in `poller-from-block`")
5757
rootCmd.PersistentFlags().Int("poller-until-block", 0, "Until which block to poll")
5858
rootCmd.PersistentFlags().Int("poller-parallel-pollers", 5, "Maximum number of parallel pollers")
59+
rootCmd.PersistentFlags().String("poller-s3-bucket", "", "S3 bucket for poller archive source")
60+
rootCmd.PersistentFlags().String("poller-s3-region", "", "S3 region for poller archive source")
61+
rootCmd.PersistentFlags().String("poller-s3-prefix", "", "S3 prefix for poller archive source")
62+
rootCmd.PersistentFlags().String("poller-s3-accessKeyId", "", "S3 access key ID for poller archive source")
63+
rootCmd.PersistentFlags().String("poller-s3-secretAccessKey", "", "S3 secret access key for poller archive source")
64+
rootCmd.PersistentFlags().String("poller-s3-endpoint", "", "S3 endpoint for poller archive source (for S3-compatible services)")
65+
rootCmd.PersistentFlags().String("poller-s3-format", "parquet", "S3 storage format for poller archive source")
66+
rootCmd.PersistentFlags().String("poller-s3-cacheDir", "/tmp/insight-archive", "Local cache directory for poller archive source")
67+
rootCmd.PersistentFlags().Int("poller-s3-metadataTTL", 0, "Metadata cache TTL in seconds for poller archive source")
68+
rootCmd.PersistentFlags().Int("poller-s3-fileCacheTTL", 0, "File cache TTL in seconds for poller archive source")
69+
rootCmd.PersistentFlags().Int64("poller-s3-maxCacheSize", 0, "Max cache size in bytes for poller archive source (default 5GB)")
70+
rootCmd.PersistentFlags().Int("poller-s3-cleanupInterval", 0, "Cache cleanup interval in seconds for poller archive source")
71+
rootCmd.PersistentFlags().Int("poller-s3-maxConcurrentDownloads", 3, "Max concurrent downloads for poller archive source")
5972
rootCmd.PersistentFlags().Bool("committer-enabled", true, "Toggle committer")
6073
rootCmd.PersistentFlags().Int("committer-blocks-per-commit", 10, "How many blocks to commit each interval")
6174
rootCmd.PersistentFlags().Int("committer-interval", 1000, "How often to commit blocks in milliseconds")
@@ -247,6 +260,18 @@ func init() {
247260
viper.BindPFlag("poller.forceFromBlock", rootCmd.PersistentFlags().Lookup("poller-force-from-block"))
248261
viper.BindPFlag("poller.untilBlock", rootCmd.PersistentFlags().Lookup("poller-until-block"))
249262
viper.BindPFlag("poller.parallelPollers", rootCmd.PersistentFlags().Lookup("poller-parallel-pollers"))
263+
viper.BindPFlag("poller.s3.endpoint", rootCmd.PersistentFlags().Lookup("poller-s3-endpoint"))
264+
viper.BindPFlag("poller.s3.accessKeyId", rootCmd.PersistentFlags().Lookup("poller-s3-accessKeyId"))
265+
viper.BindPFlag("poller.s3.secretAccessKey", rootCmd.PersistentFlags().Lookup("poller-s3-secretAccessKey"))
266+
viper.BindPFlag("poller.s3.bucket", rootCmd.PersistentFlags().Lookup("poller-s3-bucket"))
267+
viper.BindPFlag("poller.s3.region", rootCmd.PersistentFlags().Lookup("poller-s3-region"))
268+
viper.BindPFlag("poller.s3.prefix", rootCmd.PersistentFlags().Lookup("poller-s3-prefix"))
269+
viper.BindPFlag("poller.s3.cacheDir", rootCmd.PersistentFlags().Lookup("poller-s3-cacheDir"))
270+
viper.BindPFlag("poller.s3.metadataTTL", rootCmd.PersistentFlags().Lookup("poller-s3-metadataTTL"))
271+
viper.BindPFlag("poller.s3.fileCacheTTL", rootCmd.PersistentFlags().Lookup("poller-s3-fileCacheTTL"))
272+
viper.BindPFlag("poller.s3.maxCacheSize", rootCmd.PersistentFlags().Lookup("poller-s3-maxCacheSize"))
273+
viper.BindPFlag("poller.s3.cleanupInterval", rootCmd.PersistentFlags().Lookup("poller-s3-cleanupInterval"))
274+
viper.BindPFlag("poller.s3.maxConcurrentDownloads", rootCmd.PersistentFlags().Lookup("poller-s3-maxConcurrentDownloads"))
250275
viper.BindPFlag("committer.enabled", rootCmd.PersistentFlags().Lookup("committer-enabled"))
251276
viper.BindPFlag("committer.blocksPerCommit", rootCmd.PersistentFlags().Lookup("committer-blocks-per-commit"))
252277
viper.BindPFlag("committer.interval", rootCmd.PersistentFlags().Lookup("committer-interval"))

configs/config.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"os"
77
"strings"
8+
"time"
89

910
"github.com/rs/zerolog/log"
1011
"github.com/spf13/viper"
@@ -16,13 +17,14 @@ type LogConfig struct {
1617
}
1718

1819
// PollerConfig holds the poller settings decoded from the `poller` config
// section. This diff re-aligns the existing fields and adds the optional S3
// archive source configuration.
type PollerConfig struct {
19-
Enabled bool `mapstructure:"enabled"`
20-
Interval int `mapstructure:"interval"`
21-
BlocksPerPoll int `mapstructure:"blocksPerPoll"`
22-
FromBlock int `mapstructure:"fromBlock"`
23-
ForceFromBlock bool `mapstructure:"forceFromBlock"`
24-
UntilBlock int `mapstructure:"untilBlock"`
25-
ParallelPollers int `mapstructure:"parallelPollers"`
20+
Enabled bool `mapstructure:"enabled"`
21+
Interval int `mapstructure:"interval"`
22+
BlocksPerPoll int `mapstructure:"blocksPerPoll"`
23+
FromBlock int `mapstructure:"fromBlock"`
24+
ForceFromBlock bool `mapstructure:"forceFromBlock"`
25+
UntilBlock int `mapstructure:"untilBlock"`
26+
ParallelPollers int `mapstructure:"parallelPollers"`
27+
// S3 is a pointer; WithPollerS3Source treats a nil value, or one with an
// empty Region or Bucket, as "no S3 source configured".
S3 *S3SourceConfig `mapstructure:"s3"`
2628
}
2729

2830
type CommitterConfig struct {
@@ -74,22 +76,26 @@ type StorageMainConfig struct {
7476
Postgres *PostgresConfig `mapstructure:"postgres"`
7577
Kafka *KafkaConfig `mapstructure:"kafka"`
7678
Badger *BadgerConfig `mapstructure:"badger"`
77-
S3 *S3Config `mapstructure:"s3"`
79+
S3 *S3StorageConfig `mapstructure:"s3"`
7880
}
7981

8082
// BadgerConfig holds the settings for the Badger entry of StorageMainConfig.
// Path is decoded from the `path` key — presumably the on-disk location of
// the Badger database; confirm against the storage package.
type BadgerConfig struct {
8183
Path string `mapstructure:"path"`
8284
}
8385

8486
// S3Config holds the S3 connection settings: bucket, region, key prefix,
// credentials, and an optional endpoint. In this commit the Format and
// Parquet fields are removed from it (they now live on S3StorageConfig), and
// S3Config is embedded with `squash` by both S3StorageConfig and
// S3SourceConfig.
type S3Config struct {
85-
Bucket string `mapstructure:"bucket"`
86-
Region string `mapstructure:"region"`
87-
Prefix string `mapstructure:"prefix"`
88-
AccessKeyID string `mapstructure:"accessKeyId"`
89-
SecretAccessKey string `mapstructure:"secretAccessKey"`
90-
Endpoint string `mapstructure:"endpoint"`
91-
Format string `mapstructure:"format"`
92-
Parquet *ParquetConfig `mapstructure:"parquet"`
87+
Bucket string `mapstructure:"bucket"`
88+
Region string `mapstructure:"region"`
89+
Prefix string `mapstructure:"prefix"`
90+
AccessKeyID string `mapstructure:"accessKeyId"`
91+
SecretAccessKey string `mapstructure:"secretAccessKey"`
92+
Endpoint string `mapstructure:"endpoint"`
93+
}
94+
95+
type S3StorageConfig struct {
96+
S3Config `mapstructure:",squash"`
97+
Format string `mapstructure:"format"`
98+
Parquet *ParquetConfig `mapstructure:"parquet"`
9399
// Buffering configuration
94100
BufferSize int64 `mapstructure:"bufferSizeMB"` // Target buffer size in MB before flush (default 512 MB)
95101
BufferTimeout int `mapstructure:"bufferTimeoutSeconds"` // Max time in seconds before flush (default 300 = 5 min)
@@ -237,6 +243,16 @@ type PublisherConfig struct {
237243
Events EventPublisherConfig `mapstructure:"events"`
238244
}
239245

246+
// S3SourceConfig configures the S3 archive source used by the poller
// (PollerConfig.S3). It embeds the shared S3Config connection settings via
// `squash` and adds local-cache tuning fields.
//
// NOTE(review): cmd/root.go declares a `poller-s3-format` flag, but this
// struct has no Format field and no `poller.s3.format` binding is visible in
// this diff — confirm whether that flag is intentionally unused.
type S3SourceConfig struct {
247+
S3Config `mapstructure:",squash"`
248+
// CacheDir is bound to the `poller-s3-cacheDir` flag (default
// "/tmp/insight-archive").
CacheDir string `mapstructure:"cacheDir"`
249+
// NOTE(review): MetadataTTL, FileCacheTTL and CleanupInterval are
// time.Duration here, but their flags in cmd/root.go are Int values
// documented as seconds. No seconds-to-Duration conversion is visible in
// this diff; if the integer is decoded directly it would presumably be read
// as nanoseconds — TODO confirm where the unit conversion happens.
MetadataTTL time.Duration `mapstructure:"metadataTTL"`
250+
FileCacheTTL time.Duration `mapstructure:"fileCacheTTL"`
251+
// NOTE(review): the `poller-s3-maxCacheSize` flag help says "default 5GB"
// while the flag default is 0 — presumably the consumer substitutes 5GB for
// zero; verify in the source package.
MaxCacheSize int64 `mapstructure:"maxCacheSize"`
252+
CleanupInterval time.Duration `mapstructure:"cleanupInterval"`
253+
// MaxConcurrentDownloads is bound to `poller-s3-maxConcurrentDownloads`
// (flag default 3).
MaxConcurrentDownloads int `mapstructure:"maxConcurrentDownloads"`
254+
}
255+
240256
type WorkModeConfig struct {
241257
CheckIntervalMinutes int `mapstructure:"checkIntervalMinutes"`
242258
LiveModeThreshold int64 `mapstructure:"liveModeThreshold"`

internal/orchestrator/orchestrator.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ type Orchestrator struct {
2222
reorgHandlerEnabled bool
2323
cancel context.CancelFunc
2424
wg sync.WaitGroup
25-
shutdownOnce sync.Once
2625
}
2726

2827
func NewOrchestrator(rpc rpc.IRPCClient) (*Orchestrator, error) {
@@ -65,7 +64,10 @@ func (o *Orchestrator) Start() {
6564
workModeMonitor.RegisterChannel(pollerWorkModeChan)
6665
defer workModeMonitor.UnregisterChannel(pollerWorkModeChan)
6766

68-
poller := NewPoller(o.rpc, o.storage, WithPollerWorkModeChan(pollerWorkModeChan))
67+
poller := NewPoller(o.rpc, o.storage,
68+
WithPollerWorkModeChan(pollerWorkModeChan),
69+
WithPollerS3Source(config.Cfg.Poller.S3),
70+
)
6971
poller.Start(ctx)
7072

7173
log.Info().Msg("Poller completed")

internal/orchestrator/poller.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/thirdweb-dev/indexer/internal/common"
1313
"github.com/thirdweb-dev/indexer/internal/metrics"
1414
"github.com/thirdweb-dev/indexer/internal/rpc"
15+
"github.com/thirdweb-dev/indexer/internal/source"
1516
"github.com/thirdweb-dev/indexer/internal/storage"
1617
"github.com/thirdweb-dev/indexer/internal/worker"
1718
)
@@ -21,6 +22,7 @@ const DEFAULT_TRIGGER_INTERVAL = 1000
2122

2223
type Poller struct {
2324
rpc rpc.IRPCClient
25+
worker *worker.Worker
2426
blocksPerPoll int64
2527
triggerIntervalMs int64
2628
storage storage.IStorage
@@ -47,15 +49,33 @@ func WithPollerWorkModeChan(ch chan WorkMode) PollerOption {
4749
}
4850
}
4951

52+
// WithPollerS3Source returns a PollerOption that makes the poller use an
// archive-backed worker when an S3 source is configured.
//
// The option is a no-op when cfg is nil or when cfg.Region or cfg.Bucket is
// empty. Otherwise it builds an S3 source for the RPC client's chain ID and
// assigns p.worker to worker.NewWorkerWithArchive(p.rpc, source).
//
// The option reads p.rpc, so it relies on the poller's rpc field being set
// before options are applied (NewBoundlessPoller populates it in the struct
// literal before running opts).
func WithPollerS3Source(cfg *config.S3SourceConfig) PollerOption {
53+
return func(p *Poller) {
54+
// Treat a missing or incomplete S3 config as "not configured" and leave
// p.worker untouched so the caller can fall back to its default worker.
if cfg == nil || cfg.Region == "" || cfg.Bucket == "" {
55+
return
56+
}
57+
58+
// NOTE(review): the local variable `source` shadows the imported `source`
// package for the rest of this closure; a distinct name would be clearer.
source, err := source.NewS3Source(cfg, p.rpc.GetChainID())
59+
if err != nil {
60+
// NOTE(review): log.Fatal inside an option presumably terminates the
// process rather than returning an error to the caller — confirm this is
// the intended failure mode.
log.Fatal().Err(err).Msg("Failed to create S3 source")
61+
}
62+
63+
log.Info().Msg("Poller S3 source configuration detected, setting up S3 source for poller")
64+
p.worker = worker.NewWorkerWithArchive(p.rpc, source)
65+
}
66+
}
67+
5068
func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage, opts ...PollerOption) *Poller {
5169
blocksPerPoll := config.Cfg.Poller.BlocksPerPoll
5270
if blocksPerPoll == 0 {
5371
blocksPerPoll = DEFAULT_BLOCKS_PER_POLL
5472
}
73+
5574
triggerInterval := config.Cfg.Poller.Interval
5675
if triggerInterval == 0 {
5776
triggerInterval = DEFAULT_TRIGGER_INTERVAL
5877
}
78+
5979
poller := &Poller{
6080
rpc: rpc,
6181
triggerIntervalMs: int64(triggerInterval),
@@ -68,6 +88,10 @@ func NewBoundlessPoller(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Po
6888
opt(poller)
6989
}
7090

91+
if poller.worker == nil {
92+
poller.worker = worker.NewWorker(poller.rpc)
93+
}
94+
7195
return poller
7296
}
7397

@@ -235,8 +259,7 @@ func (p *Poller) PollWithoutSaving(ctx context.Context, blockNumbers []*big.Int)
235259
endBlockNumberFloat, _ := endBlock.Float64()
236260
metrics.PollerLastTriggeredBlock.Set(endBlockNumberFloat)
237261

238-
worker := worker.NewWorker(p.rpc)
239-
results := worker.Run(ctx, blockNumbers)
262+
results := p.worker.Run(ctx, blockNumbers)
240263
blockData, failedResults := p.convertPollResultsToBlockData(results)
241264
return blockData, failedResults
242265
}

0 commit comments

Comments
 (0)