Skip to content

Commit 061cc55

Browse files
committed
right size s3 committer after it is caught up
1 parent 839f548 commit 061cc55

File tree

4 files changed

+26
-8
lines changed

4 files changed

+26
-8
lines changed

configs/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type Config struct {
6060
CommitterMaxMemoryMB int `env:"COMMITTER_MAX_MEMORY_MB" envDefault:"512"`
6161
CommitterCompressionThresholdMB int `env:"COMMITTER_COMPRESSION_THRESHOLD_MB" envDefault:"50"`
6262
CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"`
63+
CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"`
6364
StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"`
6465
StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"`
6566
StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"`

internal/backfill/getbackfillboundaries.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func GetBackfillBoundaries() (uint64, uint64) {
2222

2323
if startBlock > endBlock {
2424
// since indexing is done, we call insight service to disable the indexer
25-
DisableIndexerMaybeStartCommitter()
25+
libs.DisableIndexerMaybeStartCommitter()
2626
// most likely this will not be called as this service will be paused. but a panic just incase
2727
log.Panic().
2828
Uint64("start_block", startBlock).

internal/committer/poollatest.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,5 +74,13 @@ func pollLatest() error {
7474
// Update nextCommitBlockNumber for next iteration
7575
nextBlockNumber = expectedBlockNumber
7676
metrics.CommitterNextBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(nextBlockNumber))
77+
78+
if !config.Cfg.CommitterIsLive && latestBlock.Int64()-int64(nextBlockNumber) < 20 {
79+
log.Debug().
80+
Uint64("latest_block", latestBlock.Uint64()).
81+
Uint64("next_commit_block", nextBlockNumber).
82+
Msg("Latest block is close to next commit block. Resizing s3 committer")
83+
libs.RightsizeS3Committer()
84+
}
7785
}
7886
}

internal/backfill/disableIndexerMaybeStartCommitter.go renamed to internal/libs/insightServiceRequests.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package backfill
1+
package libs
22

33
import (
44
"bytes"
@@ -9,14 +9,22 @@ import (
99

1010
"github.com/rs/zerolog/log"
1111
config "github.com/thirdweb-dev/indexer/configs"
12-
"github.com/thirdweb-dev/indexer/internal/libs"
1312
)
1413

1514
type DeployS3CommitterRequest struct {
1615
ZeetDeploymentId string `json:"zeetDeploymentId"`
1716
}
1817

1918
func DisableIndexerMaybeStartCommitter() {
19+
makeS3CommitterRequest("deploy-s3-committer")
20+
}
21+
22+
func RightsizeS3Committer() {
23+
makeS3CommitterRequest("rightsize-s3-committer")
24+
}
25+
26+
// makeS3CommitterRequest is a common function to make HTTP requests to the insight service
27+
func makeS3CommitterRequest(endpoint string) {
2028
serviceURL := config.Cfg.InsightServiceUrl
2129
apiKey := config.Cfg.InsightServiceApiKey
2230
zeetDeploymentId := config.Cfg.ZeetDeploymentId
@@ -33,7 +41,7 @@ func DisableIndexerMaybeStartCommitter() {
3341
}
3442

3543
// Create HTTP request
36-
url := fmt.Sprintf("%s/service/chains/%s/deploy-s3-committer", serviceURL, libs.ChainIdStr)
44+
url := fmt.Sprintf("%s/service/chains/%s/%s", serviceURL, ChainIdStr, endpoint)
3745
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
3846
if err != nil {
3947
log.Error().Err(err).Msg("Failed to create HTTP request")
@@ -52,12 +60,13 @@ func DisableIndexerMaybeStartCommitter() {
5260
// Send request
5361
log.Info().
5462
Str("url", url).
63+
Str("endpoint", endpoint).
5564
Str("zeetDeploymentId", zeetDeploymentId).
56-
Msg("Sending deploy-s3-committer request to disable indexer")
65+
Msgf("Sending %s request", endpoint)
5766

5867
resp, err := client.Do(req)
5968
if err != nil {
60-
log.Error().Err(err).Msg("Failed to send HTTP request")
69+
log.Error().Err(err).Msgf("Failed to send %s request", endpoint)
6170
return
6271
}
6372
defer resp.Body.Close()
@@ -66,10 +75,10 @@ func DisableIndexerMaybeStartCommitter() {
6675
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
6776
log.Info().
6877
Int("statusCode", resp.StatusCode).
69-
Msg("Successfully sent deploy-s3-committer request. Indexer disabled")
78+
Msgf("Successfully sent %s request", endpoint)
7079
} else {
7180
log.Error().
7281
Int("statusCode", resp.StatusCode).
73-
Msg("Deploy-s3-committer request failed. Could not disable indexer")
82+
Msgf("%s request failed", endpoint)
7483
}
7584
}

0 commit comments

Comments
 (0)