Skip to content

Commit 821798a

Browse files
authored
right size s3 committer after it is caught up (#298)
* right size s3 committer after it is caught up * no need to check for reorg when indexer is not live
1 parent c0c492e commit 821798a

File tree

5 files changed

+34
-9
lines changed

5 files changed

+34
-9
lines changed

configs/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type Config struct {
6060
CommitterMaxMemoryMB int `env:"COMMITTER_MAX_MEMORY_MB" envDefault:"512"`
6161
CommitterCompressionThresholdMB int `env:"COMMITTER_COMPRESSION_THRESHOLD_MB" envDefault:"50"`
6262
CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"`
63+
CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"`
6364
StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"`
6465
StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"`
6566
StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"`

internal/backfill/getbackfillboundaries.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func GetBackfillBoundaries() (uint64, uint64) {
2222

2323
if startBlock > endBlock {
2424
// since indexing is done, we call insight service to disable the indexer
25-
DisableIndexerMaybeStartCommitter()
25+
libs.DisableIndexerMaybeStartCommitter()
2626
// most likely this will not be called as this service will be paused. but a panic just incase
2727
log.Panic().
2828
Uint64("start_block", startBlock).

internal/committer/poollatest.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ func pollLatest() error {
1717
// Initialize metrics labels
1818
chainIdStr := libs.ChainIdStr
1919
indexerName := config.Cfg.ZeetProjectName
20+
hasRightsized := false
2021

2122
for {
2223
latestBlock, err := libs.RpcClient.GetLatestBlockNumber(context.Background())
@@ -74,5 +75,14 @@ func pollLatest() error {
7475
// Update nextCommitBlockNumber for next iteration
7576
nextBlockNumber = expectedBlockNumber
7677
metrics.CommitterNextBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(nextBlockNumber))
78+
79+
if !config.Cfg.CommitterIsLive && latestBlock.Int64()-int64(nextBlockNumber) < 20 && !hasRightsized {
80+
log.Debug().
81+
Uint64("latest_block", latestBlock.Uint64()).
82+
Uint64("next_commit_block", nextBlockNumber).
83+
Msg("Latest block is close to next commit block. Resizing s3 committer")
84+
libs.RightsizeS3Committer()
85+
hasRightsized = true
86+
}
7787
}
7888
}

internal/committer/reorg.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ func InitReorg() {
1616
}
1717

1818
func RunReorgValidator() {
19+
// indexer is not live, so we don't need to check for reorgs
20+
if !config.Cfg.CommitterIsLive {
21+
return
22+
}
23+
1924
lastBlockCheck := int64(0)
2025
for {
2126
startBlock, endBlock, err := getReorgRange()
@@ -57,7 +62,7 @@ func getReorgRange() (int64, int64, error) {
5762
endBlock = min(endBlock-5, startBlock+100) // lag by some blocks for safety
5863

5964
if startBlock >= endBlock {
60-
return 0, 0, fmt.Errorf("start block is greater than end block")
65+
return 0, 0, fmt.Errorf("start block is greater than end block (%d >= %d)", startBlock, endBlock)
6166
}
6267

6368
return startBlock, endBlock, nil

internal/backfill/disableIndexerMaybeStartCommitter.go renamed to internal/libs/insightServiceRequests.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package backfill
1+
package libs
22

33
import (
44
"bytes"
@@ -9,14 +9,22 @@ import (
99

1010
"github.com/rs/zerolog/log"
1111
config "github.com/thirdweb-dev/indexer/configs"
12-
"github.com/thirdweb-dev/indexer/internal/libs"
1312
)
1413

1514
type DeployS3CommitterRequest struct {
1615
ZeetDeploymentId string `json:"zeetDeploymentId"`
1716
}
1817

1918
func DisableIndexerMaybeStartCommitter() {
19+
makeS3CommitterRequest("deploy-s3-committer")
20+
}
21+
22+
func RightsizeS3Committer() {
23+
makeS3CommitterRequest("rightsize-s3-committer")
24+
}
25+
26+
// makeS3CommitterRequest is a common function to make HTTP requests to the insight service
27+
func makeS3CommitterRequest(endpoint string) {
2028
serviceURL := config.Cfg.InsightServiceUrl
2129
apiKey := config.Cfg.InsightServiceApiKey
2230
zeetDeploymentId := config.Cfg.ZeetDeploymentId
@@ -33,7 +41,7 @@ func DisableIndexerMaybeStartCommitter() {
3341
}
3442

3543
// Create HTTP request
36-
url := fmt.Sprintf("%s/service/chains/%s/deploy-s3-committer", serviceURL, libs.ChainIdStr)
44+
url := fmt.Sprintf("%s/service/chains/%s/%s", serviceURL, ChainIdStr, endpoint)
3745
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
3846
if err != nil {
3947
log.Error().Err(err).Msg("Failed to create HTTP request")
@@ -52,12 +60,13 @@ func DisableIndexerMaybeStartCommitter() {
5260
// Send request
5361
log.Info().
5462
Str("url", url).
63+
Str("endpoint", endpoint).
5564
Str("zeetDeploymentId", zeetDeploymentId).
56-
Msg("Sending deploy-s3-committer request to disable indexer")
65+
Msgf("Sending %s request", endpoint)
5766

5867
resp, err := client.Do(req)
5968
if err != nil {
60-
log.Error().Err(err).Msg("Failed to send HTTP request")
69+
log.Error().Err(err).Msgf("Failed to send %s request", endpoint)
6170
return
6271
}
6372
defer resp.Body.Close()
@@ -66,10 +75,10 @@ func DisableIndexerMaybeStartCommitter() {
6675
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
6776
log.Info().
6877
Int("statusCode", resp.StatusCode).
69-
Msg("Successfully sent deploy-s3-committer request. Indexer disabled")
78+
Msgf("Successfully sent %s request", endpoint)
7079
} else {
7180
log.Error().
7281
Int("statusCode", resp.StatusCode).
73-
Msg("Deploy-s3-committer request failed. Could not disable indexer")
82+
Msgf("%s request failed", endpoint)
7483
}
7584
}

0 commit comments

Comments
 (0)