diff --git a/configs/config.go b/configs/config.go index c94cc87..8147e80 100644 --- a/configs/config.go +++ b/configs/config.go @@ -60,6 +60,7 @@ type Config struct { CommitterMaxMemoryMB int `env:"COMMITTER_MAX_MEMORY_MB" envDefault:"512"` CommitterCompressionThresholdMB int `env:"COMMITTER_COMPRESSION_THRESHOLD_MB" envDefault:"50"` CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"` + CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"` StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"` StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"` StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"` diff --git a/internal/backfill/getbackfillboundaries.go b/internal/backfill/getbackfillboundaries.go index 6fd1508..3efbd6e 100644 --- a/internal/backfill/getbackfillboundaries.go +++ b/internal/backfill/getbackfillboundaries.go @@ -22,7 +22,7 @@ func GetBackfillBoundaries() (uint64, uint64) { if startBlock > endBlock { // since indexing is done, we call insight service to disable the indexer - DisableIndexerMaybeStartCommitter() + libs.DisableIndexerMaybeStartCommitter() // most likely this will not be called as this service will be paused. but a panic just incase log.Panic(). Uint64("start_block", startBlock). diff --git a/internal/committer/poollatest.go b/internal/committer/poollatest.go index 4fb39bf..fc2af26 100644 --- a/internal/committer/poollatest.go +++ b/internal/committer/poollatest.go @@ -17,6 +17,7 @@ func pollLatest() error { // Initialize metrics labels chainIdStr := libs.ChainIdStr indexerName := config.Cfg.ZeetProjectName + hasRightsized := false for { latestBlock, err := libs.RpcClient.GetLatestBlockNumber(context.Background()) @@ -74,5 +75,14 @@ func pollLatest() error { // Update nextCommitBlockNumber for next iteration nextBlockNumber = expectedBlockNumber metrics.CommitterNextBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(nextBlockNumber)) + + if !config.Cfg.CommitterIsLive && latestBlock.Int64()-int64(nextBlockNumber) < 20 && !hasRightsized { + log.Debug(). + Uint64("latest_block", latestBlock.Uint64()). + Uint64("next_commit_block", nextBlockNumber). + Msg("Latest block is close to next commit block. Resizing s3 committer") + libs.RightsizeS3Committer() + hasRightsized = true + } } } diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index 2a03e01..26501ab 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -16,6 +16,11 @@ func InitReorg() { } func RunReorgValidator() { + // indexer is not live, so we don't need to check for reorgs + if !config.Cfg.CommitterIsLive { + return + } + lastBlockCheck := int64(0) for { startBlock, endBlock, err := getReorgRange() @@ -57,7 +62,7 @@ func getReorgRange() (int64, int64, error) { endBlock = min(endBlock-5, startBlock+100) // lag by some blocks for safety if startBlock >= endBlock { - return 0, 0, fmt.Errorf("start block is greater than end block") + return 0, 0, fmt.Errorf("start block is greater than end block (%d >= %d)", startBlock, endBlock) } return startBlock, endBlock, nil diff --git a/internal/backfill/disableIndexerMaybeStartCommitter.go b/internal/libs/insightServiceRequests.go similarity index 70% rename from internal/backfill/disableIndexerMaybeStartCommitter.go rename to internal/libs/insightServiceRequests.go index ca02f0a..73f9406 100644 --- a/internal/backfill/disableIndexerMaybeStartCommitter.go +++ b/internal/libs/insightServiceRequests.go @@ -1,4 +1,4 @@ -package backfill +package libs import ( "bytes" @@ -9,7 +9,6 @@ import ( "github.com/rs/zerolog/log" config "github.com/thirdweb-dev/indexer/configs" - "github.com/thirdweb-dev/indexer/internal/libs" ) type DeployS3CommitterRequest struct { @@ -17,6 +16,15 @@ type DeployS3CommitterRequest struct { } func DisableIndexerMaybeStartCommitter() { + makeS3CommitterRequest("deploy-s3-committer") +} + +func RightsizeS3Committer() { + makeS3CommitterRequest("rightsize-s3-committer") +} + +// makeS3CommitterRequest is a common function to make HTTP requests to the insight service +func makeS3CommitterRequest(endpoint string) { serviceURL := config.Cfg.InsightServiceUrl apiKey := config.Cfg.InsightServiceApiKey zeetDeploymentId := config.Cfg.ZeetDeploymentId @@ -33,7 +41,7 @@ func DisableIndexerMaybeStartCommitter() { } // Create HTTP request - url := fmt.Sprintf("%s/service/chains/%s/deploy-s3-committer", serviceURL, libs.ChainIdStr) + url := fmt.Sprintf("%s/service/chains/%s/%s", serviceURL, ChainIdStr, endpoint) req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) if err != nil { log.Error().Err(err).Msg("Failed to create HTTP request") @@ -52,12 +60,13 @@ func DisableIndexerMaybeStartCommitter() { // Send request log.Info(). Str("url", url). + Str("endpoint", endpoint). Str("zeetDeploymentId", zeetDeploymentId). - Msg("Sending deploy-s3-committer request to disable indexer") + Msgf("Sending %s request", endpoint) resp, err := client.Do(req) if err != nil { - log.Error().Err(err).Msg("Failed to send HTTP request") + log.Error().Err(err).Msgf("Failed to send %s request", endpoint) return } defer resp.Body.Close() @@ -66,10 +75,10 @@ func DisableIndexerMaybeStartCommitter() { if resp.StatusCode >= 200 && resp.StatusCode < 300 { log.Info(). Int("statusCode", resp.StatusCode). - Msg("Successfully sent deploy-s3-committer request. Indexer disabled") + Msgf("Successfully sent %s request", endpoint) } else { log.Error(). Int("statusCode", resp.StatusCode). - Msg("Deploy-s3-committer request failed. Could not disable indexer") + Msgf("%s request failed", endpoint) } }