Skip to content

Commit bd4abc6

Browse files
authored
fix(staking): stake amount history (#46)
* fix(staking): stake amount history (#45) * fix(staking): stake amount history api * fix(staking): stake amount history api * fix(staking): stake amount history api * fix(staking): stake amount history api
1 parent 71db949 commit bd4abc6

File tree

7 files changed

+367
-5
lines changed

7 files changed

+367
-5
lines changed

db/cl_staking_event.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package db
22

3-
import "gorm.io/gorm"
3+
import (
4+
"errors"
5+
"time"
6+
7+
"gorm.io/gorm"
8+
)
49

510
type CLStakingEvent struct {
611
ID uint64 `gorm:"primarykey"`
@@ -12,6 +17,11 @@ type CLStakingEvent struct {
1217
Amount string `gorm:"not null;column:amount"`
1318
}
1419

20+
type CLSuccessfulStakingEvent struct {
21+
CLStakingEvent
22+
BlockTime time.Time `gorm:"not null;column:block_time"`
23+
}
24+
1525
func (CLStakingEvent) TableName() string {
1626
return "cl_staking_events"
1727
}
@@ -25,3 +35,22 @@ func BatchCreateCLStakingEvents(db *gorm.DB, indexer string, events []*CLStaking
2535
return UpdateIndexPoint(tx, indexer, height)
2636
})
2737
}
38+
39+
func GetSuccessfulCLStakingEventsAfter(db *gorm.DB, eventTypes []string, blockHeight int64) ([]*CLSuccessfulStakingEvent, error) {
40+
var events []*CLSuccessfulStakingEvent
41+
42+
if err := db.
43+
Table("cl_staking_events AS e").
44+
Joins("JOIN cl_blocks AS b ON e.block_height = b.height").
45+
Select("e.*, b.time AS block_time").
46+
Where("e.event_type IN (?)", eventTypes).
47+
Where("e.status_ok = ?", true).
48+
Where("e.block_height > ?", blockHeight).
49+
Scan(&events).Error; errors.Is(err, gorm.ErrRecordNotFound) {
50+
return events, nil
51+
} else if err != nil {
52+
return nil, err
53+
}
54+
55+
return events, nil
56+
}

db/cl_total_stake_hist.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package db
2+
3+
import (
4+
"gorm.io/gorm"
5+
"gorm.io/gorm/clause"
6+
)
7+
8+
type CLTotalStakeHist struct {
9+
ID uint64 `gorm:"primarykey"`
10+
11+
TotalStakeAmount int64 `gorm:"not null;column:total_stake_amount"`
12+
UpdatedAtBlock int64 `gorm:"not null;column:updated_at_block;index:idx_cl_total_stake_hist_updated_at_block,unique"`
13+
UpdatedAtTime int64 `gorm:"not null;column:updated_at_time;index:idx_cl_total_stake_hist_updated_at_time,unique"`
14+
}
15+
16+
func (CLTotalStakeHist) TableName() string {
17+
return "cl_total_stake_hists"
18+
}
19+
20+
func GetLatestCLTotalStakeHist(db *gorm.DB) (*CLTotalStakeHist, error) {
21+
var stake CLTotalStakeHist
22+
23+
if err := db.Order("updated_at_time DESC").First(&stake).Error; err != nil {
24+
return nil, err
25+
}
26+
27+
return &stake, nil
28+
}
29+
30+
func GetCLTotalStakeHists(db *gorm.DB) ([]*CLTotalStakeHist, error) {
31+
var stakes []*CLTotalStakeHist
32+
33+
if err := db.Order("updated_at_time ASC").Find(&stakes).Error; err != nil {
34+
return nil, err
35+
}
36+
37+
return stakes, nil
38+
}
39+
40+
func GetLatestCLTotalStakeHistBefore(db *gorm.DB, timestamp int64) (*CLTotalStakeHist, error) {
41+
var stake CLTotalStakeHist
42+
43+
if err := db.Where("updated_at_time <= ?", timestamp).Order("updated_at_time DESC").First(&stake).Error; err != nil {
44+
return nil, err
45+
}
46+
47+
return &stake, nil
48+
}
49+
50+
func GetCLTotalStakeHistsAfter(db *gorm.DB, timestamp int64) ([]*CLTotalStakeHist, error) {
51+
var stakes []*CLTotalStakeHist
52+
53+
if err := db.Where("updated_at_time > ?", timestamp).Order("updated_at_time ASC").Find(&stakes).Error; err != nil {
54+
return nil, err
55+
}
56+
57+
return stakes, nil
58+
}
59+
60+
func UpsertCLGenesisTotalStakeHist(db *gorm.DB, indexer string, stake *CLTotalStakeHist) error {
61+
return db.Transaction(func(tx *gorm.DB) error {
62+
if err := tx.Clauses(clause.OnConflict{
63+
Columns: []clause.Column{{Name: "updated_at_time"}},
64+
DoNothing: true,
65+
}).Create(stake).Error; err != nil {
66+
return err
67+
}
68+
69+
return nil
70+
})
71+
}
72+
73+
func BatchUpsertCLTotalStakeHists(db *gorm.DB, indexer string, stakes []*CLTotalStakeHist) error {
74+
return db.Transaction(func(tx *gorm.DB) error {
75+
if err := tx.Clauses(clause.OnConflict{
76+
Columns: []clause.Column{{Name: "updated_at_time"}},
77+
DoUpdates: clause.Assignments(map[string]interface{}{
78+
"total_stake_amount": gorm.Expr("excluded.total_stake_amount"),
79+
}),
80+
}).CreateInBatches(stakes, 100).Error; err != nil {
81+
return err
82+
}
83+
84+
return nil
85+
})
86+
}

pkg/indexer/cl_staking_event.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func (c *CLStakingEventIndexer) getStakingEvents(from, to int64) ([]*db.CLStakin
203203
return nil, fmt.Errorf("event %s: sender address not found", eventType)
204204
}
205205

206-
if strings.ToLower(delAddr) != strings.ToLower(senderAddr) {
206+
if !strings.EqualFold(delAddr, senderAddr) {
207207
eventType = EventType2Behalf[eventType]
208208
}
209209
case TypeUnjail:
@@ -225,7 +225,7 @@ func (c *CLStakingEventIndexer) getStakingEvents(from, to int64) ([]*db.CLStakin
225225
return nil, fmt.Errorf("convert validator compressed key to address from event %s failed: %w", eventType, err)
226226
}
227227

228-
if strings.ToLower(valAddr.String()) != strings.ToLower(senderAddr) {
228+
if !strings.EqualFold(valAddr.String(), senderAddr) {
229229
eventType = EventType2Behalf[eventType]
230230
}
231231
}

pkg/indexer/cl_total_stake.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ const (
2121
genesisTime = "2025-01-19T15:00:00.00000000Z"
2222
timeLayout = "2006-01-02T15:04:05.999999999Z"
2323

24-
clUnbondingPeriod = 1209600 * time.Second // 14 days
25-
2624
stakeEventType = "delegate_success"
2725
unstakeEventType = "undelegate_success"
2826
)

pkg/indexer/cl_total_stake_hist.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package indexer
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sort"
7+
"strconv"
8+
"time"
9+
10+
"github.com/rs/zerolog/log"
11+
"gorm.io/gorm"
12+
13+
"github.com/piplabs/story-staking-api/db"
14+
)
15+
16+
const (
17+
genesisBlockHeight = 1
18+
)
19+
20+
var _ Indexer = (*CLTotalStakeHistIndexer)(nil)
21+
22+
type CLTotalStakeHistIndexer struct {
23+
ctx context.Context
24+
25+
dbOperator *gorm.DB
26+
}
27+
28+
func NewCLTotalStakeHistIndexer(ctx context.Context, dbOperator *gorm.DB) (*CLTotalStakeHistIndexer, error) {
29+
c := &CLTotalStakeHistIndexer{
30+
ctx: ctx,
31+
32+
dbOperator: dbOperator,
33+
}
34+
35+
if err := c.init(); err != nil {
36+
return nil, fmt.Errorf("init indexer %s failed: %w", c.Name(), err)
37+
}
38+
39+
return c, nil
40+
}
41+
42+
func (c *CLTotalStakeHistIndexer) Name() string {
43+
return "cl_total_stake_hist"
44+
}
45+
46+
func (c *CLTotalStakeHistIndexer) Run() {
47+
log.Info().Str("indexer", c.Name()).Msg("Start indexing")
48+
49+
ticker := time.NewTicker(10 * time.Second)
50+
defer ticker.Stop()
51+
52+
for {
53+
select {
54+
case <-c.ctx.Done():
55+
return
56+
case <-ticker.C:
57+
if err := c.index(); err != nil {
58+
log.Error().Err(err).Str("indexer", c.Name()).Msg("index cl total stake hist failed")
59+
}
60+
}
61+
}
62+
}
63+
64+
func (c *CLTotalStakeHistIndexer) init() error {
65+
gt, err := time.Parse(timeLayout, genesisTime)
66+
if err != nil {
67+
return fmt.Errorf("parse genesis time failed: %w", err)
68+
}
69+
70+
if err := db.UpsertCLGenesisTotalStakeHist(c.dbOperator, c.Name(), &db.CLTotalStakeHist{
71+
TotalStakeAmount: genesisStakeAmount,
72+
UpdatedAtBlock: genesisBlockHeight,
73+
UpdatedAtTime: gt.Unix(),
74+
}); err != nil {
75+
return fmt.Errorf("upsert genesis cl total stake failed: %w", err)
76+
}
77+
78+
return nil
79+
}
80+
81+
func (c *CLTotalStakeHistIndexer) index() error {
82+
latestHist, err := db.GetLatestCLTotalStakeHist(c.dbOperator)
83+
if err != nil {
84+
return fmt.Errorf("get latest cl total stake hist failed: %w", err)
85+
}
86+
87+
events, err := db.GetSuccessfulCLStakingEventsAfter(c.dbOperator, []string{TypeStake, TypeUnstake}, latestHist.UpdatedAtBlock)
88+
if err != nil {
89+
return fmt.Errorf("get successful cl staking events failed: %w", err)
90+
} else if len(events) == 0 {
91+
return nil
92+
}
93+
94+
blk2StakeChange, blk2BlockTime := make(map[int64]int64), make(map[int64]int64)
95+
for _, event := range events {
96+
var amount int64
97+
if event.Amount == "" {
98+
// special bypass for aeneid
99+
amount = 0
100+
} else {
101+
amt, err := strconv.ParseInt(event.Amount, 10, 64)
102+
if err != nil {
103+
return fmt.Errorf("parse amount %s failed: %w", event.Amount, err)
104+
}
105+
amount = amt
106+
}
107+
108+
switch event.EventType {
109+
case TypeStake:
110+
blk2StakeChange[event.BlockHeight] += amount
111+
case TypeUnstake:
112+
blk2StakeChange[event.BlockHeight] -= amount
113+
}
114+
115+
blk2BlockTime[event.BlockHeight] = event.BlockTime.Unix()
116+
}
117+
118+
type stakeChange struct {
119+
BlockHeight int64
120+
BlockTime int64
121+
StakeChangeAmount int64
122+
}
123+
124+
scs := make([]*stakeChange, 0)
125+
for blkno, amt := range blk2StakeChange {
126+
scs = append(scs, &stakeChange{
127+
BlockHeight: blkno,
128+
BlockTime: blk2BlockTime[blkno],
129+
StakeChangeAmount: amt,
130+
})
131+
}
132+
133+
sort.Slice(scs, func(i, j int) bool {
134+
return scs[i].BlockHeight < scs[j].BlockHeight
135+
})
136+
137+
totalStakeAmount := latestHist.TotalStakeAmount
138+
clTotalStakeHists := make([]*db.CLTotalStakeHist, 0)
139+
for _, sc := range scs {
140+
totalStakeAmount += sc.StakeChangeAmount
141+
clTotalStakeHists = append(clTotalStakeHists, &db.CLTotalStakeHist{
142+
TotalStakeAmount: totalStakeAmount,
143+
UpdatedAtBlock: sc.BlockHeight,
144+
UpdatedAtTime: sc.BlockTime,
145+
})
146+
}
147+
148+
return db.BatchUpsertCLTotalStakeHists(c.dbOperator, c.Name(), clTotalStakeHists)
149+
}

0 commit comments

Comments
 (0)