Skip to content

Commit 9bd93aa

Browse files
authored
Batch deadline setting in SectorMetadata (#177)
1 parent e419bcf commit 9bd93aa

File tree

1 file changed

+62
-4
lines changed

1 file changed

+62
-4
lines changed

tasks/metadata/task_sector_expirations.go

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"time"
66

77
cbor "github.com/ipfs/go-ipld-cbor"
8+
logging "github.com/ipfs/go-log/v2"
9+
"github.com/yugabyte/pgx/v5"
810
"golang.org/x/xerrors"
911

1012
"github.com/filecoin-project/go-address"
@@ -20,6 +22,8 @@ import (
2022
"github.com/filecoin-project/lotus/chain/types"
2123
)
2224

25+
var log = logging.Logger("metadata")
26+
2327
const SectorMetadataRefreshInterval = 191 * time.Minute
2428

2529
type SectorMetadataNodeAPI interface {
@@ -62,6 +66,47 @@ func (s *SectorMetadata) Do(taskID harmonytask.TaskID, stillOwned func() bool) (
6266
astor := adt.WrapStore(ctx, cbor.NewCborStore(s.bstore))
6367
minerStates := map[abi.ActorID]miner.State{}
6468

69+
type partitionUpdate struct {
70+
SpID uint64
71+
SectorNum uint64
72+
Partition uint64
73+
Deadline uint64
74+
}
75+
76+
const batchSize = 1000
77+
updateBatch := make([]partitionUpdate, 0, batchSize)
78+
total := 0
79+
80+
flushBatch := func() error {
81+
if len(updateBatch) == 0 {
82+
return nil
83+
}
84+
85+
total += len(updateBatch)
86+
log.Infow("updating sector partitions", "total", total)
87+
88+
_, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
89+
batch := &pgx.Batch{}
90+
for _, update := range updateBatch {
91+
batch.Queue("UPDATE sectors_meta SET partition = $1, deadline = $2 WHERE sp_id = $3 AND sector_num = $4",
92+
update.Partition, update.Deadline, update.SpID, update.SectorNum)
93+
}
94+
95+
br := tx.SendBatch(ctx, batch)
96+
defer br.Close()
97+
98+
for i := 0; i < batch.Len(); i++ {
99+
_, err := br.Exec()
100+
if err != nil {
101+
return false, xerrors.Errorf("executing batch update %d: %w", i, err)
102+
}
103+
}
104+
105+
return true, nil
106+
}, harmonydb.OptionRetry())
107+
return err
108+
}
109+
65110
for _, sector := range sectors {
66111
maddr, err := address.NewIDAddress(sector.SpID)
67112
if err != nil {
@@ -70,7 +115,6 @@ func (s *SectorMetadata) Do(taskID harmonytask.TaskID, stillOwned func() bool) (
70115

71116
mstate, ok := minerStates[abi.ActorID(sector.SpID)]
72117
if !ok {
73-
74118
act, err := s.api.StateGetActor(ctx, maddr, types.EmptyTSK)
75119
if err != nil {
76120
return false, xerrors.Errorf("getting miner actor: %w", err)
@@ -106,14 +150,28 @@ func (s *SectorMetadata) Do(taskID harmonytask.TaskID, stillOwned func() bool) (
106150
}
107151

108152
if loc != nil {
109-
_, err := s.db.Exec(ctx, "update sectors_meta set partition = $1, deadline = $2 where sp_id = $3 and sector_num = $4", loc.Partition, loc.Deadline, sector.SpID, sector.SectorNum)
110-
if err != nil {
111-
return false, xerrors.Errorf("updating sector partition: %w", err)
153+
updateBatch = append(updateBatch, partitionUpdate{
154+
SpID: sector.SpID,
155+
SectorNum: sector.SectorNum,
156+
Partition: loc.Partition,
157+
Deadline: loc.Deadline,
158+
})
159+
160+
if len(updateBatch) >= batchSize {
161+
if err := flushBatch(); err != nil {
162+
return false, xerrors.Errorf("flushing batch: %w", err)
163+
}
164+
updateBatch = updateBatch[:0]
112165
}
113166
}
114167
}
115168
}
116169

170+
// Flush any remaining updates
171+
if err := flushBatch(); err != nil {
172+
return false, xerrors.Errorf("flushing final batch: %w", err)
173+
}
174+
117175
return true, nil
118176
}
119177

0 commit comments

Comments
 (0)