Commits (27):

8efc439 fix: add missing pruning logic, add missing block fk constraints (seanmcgary, Jun 5, 2025)
d88b26d add block_number indexes (seanmcgary, Jun 5, 2025)
b7efc58 fix constraints (seanmcgary, Jun 5, 2025)
b514e77 perf: implement a faster from-scratch sync (seanmcgary, Apr 2, 2025)
f5d2ade update NewFetcher calls (seanmcgary, Jun 5, 2025)
df9a379 debug log, not info (seanmcgary, Jun 5, 2025)
f05f7d7 update fk constraints (seanmcgary, Jun 5, 2025)
a77b263 convert kebab case to snake case (seanmcgary, Jun 5, 2025)
ecc3596 empty commit (seanmcgary, Jun 5, 2025)
ce1218b fix test (seanmcgary, Jun 5, 2025)
9b2fa82 increase log verbosity for getLogs (seanmcgary, Jun 5, 2025)
5c7db47 log out interesting block count (seanmcgary, Jun 5, 2025)
b3e7d4f ignore conflicts on contract insert (seanmcgary, Jun 6, 2025)
c876b17 upsert proxy to handle error (seanmcgary, Jun 6, 2025)
79c3e0c use ubuntu-24.04 (seanmcgary, Jun 6, 2025)
f84f234 skip contracts that already exist (seanmcgary, Jun 6, 2025)
80e595a additional logging (seanmcgary, Jun 6, 2025)
6c80d69 add more logging (seanmcgary, Jun 6, 2025)
f198db8 more logging (seanmcgary, Jun 6, 2025)
f12ec97 change the proxy contract lookup logic to be more correct (seanmcgary, Jun 6, 2025)
3a27ffe remove tx wrapper (seanmcgary, Jun 6, 2025)
031fc9a trying raw query (seanmcgary, Jun 6, 2025)
ece9200 get around etherscan for now (seanmcgary, Jun 6, 2025)
a09a414 more debugging (seanmcgary, Jun 6, 2025)
14569ec less debug now that we fixed it (seanmcgary, Jun 6, 2025)
7acb1b7 make GetRewardSnapshotStatus a >= rather than strict equal to pick up… (seanmcgary, Jun 6, 2025)
642f595 change GetMaxSnapshotDateForCutoffDate to not rely on staging tables (seanmcgary, Jun 6, 2025)
@@ -0,0 +1,31 @@
package _202506042023_addMissingBlockNumberForeignKeys

import (
"database/sql"
"fmt"
"github.com/Layr-Labs/sidecar/internal/config"
"gorm.io/gorm"
)

type Migration struct {
}

func (m *Migration) Up(db *sql.DB, grm *gorm.DB, cfg *config.Config) error {
tableNames := []string{
"staker_shares",
"operator_shares",
}

for _, tableName := range tableNames {
query := fmt.Sprintf(`alter table %s add constraint %s_block_number_fkey foreign key (block_number) references blocks (number) on delete cascade;`, tableName, tableName)
res := grm.Exec(query)
if res.Error != nil {
return fmt.Errorf("failed to add foreign key for table %s: %w", tableName, res.Error)
}
}
return nil
}

func (m *Migration) GetName() string {
return "202506042023_addMissingBlockNumberForeignKeys"
}
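
For context on why these constraints matter to the pruning logic in this PR: once `on delete cascade` is in place, removing rows from `blocks` also removes the dependent `staker_shares` and `operator_shares` rows. Below is a minimal illustrative sketch, not part of the diff; the `example` package, the `pruneFromBlock` helper, and the `grm` handle are assumptions for illustration only.

package example

import (
	"database/sql"

	"gorm.io/gorm"
)

// pruneFromBlock is a hypothetical helper: with the cascading foreign keys added
// by the migration above, deleting rows from blocks is enough to remove the
// matching staker_shares and operator_shares rows as well.
func pruneFromBlock(grm *gorm.DB, blockHeight uint64) error {
	res := grm.Exec(
		`delete from blocks where number >= @blockNumber`,
		sql.Named("blockNumber", blockHeight),
	)
	return res.Error
}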
2 changes: 2 additions & 0 deletions pkg/postgres/migrations/migrator.go
@@ -71,6 +71,7 @@ import (
_202503311108_goldRewardHashIndex "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202503311108_goldRewardHashIndex"
_202504240743_fixQueuedSlashingWithdrawalsPk "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202504240743_fixQueuedSlashingWithdrawalsPk"
_202505092007_startupJobs "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202505092007_startupJobs"
_202506042023_addMissingBlockNumberForeignKeys "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202506042023_addMissingBlockNumberForeignKeys"
"go.uber.org/zap"
"gorm.io/gorm"
"time"
@@ -213,6 +214,7 @@ func (m *Migrator) MigrateAll() error {
&_202503171414_slashingWithdrawals.Migration{},
&_202504240743_fixQueuedSlashingWithdrawalsPk.Migration{},
&_202505092007_startupJobs.Migration{},
&_202506042023_addMissingBlockNumberForeignKeys.Migration{},
}

for _, migration := range migrations {
99 changes: 83 additions & 16 deletions pkg/rewards/rewards.go
@@ -13,7 +13,6 @@ import (

"sync/atomic"

"slices"
"strings"

"strconv"
@@ -432,7 +431,42 @@ func (rc *RewardsCalculator) findRewardsTablesBySnapshotDate(snapshotDate string
return rewardsTables, nil
}

// FindRewardHashesToPrune finds all reward hashes that will be pruned when deleting corrupted state.
func (rc *RewardsCalculator) FindRewardHashesToPrune(blockHeight uint64) ([]string, error) {
query := `
select
distinct(reward_hash) as reward_hash
from (
select reward_hash from combined_rewards where block_number >= @blockNumber
Review comment from serichoi65 (Contributor), Jun 5, 2025: The first commit must've not had the change from >= to >
union all
select reward_hash from operator_directed_rewards where block_number >= @blockNumber
union all
select
odosrs.reward_hash
from operator_directed_operator_set_reward_submissions as odosrs
-- operator_directed_operator_set_reward_submissions lacks a block_time column, so we need to join blocks
join blocks as b on (b.number = odosrs.block_number)
where
b.number >= @blockNumber
) as t
`
var rewardHashes []string
res := rc.grm.Raw(query, sql.Named("blockNumber", blockHeight)).Scan(&rewardHashes)
if res.Error != nil {
rc.logger.Sugar().Errorw("Failed to find reward hashes to prune", "error", res.Error)
return nil, res.Error
}
return rewardHashes, nil
}

// DeleteCorruptedRewardsFromBlockHeight deletes rewards data that is >= the provided block height.
func (rc *RewardsCalculator) DeleteCorruptedRewardsFromBlockHeight(blockHeight uint64) error {
rewardHashesToPrune, err := rc.FindRewardHashesToPrune(blockHeight)
if err != nil {
rc.logger.Sugar().Errorw("Failed to find reward hashes to prune", "error", err)
return err
}

generatedSnapshot, err := rc.findGeneratedRewardSnapshotByBlock(blockHeight)
if err != nil {
rc.logger.Sugar().Errorw("Failed to find generated snapshot", "error", err)
@@ -462,9 +496,12 @@ func (rc *RewardsCalculator) DeleteCorruptedRewardsFromBlockHeight(blockHeight u
lowerBoundSnapshot = nil
}

snapshotDates := make([]string, 0)
// Find all suffixed rewards tables that were generated as part of the rewards calculation process.
// e.g. gold_6_rfae_stakers_2024_12_01
// Since we're deleting the blocks where these rewards were applicable, we need to drop these tables
// so that they can be created again when we run the rewards calculation again.
for _, snapshot := range snapshotsToDelete {
snapshotDates = append(snapshotDates, snapshot.SnapshotDate)
// find and drop tables that were created for this snapshot (e.g. gold_6_rfae_stakers_2024_12_01)
tableNames, err := rc.findRewardsTablesBySnapshotDate(snapshot.SnapshotDate)
if err != nil {
rc.logger.Sugar().Errorw("Failed to find rewards tables", "error", err)
@@ -489,19 +526,49 @@ func (rc *RewardsCalculator) DeleteCorruptedRewardsFromBlockHeight(blockHeight u
}
}

// sort all snapshot dates in ascending order to purge from gold table
slices.SortFunc(snapshotDates, func(i, j string) int {
return strings.Compare(i, j)
})
// prune snapshot tables based on snapshot dates.
// - tables that have been migrated to the "new" single-table format (which is never dropped) will end up getting backfilled.
// - tables that we still drop don't need to be pruned, since they'll be recreated.
tablesToPrune := []string{
"combined_rewards",
"default_operator_split_snapshots",
"operator_avs_registration_snapshots",
"operator_avs_split_snapshots",
"operator_avs_strategy_snapshots",
"operator_directed_rewards",
"operator_pi_split_snapshots",
"operator_share_snapshots",
"staker_delegation_snapshots",
"staker_share_snapshots",
}
for _, tableName := range tablesToPrune {
var query string

// if we have no lower bound, that means we're deleting everything
if lowerBoundSnapshot == nil {
query = fmt.Sprintf(`truncate table %s cascade`, tableName)
} else {
query = fmt.Sprintf(`delete from %s where snapshot >= '%s'`, tableName, lowerBoundSnapshot.SnapshotDate)
}

// purge from gold table
if lowerBoundSnapshot != nil {
rc.logger.Sugar().Infow("Purging rewards from gold table where snapshot >=", "snapshotDate", lowerBoundSnapshot.SnapshotDate)
res = rc.grm.Exec(`delete from gold_table where snapshot >= @snapshotDate`, sql.Named("snapshotDate", lowerBoundSnapshot.SnapshotDate))
} else {
// if the lower bound is nil, then we're deleting everything
rc.logger.Sugar().Infow("Purging all rewards from gold table")
res = rc.grm.Exec(`delete from gold_table`)
res = rc.grm.Exec(query)
if res.Error != nil {
rc.logger.Sugar().Errorw("Failed to prune snapshot table", "tableName", tableName, "error", res.Error)
return res.Error
}
}

// Purge rewards from gold_table based on reward_hash.
// Since reward submissions are submitted at a block but can be arbitrarily backwards-looking,
// we can't just delete by snapshot date. We have to get all reward snapshots that were created by
// a specific reward hash that we're deleting.
goldPurgeQuery := `
delete from gold_table where reward_hash in @rewardHashes
`
res = rc.grm.Exec(goldPurgeQuery, sql.Named("rewardHashes", rewardHashesToPrune))
if res.Error != nil {
rc.logger.Sugar().Errorw("Failed to delete rewards from gold table by reward_hash", "error", res.Error)
return res.Error
}

if res.Error != nil {
Expand All @@ -510,7 +577,7 @@ func (rc *RewardsCalculator) DeleteCorruptedRewardsFromBlockHeight(blockHeight u
}
if lowerBoundSnapshot != nil {
rc.logger.Sugar().Infow("Deleted rewards from gold table",
zap.String("snapshotDate", lowerBoundSnapshot.SnapshotDate),
zap.Int("rewardHashCount", len(rewardHashesToPrune)),
zap.Int64("recordsDeleted", res.RowsAffected),
)
} else {