Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Config struct {
CommitterMaxMemoryMB int `env:"COMMITTER_MAX_MEMORY_MB" envDefault:"512"`
CommitterCompressionThresholdMB int `env:"COMMITTER_COMPRESSION_THRESHOLD_MB" envDefault:"50"`
CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"`
CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"`
StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"`
StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"`
StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"`
Expand Down
2 changes: 1 addition & 1 deletion internal/backfill/getbackfillboundaries.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func GetBackfillBoundaries() (uint64, uint64) {

if startBlock > endBlock {
// since indexing is done, we call insight service to disable the indexer
DisableIndexerMaybeStartCommitter()
libs.DisableIndexerMaybeStartCommitter()
// most likely this will not be called as this service will be paused. but a panic just incase
log.Panic().
Uint64("start_block", startBlock).
Expand Down
8 changes: 8 additions & 0 deletions internal/committer/poollatest.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,13 @@ func pollLatest() error {
// Update nextCommitBlockNumber for next iteration
nextBlockNumber = expectedBlockNumber
metrics.CommitterNextBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(nextBlockNumber))

if !config.Cfg.CommitterIsLive && latestBlock.Int64()-int64(nextBlockNumber) < 20 {
log.Debug().
Uint64("latest_block", latestBlock.Uint64()).
Uint64("next_commit_block", nextBlockNumber).
Msg("Latest block is close to next commit block. Resizing s3 committer")
libs.RightsizeS3Committer()
}
}
}
2 changes: 1 addition & 1 deletion internal/committer/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func getReorgRange() (int64, int64, error) {
endBlock = min(endBlock-5, startBlock+100) // lag by some blocks for safety

if startBlock >= endBlock {
return 0, 0, fmt.Errorf("start block is greater than end block")
return 0, 0, fmt.Errorf("start block is greater than end block (%d >= %d)", startBlock, endBlock)
}

return startBlock, endBlock, nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package backfill
package libs

import (
"bytes"
Expand All @@ -9,14 +9,22 @@ import (

"github.com/rs/zerolog/log"
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/libs"
)

type DeployS3CommitterRequest struct {
ZeetDeploymentId string `json:"zeetDeploymentId"`
}

func DisableIndexerMaybeStartCommitter() {
makeS3CommitterRequest("deploy-s3-committer")
}

func RightsizeS3Committer() {
makeS3CommitterRequest("rightsize-s3-committer")
}

// makeS3CommitterRequest is a common function to make HTTP requests to the insight service
func makeS3CommitterRequest(endpoint string) {
serviceURL := config.Cfg.InsightServiceUrl
apiKey := config.Cfg.InsightServiceApiKey
zeetDeploymentId := config.Cfg.ZeetDeploymentId
Expand All @@ -33,7 +41,7 @@ func DisableIndexerMaybeStartCommitter() {
}

// Create HTTP request
url := fmt.Sprintf("%s/service/chains/%s/deploy-s3-committer", serviceURL, libs.ChainIdStr)
url := fmt.Sprintf("%s/service/chains/%s/%s", serviceURL, ChainIdStr, endpoint)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
log.Error().Err(err).Msg("Failed to create HTTP request")
Expand All @@ -52,12 +60,13 @@ func DisableIndexerMaybeStartCommitter() {
// Send request
log.Info().
Str("url", url).
Str("endpoint", endpoint).
Str("zeetDeploymentId", zeetDeploymentId).
Msg("Sending deploy-s3-committer request to disable indexer")
Msgf("Sending %s request", endpoint)

resp, err := client.Do(req)
if err != nil {
log.Error().Err(err).Msg("Failed to send HTTP request")
log.Error().Err(err).Msgf("Failed to send %s request", endpoint)
return
}
defer resp.Body.Close()
Expand All @@ -66,10 +75,10 @@ func DisableIndexerMaybeStartCommitter() {
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
log.Info().
Int("statusCode", resp.StatusCode).
Msg("Successfully sent deploy-s3-committer request. Indexer disabled")
Msgf("Successfully sent %s request", endpoint)
} else {
log.Error().
Int("statusCode", resp.StatusCode).
Msg("Deploy-s3-committer request failed. Could not disable indexer")
Msgf("%s request failed", endpoint)
}
}