From 8efc439c425dc9345e393af6d15172a149bb7aae Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Wed, 4 Jun 2025 20:57:16 -0500 Subject: [PATCH 01/27] fix: add missing pruning logic, add missing block fk constraints --- .../up.go | 31 ++++++ pkg/postgres/migrations/migrator.go | 2 + pkg/rewards/rewards.go | 99 ++++++++++++++++--- 3 files changed, 116 insertions(+), 16 deletions(-) create mode 100644 pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go diff --git a/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go b/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go new file mode 100644 index 000000000..ac8f73602 --- /dev/null +++ b/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go @@ -0,0 +1,31 @@ +package _202506042023_addMissingBlockNumberForeignKeys + +import ( + "database/sql" + "fmt" + "github.com/Layr-Labs/sidecar/internal/config" + "gorm.io/gorm" +) + +type Migration struct { +} + +func (m *Migration) Up(db *sql.DB, grm *gorm.DB, cfg *config.Config) error { + tableNames := []string{ + "staker_shares", + "operator_shares", + } + + for _, tableName := range tableNames { + query := fmt.Sprintf(`alter table %s add constraint %s_block_number_fkey foreign key (block_number) references blocks (number) on delete cascade;`, tableName, tableName) + res := grm.Exec(query) + if res.Error != nil { + return fmt.Errorf("failed to add foreign key for table %s: %w", tableName, res.Error) + } + } + return nil +} + +func (m *Migration) GetName() string { + return "202506042023_addMissingBlockNumberForeignKeys" +} diff --git a/pkg/postgres/migrations/migrator.go b/pkg/postgres/migrations/migrator.go index 8e1b16a1b..bae7a5f4c 100644 --- a/pkg/postgres/migrations/migrator.go +++ b/pkg/postgres/migrations/migrator.go @@ -71,6 +71,7 @@ import ( _202503311108_goldRewardHashIndex "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202503311108_goldRewardHashIndex" _202504240743_fixQueuedSlashingWithdrawalsPk "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202504240743_fixQueuedSlashingWithdrawalsPk" _202505092007_startupJobs "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202505092007_startupJobs" + _202506042023_addMissingBlockNumberForeignKeys "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys" "go.uber.org/zap" "gorm.io/gorm" "time" @@ -213,6 +214,7 @@ func (m *Migrator) MigrateAll() error { &_202503171414_slashingWithdrawals.Migration{}, &_202504240743_fixQueuedSlashingWithdrawalsPk.Migration{}, &_202505092007_startupJobs.Migration{}, + &_202506042023_addMissingBlockNumberForeignKeys.Migration{}, } for _, migration := range migrations { diff --git a/pkg/rewards/rewards.go b/pkg/rewards/rewards.go index 923babd6a..9f6d460c8 100644 --- a/pkg/rewards/rewards.go +++ b/pkg/rewards/rewards.go @@ -13,7 +13,6 @@ import ( "sync/atomic" - "slices" "strings" "strconv" @@ -432,7 +431,42 @@ func (rc *RewardsCalculator) findRewardsTablesBySnapshotDate(snapshotDate string return rewardsTables, nil } +// FindRewardHashesToPrune finds all reward hashes that will be pruned when deleting corrupted state. +func (rc *RewardsCalculator) FindRewardHashesToPrune(blockHeight uint64) ([]string, error) { + query := ` + select + distinct(reward_hash) as reward_hash + from ( + select reward_hash from combined_rewards where block_number >= @blockNumber + union all + select reward_hash from operator_directed_rewards where block_number >= @blockNumber + union all + select + odosrs.reward_hash + from operator_directed_operator_set_reward_submissions as odosrs + -- operator_directed_operator_set_reward_submissions lacks a block_time column, so we need to join blocks + join blocks as b on (b.number = odosrs.block_number) + where + b.number >= @blockNumber + ) as t + ` + var rewardHashes []string + res := rc.grm.Raw(query, sql.Named("blockNumber", blockHeight)).Scan(&rewardHashes) + if res.Error != nil { + rc.logger.Sugar().Errorw("Failed to find reward hashes to prune", "error", res.Error) + return nil, res.Error + } + return rewardHashes, nil +} + +// DeleteCorruptedRewardsFromBlockHeight deletes rewards data that is >= the provided block height. func (rc *RewardsCalculator) DeleteCorruptedRewardsFromBlockHeight(blockHeight uint64) error { + rewardHashesToPrune, err := rc.FindRewardHashesToPrune(blockHeight) + if err != nil { + rc.logger.Sugar().Errorw("Failed to find reward hashes to prune", "error", err) + return err + } + generatedSnapshot, err := rc.findGeneratedRewardSnapshotByBlock(blockHeight) if err != nil { rc.logger.Sugar().Errorw("Failed to find generated snapshot", "error", err) @@ -462,9 +496,12 @@ func (rc *RewardsCalculator) DeleteCorruptedRewardsFromBlockHeight(blockHeight u lowerBoundSnapshot = nil } - snapshotDates := make([]string, 0) + // Find all suffixed rewards tables that were generated as part of the rewards calculation process. + // e.g. gold_6_rfae_stakers_2024_12_01 + // We since we're deleting the blocks where these rewards were applicable, we need to drop these tables + // so that they can be created again when we run the rewards calculation again. for _, snapshot := range snapshotsToDelete { - snapshotDates = append(snapshotDates, snapshot.SnapshotDate) + // find and drop tables that were created for this snapshot (e.g. gold_6_rfae_stakers_2024_12_01) tableNames, err := rc.findRewardsTablesBySnapshotDate(snapshot.SnapshotDate) if err != nil { rc.logger.Sugar().Errorw("Failed to find rewards tables", "error", err) @@ -489,19 +526,49 @@ func (rc *RewardsCalculator) DeleteCorruptedRewardsFromBlockHeight(blockHeight u } } - // sort all snapshot dates in ascending order to purge from gold table - slices.SortFunc(snapshotDates, func(i, j string) int { - return strings.Compare(i, j) - }) + // prune snapshot tables based on snapshot dates. + // - tables that have been migrated to the "new" single table format that is never dropped will end up getting backfilled. + // - tables that we still drop, dont need to be pruned since they'll get recreated. + tablesToPrune := []string{ + "combined_rewards", + "default_operator_split_snapshots", + "operator_avs_registration_snapshots", + "operator_avs_split_snapshots", + "operator_avs_strategy_snapshots", + "operator_directed_rewards", + "operator_pi_split_snapshots", + "operator_share_snapshots", + "staker_delegation_snapshots", + "staker_share_snapshots", + } + for _, tableName := range tablesToPrune { + var query string + + // if we have no lower bound, that means we're deleting everything + if lowerBoundSnapshot == nil { + query = fmt.Sprintf(`truncate table %s cascade`, tableName) + } else { + query = fmt.Sprintf(`delete from %s where snapshot >= '%s'`, tableName, lowerBoundSnapshot.SnapshotDate) + } - // purge from gold table - if lowerBoundSnapshot != nil { - rc.logger.Sugar().Infow("Purging rewards from gold table where snapshot >=", "snapshotDate", lowerBoundSnapshot.SnapshotDate) - res = rc.grm.Exec(`delete from gold_table where snapshot >= @snapshotDate`, sql.Named("snapshotDate", lowerBoundSnapshot.SnapshotDate)) - } else { - // if the lower bound is nil, ther we're deleting everything - rc.logger.Sugar().Infow("Purging all rewards from gold table") - res = rc.grm.Exec(`delete from gold_table`) + res = rc.grm.Exec(query) + if res.Error != nil { + rc.logger.Sugar().Errorw("Failed to prune snapshot table", "tableName", tableName, "error", res.Error) + return res.Error + } + } + + // Purge rewards from gold_table based on reward_hash. + // Since reward submissions are submitted at a block, but can be arbitrarily backwards looking, + // we cant just delete by snapshot date. We have to get all reward snapshots that were created by + // a specific reward hash that we're deleting. + goldPurgeQuery := ` + delete from gold_table where reward_hash in @rewardHashes + ` + res = rc.grm.Exec(goldPurgeQuery, sql.Named("rewardHashes", rewardHashesToPrune)) + if res.Error != nil { + rc.logger.Sugar().Errorw("Failed to delete rewards from gold table by reward_hash", "error", res.Error) + return res.Error } if res.Error != nil { @@ -510,7 +577,7 @@ func (rc *RewardsCalculator) DeleteCorruptedRewardsFromBlockHeight(blockHeight u } if lowerBoundSnapshot != nil { rc.logger.Sugar().Infow("Deleted rewards from gold table", - zap.String("snapshotDate", lowerBoundSnapshot.SnapshotDate), + zap.Int("rewardHashCount", len(rewardHashesToPrune)), zap.Int64("recordsDeleted", res.RowsAffected), ) } else { From d88b26d27dacaf9a2ad865f1eff87bd406c9c258 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Wed, 4 Jun 2025 21:47:18 -0500 Subject: [PATCH 02/27] add block_number indexes --- .../up.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go b/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go index ac8f73602..d4fd0cbdb 100644 --- a/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go +++ b/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go @@ -23,6 +23,18 @@ func (m *Migration) Up(db *sql.DB, grm *gorm.DB, cfg *config.Config) error { return fmt.Errorf("failed to add foreign key for table %s: %w", tableName, res.Error) } } + + queries := []string{ + `create index if not exists idx_staker_shares_block_number on sidecar_mainnet_ethereum.staker_shares(block_number);`, + `create index if not exists idx_operator_shares_block_number on sidecar_mainnet_ethereum.operator_shares(block_number);`, + } + for _, query := range queries { + res := grm.Exec(query) + if res.Error != nil { + return fmt.Errorf("failed to create index: %w", res.Error) + } + } + return nil } From b7efc5853f4d7b15160eac5df1ba98e3bba83125 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 10:07:42 -0500 Subject: [PATCH 03/27] fix constraints --- .../202506042023_addMissingBlockNumberForeignKeys/up.go | 7 +++++-- pkg/rewards/rewards.go | 1 - 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go b/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go index d4fd0cbdb..5d86838f1 100644 --- a/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go +++ b/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go @@ -25,8 +25,11 @@ func (m *Migration) Up(db *sql.DB, grm *gorm.DB, cfg *config.Config) error { } queries := []string{ - `create index if not exists idx_staker_shares_block_number on sidecar_mainnet_ethereum.staker_shares(block_number);`, - `create index if not exists idx_operator_shares_block_number on sidecar_mainnet_ethereum.operator_shares(block_number);`, + `create index if not exists idx_staker_shares_block_number on staker_shares(block_number);`, + `create index if not exists idx_operator_shares_block_number on operator_shares(block_number);`, + `create index if not exists idx_combined_rewards_block_number on combined_rewards(block_number);`, + `alter table rewards_claimed drop constraint rewards_claimed_transaction_hash_log_index_key`, + `alter table rewards_claimed add constraint rewards_claimed_transaction_hash_log_index_key unique (transaction_hash, log_index);`, } for _, query := range queries { res := grm.Exec(query) diff --git a/pkg/rewards/rewards.go b/pkg/rewards/rewards.go index 9f6d460c8..7ec5d5079 100644 --- a/pkg/rewards/rewards.go +++ b/pkg/rewards/rewards.go @@ -530,7 +530,6 @@ func (rc *RewardsCalculator) DeleteCorruptedRewardsFromBlockHeight(blockHeight u // - tables that have been migrated to the "new" single table format that is never dropped will end up getting backfilled. // - tables that we still drop, dont need to be pruned since they'll get recreated. tablesToPrune := []string{ - "combined_rewards", "default_operator_split_snapshots", "operator_avs_registration_snapshots", "operator_avs_split_snapshots", From b514e77bb19810d1db447219df1f09c05193123a Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Tue, 1 Apr 2025 23:41:50 -0500 Subject: [PATCH 04/27] perf: implement a faster from-scratch sync --- cmd/database.go | 2 +- cmd/debugger/main.go | 2 +- cmd/operatorRestakedStrategies.go | 2 +- cmd/run.go | 4 +- examples/transactionBackfiller/main.go | 2 +- pkg/contractStore/contractStore.go | 2 +- .../postgresContractStore.go | 4 +- .../postgresContractStore_test.go | 2 +- pkg/fetcher/fetcher.go | 235 +++++++++++++++--- pkg/indexer/indexer.go | 17 +- pkg/indexer/restakedStrategies_test.go | 2 +- pkg/pipeline/pipeline.go | 2 +- pkg/pipeline/pipelineIntegration_test.go | 2 +- pkg/providers/interfaces.go | 5 + pkg/sidecar/blockIndexer.go | 2 +- pkg/transactionBackfiller/backfiller_test.go | 2 +- 16 files changed, 235 insertions(+), 52 deletions(-) create mode 100644 pkg/providers/interfaces.go diff --git a/cmd/database.go b/cmd/database.go index 83afe3cea..268ffd273 100644 --- a/cmd/database.go +++ b/cmd/database.go @@ -112,7 +112,7 @@ var runDatabaseCmd = &cobra.Command{ precommitProcessors.LoadPrecommitProcessors(sm, grm, l) - fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, l) + fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, contractStore, l) cc := sequentialContractCaller.NewSequentialContractCaller(client, cfg, cfg.EthereumRpcConfig.ContractCallBatchSize, l) diff --git a/cmd/debugger/main.go b/cmd/debugger/main.go index 70a12dd75..f151592da 100644 --- a/cmd/debugger/main.go +++ b/cmd/debugger/main.go @@ -109,7 +109,7 @@ func main() { precommitProcessors.LoadPrecommitProcessors(sm, grm, l) - fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, l) + fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, contractStore, l) cc := sequentialContractCaller.NewSequentialContractCaller(client, cfg, cfg.EthereumRpcConfig.ContractCallBatchSize, l) diff --git a/cmd/operatorRestakedStrategies.go b/cmd/operatorRestakedStrategies.go index 63f3533b1..6811d8c98 100644 --- a/cmd/operatorRestakedStrategies.go +++ b/cmd/operatorRestakedStrategies.go @@ -84,7 +84,7 @@ var runOperatorRestakedStrategiesCmd = &cobra.Command{ precommitProcessors.LoadPrecommitProcessors(sm, grm, l) - fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, l) + fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, contractStore, l) cc := sequentialContractCaller.NewSequentialContractCaller(client, cfg, cfg.EthereumRpcConfig.ContractCallBatchSize, l) diff --git a/cmd/run.go b/cmd/run.go index 4eac89227..2bee2f571 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -144,8 +144,8 @@ var runCmd = &cobra.Command{ } precommitProcessors.LoadPrecommitProcessors(sm, grm, l) - - fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, l) + + fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, contractStore, l) cc := sequentialContractCaller.NewSequentialContractCaller(client, cfg, cfg.EthereumRpcConfig.ContractCallBatchSize, l) diff --git a/examples/transactionBackfiller/main.go b/examples/transactionBackfiller/main.go index dc0d0a51e..3dbda4e7e 100644 --- a/examples/transactionBackfiller/main.go +++ b/examples/transactionBackfiller/main.go @@ -66,7 +66,7 @@ func setup(ethConfig *ethereum.EthereumClientConfig) ( cm := contractManager.NewContractManager(grm, contractStore, client, af, l) - fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, l) + fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, contractStore, l) return cfg, l, fetchr, mds, cm, nil diff --git a/pkg/contractStore/contractStore.go b/pkg/contractStore/contractStore.go index b259f69dd..e383a97e7 100644 --- a/pkg/contractStore/contractStore.go +++ b/pkg/contractStore/contractStore.go @@ -14,7 +14,7 @@ var CoreContracts embed.FS type ContractStore interface { GetContractForAddress(address string) (*Contract, error) GetProxyContractForAddress(blockNumber uint64, address string) (*ProxyContract, error) - GetAllProxyAddressesInString() ([]string, error) + ListInterestingContractAddresses() ([]string, error) CreateContract(address string, abiJson string, verified bool, bytecodeHash string, matchingContractAddress string, checkedForAbi bool, contractType ContractType) (*Contract, error) FindOrCreateContract(address string, abiJson string, verified bool, bytecodeHash string, matchingContractAddress string, checkedForAbi bool, contractType ContractType) (*Contract, bool, error) diff --git a/pkg/contractStore/postgresContractStore/postgresContractStore.go b/pkg/contractStore/postgresContractStore/postgresContractStore.go index f1a6e0c46..892f17d55 100644 --- a/pkg/contractStore/postgresContractStore/postgresContractStore.go +++ b/pkg/contractStore/postgresContractStore/postgresContractStore.go @@ -76,8 +76,8 @@ func (s *PostgresContractStore) GetProxyContractForAddress(blockNumber uint64, a return proxyContract, nil } -// GetAllProxyAddressesInString returns all proxy addresses in the database as a slice of strings. -func (s *PostgresContractStore) GetAllProxyAddressesInString() ([]string, error) { +// ListInterestingContractAddresses returns all proxy addresses in the database as a slice of strings. +func (s *PostgresContractStore) ListInterestingContractAddresses() ([]string, error) { var addresses []string result := s.Db.Raw(`select distinct(contract_address) from proxy_contracts`).Scan(&addresses) diff --git a/pkg/contractStore/postgresContractStore/postgresContractStore_test.go b/pkg/contractStore/postgresContractStore/postgresContractStore_test.go index b3deb66ce..192897bed 100644 --- a/pkg/contractStore/postgresContractStore/postgresContractStore_test.go +++ b/pkg/contractStore/postgresContractStore/postgresContractStore_test.go @@ -171,7 +171,7 @@ func Test_PostgresContractStore(t *testing.T) { assert.Equal(t, proxyContract.ProxyContractAddress, proxy.ProxyContractAddress) }) t.Run("Get all proxy addresses in string", func(t *testing.T) { - addresses, err := cs.GetAllProxyAddressesInString() + addresses, err := cs.ListInterestingContractAddresses() assert.Nil(t, err) assert.True(t, len(addresses) > 0) assert.Contains(t, addresses, createdContracts[0].ContractAddress) diff --git a/pkg/fetcher/fetcher.go b/pkg/fetcher/fetcher.go index c3dc3773b..44e8f6123 100644 --- a/pkg/fetcher/fetcher.go +++ b/pkg/fetcher/fetcher.go @@ -5,6 +5,8 @@ package fetcher import ( "context" + "github.com/Layr-Labs/sidecar/pkg/providers" + "github.com/Layr-Labs/sidecar/pkg/utils" "slices" "sync" "time" @@ -30,14 +32,22 @@ type Fetcher struct { Logger *zap.Logger // FetcherConfig contains the configuration specific to the Fetcher FetcherConfig *FetcherConfig + + InterestingLogsProvider providers.InterestingContractsProvider } // NewFetcher creates a new Fetcher with the provided Ethereum client, configuration, and logger. -func NewFetcher(ethClient *ethereum.Client, cfg *FetcherConfig, l *zap.Logger) *Fetcher { +func NewFetcher( + ethClient *ethereum.Client, + cfg *FetcherConfig, + ilp providers.InterestingContractsProvider, + l *zap.Logger, +) *Fetcher { return &Fetcher{ - EthClient: ethClient, - Logger: l, - FetcherConfig: cfg, + EthClient: ethClient, + Logger: l, + FetcherConfig: cfg, + InterestingLogsProvider: ilp, } } @@ -197,7 +207,7 @@ func (f *Fetcher) FetchBlocksWithRetries(ctx context.Context, startBlockInclusiv retries := []int{1, 2, 4, 8, 16, 32, 64} var e error for i, r := range retries { - fetchedBlocks, err := f.FetchBlocks(ctx, startBlockInclusive, endBlockInclusive) + fetchedBlocks, err := f.FetchBlocksWithReceipts(ctx, startBlockInclusive, endBlockInclusive) if err == nil { if i > 0 { f.Logger.Sugar().Infow("successfully fetched blocks for range after retries", @@ -224,13 +234,141 @@ func (f *Fetcher) FetchBlocksWithRetries(ctx context.Context, startBlockInclusiv ) return nil, e } - -func (f *Fetcher) FetchInterestingBlocksAndLogsForContractsForBlockRange(ctx context.Context, startBlockInclusive uint64, endBlockInclusive uint64, contractAddresses []string) ([]uint64, []*ethereum.EthereumEventLog, error) { +func (f *Fetcher) FetchLogsForContractsForBlockRange(ctx context.Context, startBlockInclusive uint64, endBlockInclusive uint64, contractAddresses []string) ([]uint64, error) { f.Logger.Sugar().Debugw("Fetching logs for contracts", zap.Uint64("startBlock", startBlockInclusive), zap.Uint64("endBlock", endBlockInclusive), ) + logsCollector := make(chan []*ethereum.EthereumEventLog, len(contractAddresses)) + errorCollector := make(chan error, len(contractAddresses)) + + wg := &sync.WaitGroup{} + for _, contractAddress := range contractAddresses { + wg.Add(1) + go func(contractAddress string) { + defer wg.Done() + f.Logger.Sugar().Debugw("Fetching logs for contract", + zap.Uint64("startBlock", startBlockInclusive), + zap.Uint64("endBlock", endBlockInclusive), + zap.String("contract", contractAddress), + ) + logs, err := f.EthClient.GetLogs(ctx, contractAddress, startBlockInclusive, endBlockInclusive) + f.Logger.Sugar().Debugw("Fetched logs for contract", + zap.Uint64("startBlock", startBlockInclusive), + zap.Uint64("endBlock", endBlockInclusive), + zap.String("contract", contractAddress), + zap.Int("count", len(logs)), + ) + if err != nil { + f.Logger.Sugar().Errorw("failed to fetch logs for contracts", + zap.Uint64("startBlock", startBlockInclusive), + zap.Uint64("endBlock", endBlockInclusive), + zap.Strings("contracts", contractAddresses), + zap.Error(err), + ) + errorCollector <- err + return + } + logsCollector <- logs + }(contractAddress) + } + wg.Wait() + close(logsCollector) + close(errorCollector) + f.Logger.Sugar().Debugw("Finished fetching logs for contracts", + zap.Uint64("startBlock", startBlockInclusive), + zap.Uint64("endBlock", endBlockInclusive), + zap.Strings("contracts", contractAddresses), + ) + interestingBlockNumbers := make(map[uint64]bool) + // collectedLogs := make([]*ethereum.EthereumEventLog, 0) + for logs := range logsCollector { + // collectedLogs = append(collectedLogs, logs...) + for _, log := range logs { + interestingBlockNumbers[log.BlockNumber.Value()] = true + } + } + var err error + for e := range errorCollector { + err = e + return nil, err + } + + blockNumbers := make([]uint64, 0) + for blockNumber := range interestingBlockNumbers { + blockNumbers = append(blockNumbers, blockNumber) + } + slices.Sort(blockNumbers) + + return blockNumbers, err +} + +func (f *Fetcher) FetchFilteredBlocksWithRetries(ctx context.Context, startBlockInclusive uint64, endBlockInclusive uint64) ([]*FetchedBlock, error) { + f.Logger.Sugar().Debugw("Fetching filtered blocks with retries", + zap.Uint64("startBlock", startBlockInclusive), + zap.Uint64("endBlock", endBlockInclusive), + ) + // Fetch logs for all contracts and topics + contractAddresses, err := f.InterestingLogsProvider.ListInterestingContractAddresses() + if err != nil { + f.Logger.Sugar().Errorw("failed to list interesting contract addresses", zap.Error(err)) + return nil, err + } + + interestingBlockNumbers, err := f.FetchLogsForContractsForBlockRange(ctx, startBlockInclusive, endBlockInclusive, contractAddresses) + if err != nil { + f.Logger.Sugar().Errorw("failed to fetch logs for contracts", zap.Error(err)) + return nil, err + } + blocks, err := f.FetchBlocks(ctx, startBlockInclusive, endBlockInclusive) + if err != nil { + f.Logger.Sugar().Errorw("failed to fetch blocks", + zap.Uint64("startBlock", startBlockInclusive), + zap.Uint64("endBlock", endBlockInclusive), + zap.Error(err), + ) + return nil, err + } + + // fetch receipts for only the interesting blocks + f.Logger.Sugar().Debugw("Fetching receipts for interesting blocks", zap.Int("count", len(interestingBlockNumbers))) + + // pick blocks from the interesting block numbers + interestingBlocks := utils.Filter(blocks, func(b *ethereum.EthereumBlock) bool { + return slices.Contains(interestingBlockNumbers, b.Number.Value()) + }) + + // get receipts for only the interesting blocks + receipts := make([]*FetchedBlock, 0) + + if len(interestingBlocks) > 0 { + receipts, err = f.FetchReceiptsForBlocks(ctx, interestingBlocks) + if err != nil { + f.Logger.Sugar().Errorw("failed to fetch receipts for interesting blocks", zap.Error(err)) + return nil, err + } + } + + finalBlocks := utils.Map(blocks, func(b *ethereum.EthereumBlock, i uint64) *FetchedBlock { + if slices.Contains(interestingBlockNumbers, b.Number.Value()) { + foundBlock := utils.Find(receipts, func(r *FetchedBlock) bool { + return r.Block.Number.Value() == b.Number.Value() + }) + if foundBlock != nil { + return foundBlock + } + } + return &FetchedBlock{ + Block: b, + TxReceipts: make(map[string]*ethereum.EthereumTransactionReceipt), + } + }) + + return finalBlocks, nil +} + +func (f *Fetcher) FetchInterestingBlocksAndLogsForContractsForBlockRange(ctx context.Context, startBlockInclusive uint64, endBlockInclusive uint64, contractAddresses []string) ([]uint64, []*ethereum.EthereumEventLog, error) { // Define a reasonable chunk size to avoid HTTP 413 errors // This value can be adjusted based on your specific node's limitations const chunkSize uint64 = 4000 @@ -362,19 +500,7 @@ func (f *Fetcher) FetchBlockList(ctx context.Context, blockNumbers []uint64) ([] return blocks, nil } -func (f *Fetcher) FetchBlockListWithReceipts(ctx context.Context, blockNumbers []uint64) ([]*FetchedBlock, error) { - if len(blockNumbers) == 0 { - return []*FetchedBlock{}, nil - } - - blocks, err := f.FetchBlockList(ctx, blockNumbers) - if err != nil { - f.Logger.Sugar().Errorw("failed to fetch block list", - zap.Error(err), - ) - return nil, err - } - +func (f *Fetcher) FetchReceiptsForBlocks(ctx context.Context, blocks []*ethereum.EthereumBlock) ([]*FetchedBlock, error) { fetchedBlockResponses := make(chan *FetchedBlock, len(blocks)) foundErrorsChan := make(chan bool, 1) @@ -401,18 +527,14 @@ func (f *Fetcher) FetchBlockListWithReceipts(ctx context.Context, blockNumbers [ wg.Wait() close(fetchedBlockResponses) close(foundErrorsChan) - foundErrors := <-foundErrorsChan - if foundErrors { return nil, errors.New("failed to fetch receipts for some blocks") } - fetchedBlocks := make([]*FetchedBlock, 0) for fb := range fetchedBlockResponses { fetchedBlocks = append(fetchedBlocks, fb) } - if len(fetchedBlocks) != len(blocks) { f.Logger.Sugar().Errorw("failed to fetch all blocks", zap.Int("fetched", len(fetchedBlocks)), @@ -420,7 +542,6 @@ func (f *Fetcher) FetchBlockListWithReceipts(ctx context.Context, blockNumbers [ ) return nil, errors.New("failed to fetch all blocks") } - // ensure blocks are sorted ascending slices.SortFunc(fetchedBlocks, func(i, j *FetchedBlock) int { return int(i.Block.Number.Value() - j.Block.Number.Value()) @@ -428,8 +549,8 @@ func (f *Fetcher) FetchBlockListWithReceipts(ctx context.Context, blockNumbers [ f.Logger.Sugar().Debugw("Fetched blocks", zap.Int("count", len(fetchedBlocks)), - zap.Uint64("startBlock", blockNumbers[0]), - zap.Uint64("endBlock", blockNumbers[len(blockNumbers)-1]), + zap.Uint64("startBlock", blocks[0].Number.Value()), + zap.Uint64("endBlock", blocks[len(blocks)-1].Number.Value()), ) return fetchedBlocks, nil @@ -438,11 +559,67 @@ func (f *Fetcher) FetchBlockListWithReceipts(ctx context.Context, blockNumbers [ // FetchBlocks retrieves a range of blocks and their transaction receipts. // It uses batch requests to fetch blocks and parallel processing to fetch receipts. // Returns an array of FetchedBlock objects sorted by block number. -func (f *Fetcher) FetchBlocks(ctx context.Context, startBlockInclusive uint64, endBlockInclusive uint64) ([]*FetchedBlock, error) { +func (f *Fetcher) FetchBlocks(ctx context.Context, startBlockInclusive uint64, endBlockInclusive uint64) ([]*ethereum.EthereumBlock, error) { blockNumbers := make([]uint64, 0) for i := startBlockInclusive; i <= endBlockInclusive; i++ { blockNumbers = append(blockNumbers, i) } - return f.FetchBlockListWithReceipts(ctx, blockNumbers) + if len(blockNumbers) == 0 { + return []*ethereum.EthereumBlock{}, nil + } + + blockRequests := make([]*ethereum.RPCRequest, 0) + for i, n := range blockNumbers { + blockRequests = append(blockRequests, ethereum.GetBlockByNumberRequest(n, uint(i))) + } + blockResponses, err := f.EthClient.BatchCall(ctx, blockRequests) + if err != nil { + f.Logger.Sugar().Errorw("failed to batch call for blocks", zap.Error(err)) + return nil, err + } + if len(blockResponses) != len(blockNumbers) { + f.Logger.Sugar().Errorw("failed to fetch all blocks", + zap.Int("fetched", len(blockResponses)), + zap.Int("expected", len(blockNumbers)), + ) + return nil, errors.New("failed to fetch all blocks") + } + blocks := make([]*ethereum.EthereumBlock, 0) + for _, response := range blockResponses { + b, err := ethereum.RPCMethod_getBlockByNumber.ResponseParser(response.Result) + if err != nil { + f.Logger.Sugar().Errorw("failed to parse block", + zap.Error(err), + zap.Uint("response ID", *response.ID), + ) + return nil, err + } + blocks = append(blocks, b) + } + if len(blocks) != len(blockNumbers) { + f.Logger.Sugar().Errorw("failed to fetch all blocks", + zap.Int("fetched", len(blocks)), + zap.Int("expected", len(blockNumbers)), + ) + return nil, err + } + return blocks, nil +} + +// FetchBlocksWithReceipts retrieves a range of blocks and their transaction receipts. +// It uses batch requests to fetch blocks and parallel processing to fetch receipts. +// Returns an array of FetchedBlock objects sorted by block number. +func (f *Fetcher) FetchBlocksWithReceipts(ctx context.Context, startBlockInclusive uint64, endBlockInclusive uint64) ([]*FetchedBlock, error) { + blocks, err := f.FetchBlocks(ctx, startBlockInclusive, endBlockInclusive) + if err != nil { + f.Logger.Sugar().Errorw("failed to fetch blocks", + zap.Uint64("startBlock", startBlockInclusive), + zap.Uint64("endBlock", endBlockInclusive), + zap.Error(err), + ) + return nil, err + } + + return f.FetchReceiptsForBlocks(ctx, blocks) } diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 136957651..98514e572 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -164,13 +164,14 @@ func (idx *Indexer) ParseInterestingTransactionsAndLogs(ctx context.Context, fet for i, tx := range fetchedBlock.Block.Transactions { txReceipt, ok := fetchedBlock.TxReceipts[tx.Hash.Value()] if !ok { - idx.Logger.Sugar().Errorw("Receipt not found for transaction", - zap.String("txHash", tx.Hash.Value()), - zap.Uint64("block", tx.BlockNumber.Value()), - ) - return nil, NewIndexError(IndexError_ReceiptNotFound, fmt.Errorf("receipt not found for transaction")). - WithBlockNumber(tx.BlockNumber.Value()). - WithTransactionHash(tx.Hash.Value()) + continue + // idx.Logger.Sugar().Errorw("Receipt not found for transaction", + // zap.String("txHash", tx.Hash.Value()), + // zap.Uint64("block", tx.BlockNumber.Value()), + // ) + // return nil, NewIndexError(IndexError_ReceiptNotFound, fmt.Errorf("receipt not found for transaction")). + // WithBlockNumber(tx.BlockNumber.Value()). + // WithTransactionHash(tx.Hash.Value()) } parsedTransactionAndLogs, err := idx.ParseTransactionLogs(tx, txReceipt) @@ -250,7 +251,7 @@ func (idx *Indexer) IsInterestingAddress(addr string) bool { return true } - addresses, err := idx.ContractStore.GetAllProxyAddressesInString() + addresses, err := idx.ContractStore.ListInterestingContractAddresses() if err != nil { return false } diff --git a/pkg/indexer/restakedStrategies_test.go b/pkg/indexer/restakedStrategies_test.go index 709045a5b..0032b28f8 100644 --- a/pkg/indexer/restakedStrategies_test.go +++ b/pkg/indexer/restakedStrategies_test.go @@ -85,7 +85,7 @@ func Test_IndexerRestakedStrategies(t *testing.T) { mds := pgStorage.NewPostgresBlockStore(grm, l, cfg) - fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, l) + fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, contractStore, l) mccc := multicallContractCaller.NewMulticallContractCaller(client, l) diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 8c18dd224..21fdef2d7 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -601,7 +601,7 @@ func (p *Pipeline) RunForBlockBatch(ctx context.Context, startBlock uint64, endB fetchSpan.SetTag("end_block", endBlock) fetchStartTime := time.Now() - fetchedBlocks, err := p.Fetcher.FetchBlocksWithRetries(fetchCtx, startBlock, endBlock) + fetchedBlocks, err := p.Fetcher.FetchFilteredBlocksWithRetries(fetchCtx, startBlock, endBlock) fetchDuration := time.Since(fetchStartTime) fetchSpan.SetTag("duration_ms", fetchDuration.Milliseconds()) diff --git a/pkg/pipeline/pipelineIntegration_test.go b/pkg/pipeline/pipelineIntegration_test.go index 227424824..5d6066712 100644 --- a/pkg/pipeline/pipelineIntegration_test.go +++ b/pkg/pipeline/pipelineIntegration_test.go @@ -115,7 +115,7 @@ func setup(ethConfig *ethereum.EthereumClientConfig) ( rcq := rewardsCalculatorQueue.NewRewardsCalculatorQueue(rc, l) - fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, l) + fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, contractStore, l) cc := sequentialContractCaller.NewSequentialContractCaller(client, cfg, 10, l) diff --git a/pkg/providers/interfaces.go b/pkg/providers/interfaces.go new file mode 100644 index 000000000..ae31b65a2 --- /dev/null +++ b/pkg/providers/interfaces.go @@ -0,0 +1,5 @@ +package providers + +type InterestingContractsProvider interface { + ListInterestingContractAddresses() ([]string, error) +} diff --git a/pkg/sidecar/blockIndexer.go b/pkg/sidecar/blockIndexer.go index 5795f4979..2d6c97760 100644 --- a/pkg/sidecar/blockIndexer.go +++ b/pkg/sidecar/blockIndexer.go @@ -352,7 +352,7 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error { } tip := currentTip.Load() - batchEndBlock := int64(currentBlock + 100) + batchEndBlock := int64(currentBlock + 200) if batchEndBlock > int64(tip) { batchEndBlock = int64(tip) } diff --git a/pkg/transactionBackfiller/backfiller_test.go b/pkg/transactionBackfiller/backfiller_test.go index b46963856..46e3690aa 100644 --- a/pkg/transactionBackfiller/backfiller_test.go +++ b/pkg/transactionBackfiller/backfiller_test.go @@ -71,7 +71,7 @@ func setup(ethConfig *ethereum.EthereumClientConfig) ( cm := contractManager.NewContractManager(grm, contractStore, client, af, l) - fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, l) + fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, contractStore, l) return cfg, l, fetchr, mds, grm, cm, dbname, nil From f5d2adec9a4df75cbfc24e1d58a0e47123d97f24 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Wed, 4 Jun 2025 20:11:59 -0500 Subject: [PATCH 05/27] update NewFetcher calls --- cmd/debugger.go | 2 +- .../202505092016_fixRewardsClaimedTransactions/job_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/debugger.go b/cmd/debugger.go index 2c56af0d6..dcd7c88a6 100644 --- a/cmd/debugger.go +++ b/cmd/debugger.go @@ -114,7 +114,7 @@ var runDebuggerCmd = &cobra.Command{ precommitProcessors.LoadPrecommitProcessors(sm, grm, l) - fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, l) + fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, contractStore, l) cc := sequentialContractCaller.NewSequentialContractCaller(client, cfg, cfg.EthereumRpcConfig.ContractCallBatchSize, l) diff --git a/pkg/startupJobs/202505092016_fixRewardsClaimedTransactions/job_test.go b/pkg/startupJobs/202505092016_fixRewardsClaimedTransactions/job_test.go index 1e7275773..feb992b4e 100644 --- a/pkg/startupJobs/202505092016_fixRewardsClaimedTransactions/job_test.go +++ b/pkg/startupJobs/202505092016_fixRewardsClaimedTransactions/job_test.go @@ -85,7 +85,7 @@ func setup(ethConfig *ethereum.EthereumClientConfig) ( l.Sugar().Fatalw("Failed to load meta state models", zap.Error(err)) } - fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, l) + fetchr := fetcher.NewFetcher(client, &fetcher.FetcherConfig{UseGetBlockReceipts: cfg.EthereumRpcConfig.UseGetBlockReceipts}, contractStore, l) cc := sequentialContractCaller.NewSequentialContractCaller(client, cfg, 10, l) From df9a379c22b94ffdc91db44f2a8a4a4db918718c Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 12:23:05 -0500 Subject: [PATCH 06/27] debug log, not info --- pkg/clients/ethereum/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/clients/ethereum/client.go b/pkg/clients/ethereum/client.go index 04c75f228..4be8d9e22 100644 --- a/pkg/clients/ethereum/client.go +++ b/pkg/clients/ethereum/client.go @@ -278,7 +278,7 @@ func (c *Client) GetBlockTransactionReceipts(ctx context.Context, blockNumber ui } func (c *Client) GetLogs(ctx context.Context, address string, fromBlock uint64, toBlock uint64) ([]*EthereumEventLog, error) { - c.Logger.Sugar().Infow("GetLogs", + c.Logger.Sugar().Debugw("GetLogs", zap.String("address", address), zap.Uint64("fromBlock", fromBlock), zap.Uint64("toBlock", toBlock), From f05f7d757e34a9538bd2ba605e729d01c9423309 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 13:10:10 -0500 Subject: [PATCH 07/27] update fk constraints --- .../202506042023_addMissingBlockNumberForeignKeys/up.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go b/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go index 5d86838f1..49d2ea17e 100644 --- a/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go +++ b/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go @@ -14,6 +14,9 @@ func (m *Migration) Up(db *sql.DB, grm *gorm.DB, cfg *config.Config) error { tableNames := []string{ "staker_shares", "operator_shares", + "combined_rewards", + "disabled_distribution_roots", + "operator_directed_rewards", } for _, tableName := range tableNames { @@ -28,6 +31,7 @@ func (m *Migration) Up(db *sql.DB, grm *gorm.DB, cfg *config.Config) error { `create index if not exists idx_staker_shares_block_number on staker_shares(block_number);`, `create index if not exists idx_operator_shares_block_number on operator_shares(block_number);`, `create index if not exists idx_combined_rewards_block_number on combined_rewards(block_number);`, + `alter table rewards_claimed drop constraint rewards_claimed_transaction_hash_log_index_key`, `alter table rewards_claimed add constraint rewards_claimed_transaction_hash_log_index_key unique (transaction_hash, log_index);`, } From a77b263190cc5d19d9ff86a6139dab5186b7e87a Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 15:39:44 -0500 Subject: [PATCH 08/27] convert kebab case to snake case --- internal/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/config/config.go b/internal/config/config.go index 3b55daef5..daad444fd 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -212,7 +212,7 @@ var ( EthereumRpcUseNativeBatchCall = "ethereum.use_native_batch_call" EthereumRpcNativeBatchCallSize = "ethereum.native_batch_call_size" EthereumRpcChunkedBatchCallSize = "ethereum.chunked_batch_call_size" - EthereumUseGetBlockReceipts = "ethereum.use-get-block-receipts" + EthereumUseGetBlockReceipts = "ethereum.use_get_block_receipts" EthereumLatestBlockType = "ethereum.latest-block-type" DataDogStatsdEnabled = "datadog.statsd.enabled" From ecc359666387845b7e0e704de317e13bda638c03 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 16:09:36 -0500 Subject: [PATCH 09/27] empty commit From ce1218b41592bdc5d32c5079dcd420deae12f7c1 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 16:14:49 -0500 Subject: [PATCH 10/27] fix test --- .../202506042023_addMissingBlockNumberForeignKeys/up.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go b/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go index 49d2ea17e..9542a254a 100644 --- a/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go +++ b/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys/up.go @@ -14,7 +14,6 @@ func (m *Migration) Up(db *sql.DB, grm *gorm.DB, cfg *config.Config) error { tableNames := []string{ "staker_shares", "operator_shares", - "combined_rewards", "disabled_distribution_roots", "operator_directed_rewards", } From 9b2fa8267f7a1bcd27a06b8ddb7a15541ccdefa2 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 16:21:24 -0500 Subject: [PATCH 11/27] increase log verbosity for getLogs --- pkg/fetcher/fetcher.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/fetcher/fetcher.go b/pkg/fetcher/fetcher.go index 44e8f6123..3298fbfa4 100644 --- a/pkg/fetcher/fetcher.go +++ b/pkg/fetcher/fetcher.go @@ -43,6 +43,7 @@ func NewFetcher( ilp providers.InterestingContractsProvider, l *zap.Logger, ) *Fetcher { + l.Sugar().Infow("Created fetcher", zap.Any("config", cfg)) return &Fetcher{ EthClient: ethClient, Logger: l, @@ -275,7 +276,7 @@ func (f *Fetcher) FetchLogsForContractsForBlockRange(ctx context.Context, startB wg.Wait() close(logsCollector) close(errorCollector) - f.Logger.Sugar().Debugw("Finished fetching logs for contracts", + f.Logger.Sugar().Infow("Finished fetching logs for contracts", zap.Uint64("startBlock", startBlockInclusive), zap.Uint64("endBlock", endBlockInclusive), zap.Strings("contracts", contractAddresses), From 5c7db472ce475eb532b0823f38da520c4705ed59 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 16:28:19 -0500 Subject: [PATCH 12/27] log out interesting block count --- pkg/fetcher/fetcher.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/fetcher/fetcher.go b/pkg/fetcher/fetcher.go index 3298fbfa4..8e18e5fbd 100644 --- a/pkg/fetcher/fetcher.go +++ b/pkg/fetcher/fetcher.go @@ -276,11 +276,6 @@ func (f *Fetcher) FetchLogsForContractsForBlockRange(ctx context.Context, startB wg.Wait() close(logsCollector) close(errorCollector) - f.Logger.Sugar().Infow("Finished fetching logs for contracts", - zap.Uint64("startBlock", startBlockInclusive), - zap.Uint64("endBlock", endBlockInclusive), - zap.Strings("contracts", contractAddresses), - ) interestingBlockNumbers := make(map[uint64]bool) // collectedLogs := make([]*ethereum.EthereumEventLog, 0) for logs := range logsCollector { @@ -289,6 +284,12 @@ func (f *Fetcher) FetchLogsForContractsForBlockRange(ctx context.Context, startB interestingBlockNumbers[log.BlockNumber.Value()] = true } } + f.Logger.Sugar().Infow("Finished fetching logs for contracts", + zap.Uint64("startBlock", startBlockInclusive), + zap.Uint64("endBlock", endBlockInclusive), + zap.Strings("contracts", contractAddresses), + zap.Int("interestingBlocks", len(interestingBlockNumbers)), + ) var err error for e := range errorCollector { err = e From b3e7d4f1578d5175bf96c5bdcd1b5b0a1c266873 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 20:09:45 -0500 Subject: [PATCH 13/27] ignore conflicts on contract insert --- .../postgresContractStore/postgresContractStore.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/contractStore/postgresContractStore/postgresContractStore.go b/pkg/contractStore/postgresContractStore/postgresContractStore.go index 892f17d55..c50682c59 100644 --- a/pkg/contractStore/postgresContractStore/postgresContractStore.go +++ b/pkg/contractStore/postgresContractStore/postgresContractStore.go @@ -113,8 +113,14 @@ func (s *PostgresContractStore) CreateContract( MatchingContractAddress: matchingContractAddress, CheckedForAbi: checkedForAbi, } - - result := s.Db.Create(contract) + clauses := []clause.Expression{ + clause.Returning{}, + clause.OnConflict{ + Columns: []clause.Column{{Name: "contract_address"}}, + DoNothing: true, + }, + } + result := s.Db.Model(&contractStore.Contract{}).Clauses(clauses...).Create(&contract) if result.Error != nil { return nil, result.Error } From c876b17a7592d392aaced88742e1b176e15410bc Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 20:10:51 -0500 Subject: [PATCH 14/27] upsert proxy to handle error --- pkg/contractManager/contractManager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/contractManager/contractManager.go b/pkg/contractManager/contractManager.go index 127d0ba16..e88c2a950 100644 --- a/pkg/contractManager/contractManager.go +++ b/pkg/contractManager/contractManager.go @@ -154,7 +154,7 @@ func (cm *ContractManager) CreateUpgradedProxyContract( } // Create a proxy contract - _, err := cm.ContractStore.CreateProxyContract(blockNumber, contractAddress, proxyContractAddress) + _, _, err := cm.ContractStore.FindOrCreateProxyContract(blockNumber, contractAddress, proxyContractAddress) if err != nil { cm.Logger.Sugar().Errorw("Failed to create proxy contract", zap.Error(err), From 79c3e0c98d9c0c258145ae90ad95102134e25bf9 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 20:16:54 -0500 Subject: [PATCH 15/27] use ubuntu-24.04 --- .github/workflows/main.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index cfeb140fb..7040cefee 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -52,7 +52,8 @@ jobs: echo $PATH make lint build-container: - runs-on: protocol-gha-runners + #runs-on: protocol-gha-runners + runs-on: ubuntu-24.04 steps: - name: Checkout uses: actions/checkout@v4 From f84f234db38dc30cedc82edf1d4864af5f9b6fbe Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 20:48:38 -0500 Subject: [PATCH 16/27] skip contracts that already exist --- pkg/contractManager/contractManager.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pkg/contractManager/contractManager.go b/pkg/contractManager/contractManager.go index e88c2a950..c0e896e19 100644 --- a/pkg/contractManager/contractManager.go +++ b/pkg/contractManager/contractManager.go @@ -154,7 +154,7 @@ func (cm *ContractManager) CreateUpgradedProxyContract( } // Create a proxy contract - _, _, err := cm.ContractStore.FindOrCreateProxyContract(blockNumber, contractAddress, proxyContractAddress) + _, found, err := cm.ContractStore.FindOrCreateProxyContract(blockNumber, contractAddress, proxyContractAddress) if err != nil { cm.Logger.Sugar().Errorw("Failed to create proxy contract", zap.Error(err), @@ -163,6 +163,14 @@ func (cm *ContractManager) CreateUpgradedProxyContract( ) return err } + // if the proxy exists, no need to create it again + if found { + cm.Logger.Sugar().Infow("Proxy contract already exists, skipping creation", + zap.String("contractAddress", contractAddress), + zap.String("proxyContractAddress", proxyContractAddress), + ) + return nil + } // Fetch bytecode hash bytecodeHash, err := cm.AbiFetcher.FetchContractBytecodeHash(ctx, proxyContractAddress) @@ -179,7 +187,8 @@ func (cm *ContractManager) CreateUpgradedProxyContract( if err != nil { cm.Logger.Sugar().Errorw("Failed to fetch contract abi", zap.Error(err), - zap.String("address", proxyContractAddress), + zap.String("proxyContractAddress", proxyContractAddress), + zap.String("contractAddress", contractAddress), ) return err } From 80e595aa31c73201c79d624a4da32255903e2491 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 20:49:32 -0500 Subject: [PATCH 17/27] additional logging --- pkg/contractManager/contractManager.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/contractManager/contractManager.go b/pkg/contractManager/contractManager.go index c0e896e19..30dc0a66c 100644 --- a/pkg/contractManager/contractManager.go +++ b/pkg/contractManager/contractManager.go @@ -126,7 +126,11 @@ func (cm *ContractManager) HandleContractUpgrade(ctx context.Context, blockNumbe err := cm.CreateUpgradedProxyContract(ctx, blockNumber, upgradedLog.Address, newProxiedAddress) if err != nil { - cm.Logger.Sugar().Errorw("Failed to create proxy contract", zap.Error(err)) + cm.Logger.Sugar().Errorw("Failed to create proxy contract", + zap.String("contractAddress", upgradedLog.Address), + zap.String("newProxiedAddress", newProxiedAddress), + zap.Error(err), + ) return err } cm.Logger.Sugar().Infow("Upgraded proxy contract", zap.String("contractAddress", upgradedLog.Address), zap.String("proxyContractAddress", newProxiedAddress)) From 6c80d695643474f0cb1929e318ffaa29ebc09fa1 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 20:59:44 -0500 Subject: [PATCH 18/27] add more logging --- pkg/contractManager/contractManager.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/contractManager/contractManager.go b/pkg/contractManager/contractManager.go index 30dc0a66c..149b5a27f 100644 --- a/pkg/contractManager/contractManager.go +++ b/pkg/contractManager/contractManager.go @@ -147,6 +147,11 @@ func (cm *ContractManager) CreateUpgradedProxyContract( contractAddress string, proxyContractAddress string, ) error { + cm.Logger.Sugar().Infow("Creating upgraded proxy contract", + zap.String("contractAddress", contractAddress), + zap.String("proxyContractAddress", proxyContractAddress), + zap.Uint64("blockNumber", blockNumber), + ) // Check if proxy contract already exists proxyContract, _ := cm.ContractStore.GetProxyContractForAddress(blockNumber, contractAddress) if proxyContract != nil { From f198db8cfdd192e26d9df1749e1065176813667f Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 21:13:01 -0500 Subject: [PATCH 19/27] more logging --- .../postgresContractStore/postgresContractStore.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/contractStore/postgresContractStore/postgresContractStore.go b/pkg/contractStore/postgresContractStore/postgresContractStore.go index c50682c59..5b7aa9333 100644 --- a/pkg/contractStore/postgresContractStore/postgresContractStore.go +++ b/pkg/contractStore/postgresContractStore/postgresContractStore.go @@ -200,17 +200,26 @@ func (s *PostgresContractStore) FindOrCreateProxyContract( upsertedContract, err := helpers.WrapTxAndCommit[*contractStore.ProxyContract](func(tx *gorm.DB) (*contractStore.ProxyContract, error) { contract := &contractStore.ProxyContract{} // Proxy contracts are unique on block_number && contract - result := tx.First(&contract, "contract_address = ? and block_number = ?", contractAddress, blockNumber) + result := tx.Debug().First(&contract, "contract_address = ? and block_number = ?", contractAddress, blockNumber) if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) { return nil, result.Error } // found contract if contract.ContractAddress == contractAddress { + s.Logger.Sugar().Infow("Found existing proxy contract", + zap.String("contractAddress", contractAddress), + zap.String("proxyContractAddress", proxyContractAddress), + ) found = true return contract, nil } + s.Logger.Sugar().Infow("Proxy contract not found, creating new one", + zap.String("contractAddress", contractAddress), + zap.String("proxyContractAddress", proxyContractAddress), + ) + proxyContract, err := s.CreateProxyContract(blockNumber, contractAddress, proxyContractAddress) if err != nil { s.Logger.Sugar().Errorw("Failed to create proxy contract", zap.Error(err), zap.String("contractAddress", contractAddress)) From f12ec97d8120572e5946c18a79ae6ea81f68bdcc Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 21:14:38 -0500 Subject: [PATCH 20/27] change the proxy contract lookup logic to be more correct --- .../postgresContractStore/postgresContractStore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/contractStore/postgresContractStore/postgresContractStore.go b/pkg/contractStore/postgresContractStore/postgresContractStore.go index 5b7aa9333..196211690 100644 --- a/pkg/contractStore/postgresContractStore/postgresContractStore.go +++ b/pkg/contractStore/postgresContractStore/postgresContractStore.go @@ -200,7 +200,7 @@ func (s *PostgresContractStore) FindOrCreateProxyContract( upsertedContract, err := helpers.WrapTxAndCommit[*contractStore.ProxyContract](func(tx *gorm.DB) (*contractStore.ProxyContract, error) { contract := &contractStore.ProxyContract{} // Proxy contracts are unique on block_number && contract - result := tx.Debug().First(&contract, "contract_address = ? and block_number = ?", contractAddress, blockNumber) + result := tx.Debug().First(&contract, "contract_address = ? and proxy_contract_address = ?", contractAddress, proxyContractAddress) if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) { return nil, result.Error } From 3a27ffe9db041097eb85b9b8b44c579310ae6317 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 21:23:19 -0500 Subject: [PATCH 21/27] remove tx wrapper --- .../postgresContractStore.go | 49 +++++++++---------- 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/pkg/contractStore/postgresContractStore/postgresContractStore.go b/pkg/contractStore/postgresContractStore/postgresContractStore.go index 196211690..926d3121e 100644 --- a/pkg/contractStore/postgresContractStore/postgresContractStore.go +++ b/pkg/contractStore/postgresContractStore/postgresContractStore.go @@ -193,42 +193,37 @@ func (s *PostgresContractStore) FindOrCreateProxyContract( contractAddress string, proxyContractAddress string, ) (*contractStore.ProxyContract, bool, error) { - found := false contractAddress = strings.ToLower(contractAddress) proxyContractAddress = strings.ToLower(proxyContractAddress) - upsertedContract, err := helpers.WrapTxAndCommit[*contractStore.ProxyContract](func(tx *gorm.DB) (*contractStore.ProxyContract, error) { - contract := &contractStore.ProxyContract{} - // Proxy contracts are unique on block_number && contract - result := tx.Debug().First(&contract, "contract_address = ? and proxy_contract_address = ?", contractAddress, proxyContractAddress) - if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) { - return nil, result.Error - } - - // found contract - if contract.ContractAddress == contractAddress { - s.Logger.Sugar().Infow("Found existing proxy contract", - zap.String("contractAddress", contractAddress), - zap.String("proxyContractAddress", proxyContractAddress), - ) - found = true - return contract, nil - } + contract := &contractStore.ProxyContract{} + // Proxy contracts are unique on block_number && contract + result := s.Db.Model(&contractStore.ProxyContract{}).Debug().First(&contract, "contract_address = ? and proxy_contract_address = ?", contractAddress, proxyContractAddress) + if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) { + return nil, false, result.Error + } - s.Logger.Sugar().Infow("Proxy contract not found, creating new one", + // found contract + if contract.ContractAddress == contractAddress { + s.Logger.Sugar().Infow("Found existing proxy contract", zap.String("contractAddress", contractAddress), zap.String("proxyContractAddress", proxyContractAddress), ) + return contract, true, nil + } - proxyContract, err := s.CreateProxyContract(blockNumber, contractAddress, proxyContractAddress) - if err != nil { - s.Logger.Sugar().Errorw("Failed to create proxy contract", zap.Error(err), zap.String("contractAddress", contractAddress)) - return nil, err - } + s.Logger.Sugar().Infow("Proxy contract not found, creating new one", + zap.String("contractAddress", contractAddress), + zap.String("proxyContractAddress", proxyContractAddress), + ) - return proxyContract, nil - }, s.Db, nil) - return upsertedContract, found, err + proxyContract, err := s.CreateProxyContract(blockNumber, contractAddress, proxyContractAddress) + if err != nil { + s.Logger.Sugar().Errorw("Failed to create proxy contract", zap.Error(err), zap.String("contractAddress", contractAddress)) + return nil, false, err + } + + return proxyContract, false, err } func (s *PostgresContractStore) GetProxyContractWithImplementations(contractAddress string) ([]*contractStore.Contract, error) { From 031fc9ab7c7027741ed4ea4e7b134c1744c04403 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 21:29:31 -0500 Subject: [PATCH 22/27] trying raw query --- .../postgresContractStore/postgresContractStore.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/contractStore/postgresContractStore/postgresContractStore.go b/pkg/contractStore/postgresContractStore/postgresContractStore.go index 926d3121e..73163a8f4 100644 --- a/pkg/contractStore/postgresContractStore/postgresContractStore.go +++ b/pkg/contractStore/postgresContractStore/postgresContractStore.go @@ -198,7 +198,16 @@ func (s *PostgresContractStore) FindOrCreateProxyContract( contract := &contractStore.ProxyContract{} // Proxy contracts are unique on block_number && contract - result := s.Db.Model(&contractStore.ProxyContract{}).Debug().First(&contract, "contract_address = ? and proxy_contract_address = ?", contractAddress, proxyContractAddress) + query := ` + select + * + from proxy_contracts + where + contract_address = @contractAddress + and proxy_contract_address = @proxyContractAddress + limit 1 + ` + result := s.Db.Debug().Raw(query, sql.Named("contractAddress", contractAddress), sql.Named("proxyContractAddress", proxyContractAddress)).Scan(&contract) if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) { return nil, false, result.Error } From ece9200228c4dfc81792d3ef7555b2fd2a227490 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 21:49:02 -0500 Subject: [PATCH 23/27] get around etherscan for now --- pkg/contractManager/contractManager.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/pkg/contractManager/contractManager.go b/pkg/contractManager/contractManager.go index 149b5a27f..84e61729e 100644 --- a/pkg/contractManager/contractManager.go +++ b/pkg/contractManager/contractManager.go @@ -181,6 +181,23 @@ func (cm *ContractManager) CreateUpgradedProxyContract( return nil } + foundContract, err := cm.ContractStore.GetContractForAddress(proxyContractAddress) + if err != nil { + cm.Logger.Sugar().Errorw("Failed to find contract, proceeding with creation", + zap.String("contractAddress", contractAddress), + zap.String("proxyContractAddress", proxyContractAddress), + zap.Error(err), + ) + } + if err == nil && foundContract != nil { + cm.Logger.Sugar().Infow("Proxy contract already exists in the store, skipping creation", + zap.String("contractAddress", contractAddress), + zap.String("proxyContractAddress", proxyContractAddress), + zap.Error(err), + ) + return nil + } + // Fetch bytecode hash bytecodeHash, err := cm.AbiFetcher.FetchContractBytecodeHash(ctx, proxyContractAddress) if err != nil { From a09a41402ef611a784e271f1b1aee7e62536fb4b Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 21:59:54 -0500 Subject: [PATCH 24/27] more debugging --- pkg/contractManager/contractManager.go | 5 +++++ .../postgresContractStore/postgresContractStore.go | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/contractManager/contractManager.go b/pkg/contractManager/contractManager.go index 84e61729e..8e10df4d6 100644 --- a/pkg/contractManager/contractManager.go +++ b/pkg/contractManager/contractManager.go @@ -197,6 +197,11 @@ func (cm *ContractManager) CreateUpgradedProxyContract( ) return nil } + cm.Logger.Sugar().Infow("Proxy contract does not exist, creating new contract", + zap.String("contractAddress", contractAddress), + zap.String("proxyContractAddress", proxyContractAddress), + zap.Error(err), + ) // Fetch bytecode hash bytecodeHash, err := cm.AbiFetcher.FetchContractBytecodeHash(ctx, proxyContractAddress) diff --git a/pkg/contractStore/postgresContractStore/postgresContractStore.go b/pkg/contractStore/postgresContractStore/postgresContractStore.go index 73163a8f4..c500f64fc 100644 --- a/pkg/contractStore/postgresContractStore/postgresContractStore.go +++ b/pkg/contractStore/postgresContractStore/postgresContractStore.go @@ -47,7 +47,8 @@ func NewPostgresContractStore(db *gorm.DB, l *zap.Logger, cfg *config.Config) *P func (s *PostgresContractStore) GetContractForAddress(address string) (*contractStore.Contract, error) { var contract *contractStore.Contract - result := s.Db.First(&contract, "contract_address = ?", address) + address = strings.ToLower(address) + result := s.Db.Model(&contractStore.Contract{}).Debug().Find(&contract, "contract_address = ?", address) if result.Error != nil { if result.Error == gorm.ErrRecordNotFound { s.Logger.Sugar().Debugf("Contract not found in store '%s'", address) From 14569ec34c1712e6af5aa287d649d0903b586c2e Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 5 Jun 2025 22:04:35 -0500 Subject: [PATCH 25/27] less debug now that we fixed it --- .../postgresContractStore/postgresContractStore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/contractStore/postgresContractStore/postgresContractStore.go b/pkg/contractStore/postgresContractStore/postgresContractStore.go index c500f64fc..4ebba5282 100644 --- a/pkg/contractStore/postgresContractStore/postgresContractStore.go +++ b/pkg/contractStore/postgresContractStore/postgresContractStore.go @@ -48,7 +48,7 @@ func (s *PostgresContractStore) GetContractForAddress(address string) (*contract var contract *contractStore.Contract address = strings.ToLower(address) - result := s.Db.Model(&contractStore.Contract{}).Debug().Find(&contract, "contract_address = ?", address) + result := s.Db.Model(&contractStore.Contract{}).Find(&contract, "contract_address = ?", address) if result.Error != nil { if result.Error == gorm.ErrRecordNotFound { s.Logger.Sugar().Debugf("Contract not found in store '%s'", address) @@ -208,7 +208,7 @@ func (s *PostgresContractStore) FindOrCreateProxyContract( and proxy_contract_address = @proxyContractAddress limit 1 ` - result := s.Db.Debug().Raw(query, sql.Named("contractAddress", contractAddress), sql.Named("proxyContractAddress", proxyContractAddress)).Scan(&contract) + result := s.Db.Raw(query, sql.Named("contractAddress", contractAddress), sql.Named("proxyContractAddress", proxyContractAddress)).Scan(&contract) if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) { return nil, false, result.Error } From 7acb1b795bc4f418d0d6d67d971aac443b683fd4 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Fri, 6 Jun 2025 10:04:59 -0500 Subject: [PATCH 26/27] make GetRewardSnapshotStatus a >= rather than strict equal to pick up on rewards that were generated as part of a later date --- pkg/fetcher/fetcher.go | 2 +- pkg/rewards/rewards.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/fetcher/fetcher.go b/pkg/fetcher/fetcher.go index 8e18e5fbd..1d6d37acb 100644 --- a/pkg/fetcher/fetcher.go +++ b/pkg/fetcher/fetcher.go @@ -284,7 +284,7 @@ func (f *Fetcher) FetchLogsForContractsForBlockRange(ctx context.Context, startB interestingBlockNumbers[log.BlockNumber.Value()] = true } } - f.Logger.Sugar().Infow("Finished fetching logs for contracts", + f.Logger.Sugar().Debugw("Finished fetching logs for contracts", zap.Uint64("startBlock", startBlockInclusive), zap.Uint64("endBlock", endBlockInclusive), zap.Strings("contracts", contractAddresses), diff --git a/pkg/rewards/rewards.go b/pkg/rewards/rewards.go index 7ec5d5079..c41c74ff9 100644 --- a/pkg/rewards/rewards.go +++ b/pkg/rewards/rewards.go @@ -193,7 +193,7 @@ func (rc *RewardsCalculator) UpdateRewardSnapshotStatus(snapshotDate string, sta func (rc *RewardsCalculator) GetRewardSnapshotStatus(snapshotDate string) (*storage.GeneratedRewardsSnapshots, error) { var r = &storage.GeneratedRewardsSnapshots{} - res := rc.grm.Model(&storage.GeneratedRewardsSnapshots{}).Where("snapshot_date = ?", snapshotDate).First(&r) + res := rc.grm.Model(&storage.GeneratedRewardsSnapshots{}).Where("snapshot_date >= ?", snapshotDate).First(&r) if res.Error != nil { if errors.Is(res.Error, gorm.ErrRecordNotFound) { return nil, nil From 642f59517b54a41b0917f791d93243d4fe2fec9f Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Fri, 6 Jun 2025 10:17:02 -0500 Subject: [PATCH 27/27] change GetMaxSnapshotDateForCutoffDate to not rely on staging tables --- pkg/rewards/rewards.go | 46 +++++++++++------------------------------- 1 file changed, 12 insertions(+), 34 deletions(-) diff --git a/pkg/rewards/rewards.go b/pkg/rewards/rewards.go index c41c74ff9..9d7d9e9f8 100644 --- a/pkg/rewards/rewards.go +++ b/pkg/rewards/rewards.go @@ -238,40 +238,18 @@ func (rc *RewardsCalculator) MerkelizeRewardsForSnapshot(snapshotDate string) ( } func (rc *RewardsCalculator) GetMaxSnapshotDateForCutoffDate(cutoffDate string) (string, error) { - goldStagingTableName := rewardsUtils.GetGoldTableNames(cutoffDate)[rewardsUtils.Table_15_GoldStaging] - - // check to see if there are any rows at all in the staging table - var count int64 - res := rc.grm.Raw(fmt.Sprintf(`select count(*) as count from %s`, goldStagingTableName)).Scan(&count) - if res.Error != nil { - rc.logger.Sugar().Errorw("Failed to get count of rows in staging table", "error", res.Error) - return "", res.Error - } - - // if there arent any rows, we need to create what the rewardsCalcEndDate would be - if count == 0 { - cutoffDateTime, err := time.Parse(time.DateOnly, cutoffDate) - if err != nil { - rc.logger.Sugar().Errorw("Failed to parse cutoff date", "error", err) - return "", err - } - // since cutoffDate is exclusive, the possible rewardsCalcEndDate is cutoffDate - 1day - rewardsCalcEndDate := cutoffDateTime.Add(-24 * time.Hour).Format(time.DateOnly) - rc.logger.Sugar().Infow("No rows found in staging table, using cutoff date", - zap.String("cutoffDate", cutoffDate), - zap.String("rewardsCalcEndDate", rewardsCalcEndDate), - ) - return rewardsCalcEndDate, nil - } - - var maxSnapshotStr string - query := fmt.Sprintf(`select to_char(max(snapshot), 'YYYY-MM-DD') as snapshot from %s`, goldStagingTableName) - res = rc.grm.Raw(query).Scan(&maxSnapshotStr) - if res.Error != nil { - rc.logger.Sugar().Errorw("Failed to get max snapshot date", "error", res.Error) - return "", res.Error - } - return maxSnapshotStr, nil + cutoffDateTime, err := time.Parse(time.DateOnly, cutoffDate) + if err != nil { + rc.logger.Sugar().Errorw("Failed to parse cutoff date", "error", err) + return "", err + } + // since cutoffDate is exclusive, the possible rewardsCalcEndDate is cutoffDate - 1day + rewardsCalcEndDate := cutoffDateTime.Add(-24 * time.Hour).Format(time.DateOnly) + rc.logger.Sugar().Infow("No rows found in staging table, using cutoff date", + zap.String("cutoffDate", cutoffDate), + zap.String("rewardsCalcEndDate", rewardsCalcEndDate), + ) + return rewardsCalcEndDate, nil } func (rc *RewardsCalculator) BackfillAllStakerOperators() error {