Skip to content

Commit d7d9a32

Browse files
authored
Use transactions in sector migrations (#169)
1 parent 5e26663 commit d7d9a32

File tree

1 file changed

+83
-73
lines changed

1 file changed

+83
-73
lines changed

cmd/curio/guidedsetup/shared.go

Lines changed: 83 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -361,13 +361,17 @@ func MigrateSectors(ctx context.Context, maddr address.Address, mmeta datastore.
361361
}
362362
}
363363

364-
for _, sectr := range sectors {
364+
for i, sectr := range sectors {
365365
if !migratableState(sectr.State) || sectr.State == sector.Removed {
366366
continue
367367
}
368+
if i > 0 && i%1000 == 0 {
369+
fmt.Printf("Migration: %d / %d (%0.2f%%)\n", i, len(sectors), float64(i)/float64(len(sectors))*100)
370+
}
368371

369-
// Insert sector metadata
370-
_, err := db.Exec(ctx, `
372+
_, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
373+
// Insert sector metadata
374+
_, err := tx.Exec(`
371375
INSERT INTO sectors_meta (sp_id, sector_num, reg_seal_proof, ticket_epoch, ticket_value,
372376
orig_sealed_cid, orig_unsealed_cid, cur_sealed_cid, cur_unsealed_cid,
373377
msg_cid_precommit, msg_cid_commit, msg_cid_update, seed_epoch, seed_value)
@@ -377,65 +381,65 @@ func MigrateSectors(ctx context.Context, maddr address.Address, mmeta datastore.
377381
orig_sealed_cid = excluded.orig_sealed_cid, orig_unsealed_cid = excluded.orig_unsealed_cid, cur_sealed_cid = excluded.cur_sealed_cid,
378382
cur_unsealed_cid = excluded.cur_unsealed_cid, msg_cid_precommit = excluded.msg_cid_precommit, msg_cid_commit = excluded.msg_cid_commit,
379383
msg_cid_update = excluded.msg_cid_update, seed_epoch = excluded.seed_epoch, seed_value = excluded.seed_value`,
380-
mid,
381-
sectr.SectorNumber,
382-
sectr.SectorType,
383-
sectr.TicketEpoch,
384-
sectr.TicketValue,
385-
cidPtrToStrptr(sectr.CommR),
386-
cidPtrToStrptr(sectr.CommD),
387-
cidPtrToStrptr(coalescePtrs(sectr.UpdateSealed, sectr.CommR)),
388-
cidPtrToStrptr(coalescePtrs(sectr.UpdateUnsealed, sectr.CommD)),
389-
cidPtrToStrptr(sectr.PreCommitMessage),
390-
cidPtrToStrptr(sectr.CommitMessage),
391-
cidPtrToStrptr(sectr.ReplicaUpdateMessage),
392-
sectr.SeedEpoch,
393-
sectr.SeedValue,
394-
)
395-
if err != nil {
396-
b, _ := json.MarshalIndent(sectr, "", " ")
397-
fmt.Println(string(b))
384+
mid,
385+
sectr.SectorNumber,
386+
sectr.SectorType,
387+
sectr.TicketEpoch,
388+
sectr.TicketValue,
389+
cidPtrToStrptr(sectr.CommR),
390+
cidPtrToStrptr(sectr.CommD),
391+
cidPtrToStrptr(coalescePtrs(sectr.UpdateSealed, sectr.CommR)),
392+
cidPtrToStrptr(coalescePtrs(sectr.UpdateUnsealed, sectr.CommD)),
393+
cidPtrToStrptr(sectr.PreCommitMessage),
394+
cidPtrToStrptr(sectr.CommitMessage),
395+
cidPtrToStrptr(sectr.ReplicaUpdateMessage),
396+
sectr.SeedEpoch,
397+
sectr.SeedValue,
398+
)
399+
if err != nil {
400+
b, _ := json.MarshalIndent(sectr, "", " ")
401+
fmt.Println(string(b))
398402

399-
return xerrors.Errorf("inserting/updating sectors_meta for sector %d: %w", sectr.SectorNumber, err)
400-
}
403+
return false, xerrors.Errorf("inserting/updating sectors_meta for sector %d: %w", sectr.SectorNumber, err)
404+
}
401405

402-
// Process each piece within the sector
403-
for j, piece := range sectr.Pieces {
404-
dealID := int64(0)
405-
startEpoch := int64(0)
406-
endEpoch := int64(0)
407-
var pamJSON *string
408-
var dealProposalJSONStr *string
409-
410-
if piece.HasDealInfo() {
411-
dealInfo := piece.DealInfo()
412-
if dealInfo.Impl().DealProposal != nil {
413-
dealID = int64(dealInfo.Impl().DealID)
414-
}
406+
// Process each piece within the sector
407+
for j, piece := range sectr.Pieces {
408+
dealID := int64(0)
409+
startEpoch := int64(0)
410+
endEpoch := int64(0)
411+
var pamJSON *string
412+
var dealProposalJSONStr *string
413+
414+
if piece.HasDealInfo() {
415+
dealInfo := piece.DealInfo()
416+
if dealInfo.Impl().DealProposal != nil {
417+
dealID = int64(dealInfo.Impl().DealID)
418+
}
415419

416-
startEpoch = int64(must.One(dealInfo.StartEpoch()))
417-
endEpoch = int64(must.One(dealInfo.EndEpoch()))
418-
if piece.Impl().PieceActivationManifest != nil {
419-
pam, err := json.Marshal(piece.Impl().PieceActivationManifest)
420-
if err != nil {
421-
return xerrors.Errorf("error marshalling JSON for piece %d in sector %d: %w", j, sectr.SectorNumber, err)
420+
startEpoch = int64(must.One(dealInfo.StartEpoch()))
421+
endEpoch = int64(must.One(dealInfo.EndEpoch()))
422+
if piece.Impl().PieceActivationManifest != nil {
423+
pam, err := json.Marshal(piece.Impl().PieceActivationManifest)
424+
if err != nil {
425+
return false, xerrors.Errorf("error marshalling JSON for piece %d in sector %d: %w", j, sectr.SectorNumber, err)
426+
}
427+
ps := string(pam)
428+
pamJSON = &ps
422429
}
423-
ps := string(pam)
424-
pamJSON = &ps
425-
}
426-
if piece.Impl().DealProposal != nil {
427-
dealProposalJSON, err := json.Marshal(piece.Impl().DealProposal)
428-
if err != nil {
429-
return xerrors.Errorf("error marshalling deal proposal JSON for piece %d in sector %d: %w", j, sectr.SectorNumber, err)
430+
if piece.Impl().DealProposal != nil {
431+
dealProposalJSON, err := json.Marshal(piece.Impl().DealProposal)
432+
if err != nil {
433+
return false, xerrors.Errorf("error marshalling deal proposal JSON for piece %d in sector %d: %w", j, sectr.SectorNumber, err)
434+
}
435+
dp := string(dealProposalJSON)
436+
dealProposalJSONStr = &dp
430437
}
431-
dp := string(dealProposalJSON)
432-
dealProposalJSONStr = &dp
433-
}
434438

435-
}
439+
}
436440

437-
// Splitting the SQL statement for readability and adding new fields
438-
_, err = db.Exec(ctx, `
441+
// Splitting the SQL statement for readability and adding new fields
442+
_, err = tx.Exec(`
439443
INSERT INTO sectors_meta_pieces (
440444
sp_id, sector_num, piece_num, piece_cid, piece_size,
441445
requested_keep_data, raw_data_size, start_epoch, orig_end_epoch,
@@ -452,25 +456,31 @@ func MigrateSectors(ctx context.Context, maddr address.Address, mmeta datastore.
452456
f05_deal_id = excluded.f05_deal_id,
453457
ddo_pam = excluded.ddo_pam,
454458
f05_deal_proposal = excluded.f05_deal_proposal`,
455-
mid,
456-
sectr.SectorNumber,
457-
j,
458-
piece.PieceCID(),
459-
piece.Piece().Size,
460-
piece.HasDealInfo(),
461-
nil, // raw_data_size might be calculated based on the piece size, or retrieved if available
462-
startEpoch,
463-
endEpoch,
464-
dealID,
465-
pamJSON,
466-
dealProposalJSONStr,
467-
)
468-
if err != nil {
469-
b, _ := json.MarshalIndent(sectr, "", " ")
470-
fmt.Println(string(b))
471-
472-
return xerrors.Errorf("inserting/updating sector_meta_pieces for sector %d, piece %d: %w", sectr.SectorNumber, j, err)
459+
mid,
460+
sectr.SectorNumber,
461+
j,
462+
piece.PieceCID(),
463+
piece.Piece().Size,
464+
piece.HasDealInfo(),
465+
nil, // raw_data_size might be calculated based on the piece size, or retrieved if available
466+
startEpoch,
467+
endEpoch,
468+
dealID,
469+
pamJSON,
470+
dealProposalJSONStr,
471+
)
472+
if err != nil {
473+
b, _ := json.MarshalIndent(sectr, "", " ")
474+
fmt.Println(string(b))
475+
476+
return false, xerrors.Errorf("inserting/updating sector_meta_pieces for sector %d, piece %d: %w", sectr.SectorNumber, j, err)
477+
}
473478
}
479+
480+
return true, nil
481+
}, harmonydb.OptionRetry())
482+
if err != nil {
483+
return xerrors.Errorf("processing sector %d: %w", sectr.SectorNumber, err)
474484
}
475485
}
476486

0 commit comments

Comments
 (0)