Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Config struct {
CommitterMaxMemoryMB int `env:"COMMITTER_MAX_MEMORY_MB" envDefault:"512"`
CommitterCompressionThresholdMB int `env:"COMMITTER_COMPRESSION_THRESHOLD_MB" envDefault:"50"`
CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"`
CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"`
StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"`
StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"`
StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"`
Expand Down
2 changes: 1 addition & 1 deletion internal/backfill/getbackfillboundaries.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func GetBackfillBoundaries() (uint64, uint64) {

if startBlock > endBlock {
// since indexing is done, we call insight service to disable the indexer
DisableIndexerMaybeStartCommitter()
libs.DisableIndexerMaybeStartCommitter()
// most likely this will not be called as this service will be paused. but a panic just incase
log.Panic().
Uint64("start_block", startBlock).
Expand Down
10 changes: 10 additions & 0 deletions internal/committer/poollatest.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func pollLatest() error {
// Initialize metrics labels
chainIdStr := libs.ChainIdStr
indexerName := config.Cfg.ZeetProjectName
hasRightsized := false

for {
latestBlock, err := libs.RpcClient.GetLatestBlockNumber(context.Background())
Expand Down Expand Up @@ -74,5 +75,14 @@ func pollLatest() error {
// Update nextCommitBlockNumber for next iteration
nextBlockNumber = expectedBlockNumber
metrics.CommitterNextBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(nextBlockNumber))

if !config.Cfg.CommitterIsLive && latestBlock.Int64()-int64(nextBlockNumber) < 20 && !hasRightsized {
log.Debug().
Uint64("latest_block", latestBlock.Uint64()).
Uint64("next_commit_block", nextBlockNumber).
Msg("Latest block is close to next commit block. Resizing s3 committer")
libs.RightsizeS3Committer()
hasRightsized = true
}
}
}
7 changes: 6 additions & 1 deletion internal/committer/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ func InitReorg() {
}

func RunReorgValidator() {
// indexer is not live, so we don't need to check for reorgs
if !config.Cfg.CommitterIsLive {
return
}

lastBlockCheck := int64(0)
for {
startBlock, endBlock, err := getReorgRange()
Expand Down Expand Up @@ -57,7 +62,7 @@ func getReorgRange() (int64, int64, error) {
endBlock = min(endBlock-5, startBlock+100) // lag by some blocks for safety

if startBlock >= endBlock {
return 0, 0, fmt.Errorf("start block is greater than end block")
return 0, 0, fmt.Errorf("start block is greater than end block (%d >= %d)", startBlock, endBlock)
}

return startBlock, endBlock, nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package backfill
package libs

import (
"bytes"
Expand All @@ -9,14 +9,22 @@ import (

"github.com/rs/zerolog/log"
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/libs"
)

type DeployS3CommitterRequest struct {
ZeetDeploymentId string `json:"zeetDeploymentId"`
}

func DisableIndexerMaybeStartCommitter() {
makeS3CommitterRequest("deploy-s3-committer")
}

func RightsizeS3Committer() {
makeS3CommitterRequest("rightsize-s3-committer")
}

// makeS3CommitterRequest is a common function to make HTTP requests to the insight service
func makeS3CommitterRequest(endpoint string) {
serviceURL := config.Cfg.InsightServiceUrl
apiKey := config.Cfg.InsightServiceApiKey
zeetDeploymentId := config.Cfg.ZeetDeploymentId
Expand All @@ -33,7 +41,7 @@ func DisableIndexerMaybeStartCommitter() {
}

// Create HTTP request
url := fmt.Sprintf("%s/service/chains/%s/deploy-s3-committer", serviceURL, libs.ChainIdStr)
url := fmt.Sprintf("%s/service/chains/%s/%s", serviceURL, ChainIdStr, endpoint)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
log.Error().Err(err).Msg("Failed to create HTTP request")
Expand All @@ -52,12 +60,13 @@ func DisableIndexerMaybeStartCommitter() {
// Send request
log.Info().
Str("url", url).
Str("endpoint", endpoint).
Str("zeetDeploymentId", zeetDeploymentId).
Msg("Sending deploy-s3-committer request to disable indexer")
Msgf("Sending %s request", endpoint)

resp, err := client.Do(req)
if err != nil {
log.Error().Err(err).Msg("Failed to send HTTP request")
log.Error().Err(err).Msgf("Failed to send %s request", endpoint)
return
}
defer resp.Body.Close()
Expand All @@ -66,10 +75,10 @@ func DisableIndexerMaybeStartCommitter() {
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
log.Info().
Int("statusCode", resp.StatusCode).
Msg("Successfully sent deploy-s3-committer request. Indexer disabled")
Msgf("Successfully sent %s request", endpoint)
} else {
log.Error().
Int("statusCode", resp.StatusCode).
Msg("Deploy-s3-committer request failed. Could not disable indexer")
Msgf("%s request failed", endpoint)
}
}