Skip to content

Commit 0bb5d4d

Browse files
authored
fix: add debug logs for pdcleaner (#1910)
* add debug logs * delete offset in batches * fix log location * refactor index provider * clean piece tracker table * reduce rpc log level
1 parent 49b2de4 commit 0bb5d4d

File tree

10 files changed

+145
-14
lines changed

10 files changed

+145
-14
lines changed

extern/boostd-data/client/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type Store struct {
4040
UnflagPiece func(ctx context.Context, pieceCid cid.Cid, maddr address.Address) error
4141
FlaggedPiecesList func(ctx context.Context, filter *types.FlaggedPiecesListFilter, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error)
4242
FlaggedPiecesCount func(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error)
43+
UntrackPiece func(ctx context.Context, pieceCid cid.Cid, maddr address.Address) error
4344
}
4445
closer jsonrpc.ClientCloser
4546
dialOpts []jsonrpc.Option
@@ -197,3 +198,7 @@ func (s *Store) FlaggedPiecesList(ctx context.Context, filter *types.FlaggedPiec
197198
func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiecesListFilter) (int, error) {
198199
return s.client.FlaggedPiecesCount(ctx, filter)
199200
}
201+
202+
func (s *Store) UntrackPiece(ctx context.Context, pieceCid cid.Cid, maddr address.Address) error {
203+
return s.client.UntrackPiece(ctx, pieceCid, maddr)
204+
}

extern/boostd-data/cmd/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ func main() {
3838
func before(cctx *cli.Context) error {
3939
_ = logging.SetLogLevel("boostd-data", "INFO")
4040
_ = logging.SetLogLevel("boostd-data-yb", "INFO")
41+
_ = logging.SetLogLevel("rpc", "ERROR")
4142

4243
if cliutil.IsVeryVerbose {
4344
_ = logging.SetLogLevel("boostd-data", "DEBUG")

extern/boostd-data/ldb/service.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -829,3 +829,26 @@ func (s *Store) RemoveIndexes(ctx context.Context, pieceCid cid.Cid) error {
829829

830830
return err
831831
}
832+
833+
func (s *Store) UntrackPiece(ctx context.Context, pieceCid cid.Cid, maddr address.Address) error {
834+
log.Debugw("handle.untack-piece")
835+
836+
ctx, span := tracing.Tracer.Start(ctx, "store.untrack_pieces")
837+
defer span.End()
838+
839+
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Endpoint, "ldb.untrack_pieces"))
840+
stop := metrics.Timer(ctx, metrics.APIRequestDuration)
841+
defer stop()
842+
843+
defer func(now time.Time) {
844+
log.Debugw("handled.untack-piece", "took", time.Since(now).String())
845+
}(time.Now())
846+
847+
stats.Record(s.ctx, metrics.FailureUntrackPieceCount.M(1))
848+
849+
// LEVELDB does not have a separate PieceTracker table
850+
// All pieces to be checked are picked from the main table
851+
// so there is no need to delete anything
852+
return nil
853+
854+
}

extern/boostd-data/metrics/metrics.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ var (
5151
SuccessUnflagPieceCount = stats.Int64("success_unflag_piece_count", "Counter of unflag piece success", stats.UnitDimensionless)
5252
SuccessFlaggedPiecesListCount = stats.Int64("success_flagged_pieces_list_count", "Counter of flagged pieces list success", stats.UnitDimensionless)
5353
SuccessFlaggedPiecesCountCount = stats.Int64("success_flagged_pieces_count_count", "Counter of flagged pieces count success", stats.UnitDimensionless)
54+
SuccessUntrackPieceCount = stats.Int64("success_untrack_piece", "Counter of untrack piece success", stats.UnitDimensionless)
5455
FailureAddDealForPieceCount = stats.Int64("failure_add_deal_for_piece_count", "Counter of add deal failure", stats.UnitDimensionless)
5556
FailureAddIndexCount = stats.Int64("failure_add_index_count", "Counter of add index failure", stats.UnitDimensionless)
5657
FailureIsIndexedCount = stats.Int64("failure_is_indexed_count", "Counter of is indexed failure", stats.UnitDimensionless)
@@ -72,6 +73,7 @@ var (
7273
FailureUnflagPieceCount = stats.Int64("failure_unflag_piece_count", "Counter of unflag piece failure", stats.UnitDimensionless)
7374
FailureFlaggedPiecesListCount = stats.Int64("failure_flagged_pieces_list_count", "Counter of flagged pieces list failure", stats.UnitDimensionless)
7475
FailureFlaggedPiecesCountCount = stats.Int64("failure_flagged_pieces_count_count", "Counter of flagged pieces count failure", stats.UnitDimensionless)
76+
FailureUntrackPieceCount = stats.Int64("failure_untrack_piece", "Counter of untrack piece failure", stats.UnitDimensionless)
7577
)
7678

7779
var (
@@ -110,6 +112,7 @@ var (
110112
SuccessUnflagPieceCountView = &view.View{Measure: SuccessUnflagPieceCount, Aggregation: view.Count()}
111113
SuccessFlaggedPiecesListCountView = &view.View{Measure: SuccessFlaggedPiecesListCount, Aggregation: view.Count()}
112114
SuccessFlaggedPiecesCountCountView = &view.View{Measure: SuccessFlaggedPiecesCountCount, Aggregation: view.Count()}
115+
SuccessUntrackPieceCountView = &view.View{Measure: SuccessUntrackPieceCount, Aggregation: view.Count()}
113116
FailureAddDealForPieceCountView = &view.View{Measure: FailureAddDealForPieceCount, Aggregation: view.Count()}
114117
FailureAddIndexCountView = &view.View{Measure: FailureAddIndexCount, Aggregation: view.Count()}
115118
FailureIsIndexedCountView = &view.View{Measure: FailureIsIndexedCount, Aggregation: view.Count()}
@@ -131,6 +134,7 @@ var (
131134
FailureUnflagPieceCountView = &view.View{Measure: FailureUnflagPieceCount, Aggregation: view.Count()}
132135
FailureFlaggedPiecesListCountView = &view.View{Measure: FailureFlaggedPiecesListCount, Aggregation: view.Count()}
133136
FailureFlaggedPiecesCountCountView = &view.View{Measure: FailureFlaggedPiecesCountCount, Aggregation: view.Count()}
137+
FailureUntrackPieceCountView = &view.View{Measure: FailureUntrackPieceCount, Aggregation: view.Count()}
134138
)
135139

136140
// DefaultViews is an array of OpenCensus views for metric gathering purposes
@@ -159,6 +163,7 @@ var DefaultViews = func() []*view.View {
159163
SuccessUnflagPieceCountView,
160164
SuccessFlaggedPiecesListCountView,
161165
SuccessFlaggedPiecesCountCountView,
166+
SuccessUntrackPieceCountView,
162167
FailureAddDealForPieceCountView,
163168
FailureAddIndexCountView,
164169
FailureIsIndexedCountView,
@@ -180,6 +185,7 @@ var DefaultViews = func() []*view.View {
180185
FailureUnflagPieceCountView,
181186
FailureFlaggedPiecesListCountView,
182187
FailureFlaggedPiecesCountCountView,
188+
FailureUntrackPieceCountView,
183189
}
184190
//views = append(views, blockstore.DefaultViews...)
185191
views = append(views, rpcmetrics.DefaultViews...)

extern/boostd-data/yugabyte/piecedoctor.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,3 +527,30 @@ func (s *Store) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPie
527527
failureMetrics = false
528528
return count, nil
529529
}
530+
531+
func (s *Store) UntrackPiece(ctx context.Context, pieceCid cid.Cid, maddr address.Address) error {
532+
ctx, span := tracing.Tracer.Start(ctx, "store.untrack_piece")
533+
span.SetAttributes(attribute.String("pieceCid", pieceCid.String()))
534+
defer span.End()
535+
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Endpoint, "yb.untrack_piece"))
536+
stop := metrics.Timer(ctx, metrics.APIRequestDuration)
537+
defer stop()
538+
failureMetrics := true
539+
defer func() {
540+
if failureMetrics {
541+
stats.Record(s.ctx, metrics.FailureUntrackPieceCount.M(1))
542+
} else {
543+
stats.Record(s.ctx, metrics.SuccessUntrackPieceCount.M(1))
544+
}
545+
}()
546+
547+
qry := `DELETE FROM PieceTracker WHERE MinerAddr = $1 AND PieceCid = $2`
548+
_, err := s.db.Exec(ctx, qry, maddr.String(), pieceCid.String())
549+
if err != nil {
550+
return fmt.Errorf("untracking piece %s %s: %w", maddr, pieceCid, err)
551+
}
552+
553+
failureMetrics = false
554+
return nil
555+
556+
}

extern/boostd-data/yugabyte/service.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -894,7 +894,7 @@ func (s *Store) RemoveIndexes(ctx context.Context, pieceCid cid.Cid) error {
894894
return fmt.Errorf("removing indexes for piece %s: getting recs: %w", pieceCid, err)
895895
}
896896

897-
// Delete from multihash -> piece cids index
897+
// Delete from multihash -> piece cids index and PieceBlockOffsetSize
898898
var eg errgroup.Group
899899
for i := 0; i < s.settings.PayloadPiecesParallelism; i++ {
900900
eg.Go(func() error {
@@ -914,6 +914,11 @@ func (s *Store) RemoveIndexes(ctx context.Context, pieceCid cid.Cid) error {
914914
if err != nil {
915915
return fmt.Errorf("deleting from PayloadToPieces: %w", err)
916916
}
917+
q = `DELETE FROM PieceBlockOffsetSize WHERE PayloadMultihash = ? AND PieceCid = ?`
918+
err = s.session.Query(q, multihashBytes, pieceCid.Bytes()).Exec()
919+
if err != nil {
920+
return fmt.Errorf("deleting from PieceBlockOffsetSize: %w", err)
921+
}
917922
}
918923
}
919924

@@ -925,13 +930,6 @@ func (s *Store) RemoveIndexes(ctx context.Context, pieceCid cid.Cid) error {
925930
return err
926931
}
927932

928-
// Delete from piece offsets index
929-
qry := `DELETE FROM PieceBlockOffsetSize WHERE PieceCid = ?`
930-
err = s.session.Query(qry, pieceCid.Bytes()).WithContext(ctx).Exec()
931-
if err != nil {
932-
return fmt.Errorf("removing indexes for piece %s: deleting offset / size info: %w", pieceCid, err)
933-
}
934-
935933
failureMetrics = false
936934
return nil
937935
}

indexprovider/wrapper.go

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,15 @@ import (
1010

1111
"github.com/filecoin-project/boost/lib/legacy"
1212
"github.com/filecoin-project/boost/storagemarket/types/legacytypes"
13+
"github.com/filecoin-project/go-bitfield"
1314
"github.com/filecoin-project/go-statemachine/fsm"
15+
"github.com/filecoin-project/lotus/api/v1api"
16+
"github.com/filecoin-project/lotus/blockstore"
17+
"github.com/filecoin-project/lotus/chain/actors/adt"
18+
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
19+
chainTypes "github.com/filecoin-project/lotus/chain/types"
1420
"github.com/google/uuid"
21+
cbor "github.com/ipfs/go-ipld-cbor"
1522
"go.uber.org/fx"
1623

1724
"github.com/ipfs/go-datastore"
@@ -48,6 +55,8 @@ var log = logging.Logger("index-provider-wrapper")
4855
type Wrapper struct {
4956
enabled bool
5057

58+
full v1api.FullNode
59+
miner address.Address
5160
cfg *config.Boost
5261
dealsDB *db.DealsDB
5362
legacyProv legacy.LegacyDealManager
@@ -64,15 +73,15 @@ type Wrapper struct {
6473
stop context.CancelFunc
6574
}
6675

67-
func NewWrapper(cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, directDealsDB *db.DirectDealsDB, dealsDB *db.DealsDB,
76+
func NewWrapper(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, directDealsDB *db.DirectDealsDB, dealsDB *db.DealsDB,
6877
ssDB *db.SectorStateDB, legacyProv legacy.LegacyDealManager, prov provider.Interface,
69-
piecedirectory *piecedirectory.PieceDirectory, ssm *sectorstatemgr.SectorStateMgr, meshCreator idxprov.MeshCreator, storageService lotus_modules.MinerStorageService) (*Wrapper, error) {
78+
piecedirectory *piecedirectory.PieceDirectory, ssm *sectorstatemgr.SectorStateMgr, meshCreator idxprov.MeshCreator, storageService lotus_modules.MinerStorageService, full v1api.FullNode) (*Wrapper, error) {
7079

7180
return func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, directDealsDB *db.DirectDealsDB, dealsDB *db.DealsDB,
7281
ssDB *db.SectorStateDB, legacyProv legacy.LegacyDealManager, prov provider.Interface,
7382
piecedirectory *piecedirectory.PieceDirectory,
7483
ssm *sectorstatemgr.SectorStateMgr,
75-
meshCreator idxprov.MeshCreator, storageService lotus_modules.MinerStorageService) (*Wrapper, error) {
84+
meshCreator idxprov.MeshCreator, storageService lotus_modules.MinerStorageService, full v1api.FullNode) (*Wrapper, error) {
7685

7786
_, isDisabled := prov.(*DisabledIndexProvider)
7887

@@ -95,6 +104,8 @@ func NewWrapper(cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, r repo.Loc
95104
bitswapEnabled: bitswapEnabled,
96105
httpEnabled: httpEnabled,
97106
ssm: ssm,
107+
full: full,
108+
miner: provAddr,
98109
}
99110
return w, nil
100111
}
@@ -445,6 +456,29 @@ func (w *Wrapper) IndexerAnnounceAllDeals(ctx context.Context) error {
445456
return errors.New("cannot announce all deals: index provider is disabled")
446457
}
447458

459+
mActor, err := w.full.StateGetActor(ctx, w.miner, chainTypes.EmptyTSK)
460+
if err != nil {
461+
return fmt.Errorf("getting actor for the miner %s: %w", w.miner, err)
462+
}
463+
464+
store := adt.WrapStore(ctx, cbor.NewCborStore(blockstore.NewAPIBlockstore(w.full)))
465+
mas, err := miner.Load(store, mActor)
466+
if err != nil {
467+
return fmt.Errorf("loading miner actor state %s: %w", w.miner, err)
468+
}
469+
liveSectors, err := miner.AllPartSectors(mas, miner.Partition.LiveSectors)
470+
if err != nil {
471+
return fmt.Errorf("getting live sector sets for miner %s: %w", w.miner, err)
472+
}
473+
unProvenSectors, err := miner.AllPartSectors(mas, miner.Partition.UnprovenSectors)
474+
if err != nil {
475+
return fmt.Errorf("getting unproven sector sets for miner %s: %w", w.miner, err)
476+
}
477+
activeSectors, err := bitfield.MergeBitFields(liveSectors, unProvenSectors)
478+
if err != nil {
479+
return fmt.Errorf("merging bitfields to generate all sealed sectors on miner %s: %w", w.miner, err)
480+
}
481+
448482
log.Info("announcing all legacy deals to Indexer")
449483

450484
legacyDeals, err := w.legacyProv.ListDeals()
@@ -476,6 +510,15 @@ func (w *Wrapper) IndexerAnnounceAllDeals(ctx context.Context) error {
476510
continue
477511
}
478512

513+
present, err := activeSectors.IsSet(uint64(d.SectorNumber))
514+
if err != nil {
515+
return fmt.Errorf("checking if bitfield is set: %w", err)
516+
}
517+
518+
if !present {
519+
continue
520+
}
521+
479522
adCid, lerr := w.AnnounceLegcayDealToIndexer(ctx, d.ProposalCid)
480523
if lerr != nil {
481524
merr = multierror.Append(merr, lerr)
@@ -508,6 +551,15 @@ func (w *Wrapper) IndexerAnnounceAllDeals(ctx context.Context) error {
508551
continue
509552
}
510553

554+
present, err := activeSectors.IsSet(uint64(d.SectorID))
555+
if err != nil {
556+
return fmt.Errorf("checking if bitfield is set: %w", err)
557+
}
558+
559+
if !present {
560+
continue
561+
}
562+
511563
if _, err := w.AnnounceBoostDeal(ctx, d); err != nil {
512564
// don't log already advertised errors as errors - just skip them
513565
if !errors.Is(err, provider.ErrAlreadyAdvertised) {

lib/pdcleaner/pdcleaner.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,9 @@ func (p *pdcleaner) clean() {
102102
// Run at start up
103103
log.Infof("Starting LID clean up")
104104
serr := p.CleanOnce(p.ctx)
105-
log.Errorf("Failed to cleanup LID: %s", serr)
105+
if serr != nil {
106+
log.Errorf("Failed to cleanup LID: %s", serr)
107+
}
106108
log.Debugf("Finished cleaning up LID")
107109

108110
// Create a ticker with an hour tick
@@ -185,6 +187,7 @@ func (p *pdcleaner) CleanOnce(ctx context.Context) error {
185187
}
186188
return fmt.Errorf("cleaning up boost deal %s for piece %s: %s", deal.DealUuid.String(), deal.ClientDealProposal.Proposal.PieceCID.String(), err.Error())
187189
}
190+
log.Infof("removed deal for %s and deal ID %s", deal.ClientDealProposal.Proposal.PieceCID.String(), deal.DealUuid.String())
188191
}
189192
return nil
190193
})
@@ -212,6 +215,7 @@ func (p *pdcleaner) CleanOnce(ctx context.Context) error {
212215
}
213216
return fmt.Errorf("cleaning up legacy deal %s for piece %s: %s", deal.ProposalCid.String(), deal.ClientDealProposal.Proposal.PieceCID.String(), err.Error())
214217
}
218+
log.Infof("removed legacy deal for %s and deal ID %s", deal.ClientDealProposal.Proposal.PieceCID.String(), deal.ProposalCid.String())
215219
}
216220
return nil
217221
})
@@ -240,6 +244,7 @@ func (p *pdcleaner) CleanOnce(ctx context.Context) error {
240244
}
241245
return fmt.Errorf("cleaning up direct deal %s for piece %s: %s", deal.ID.String(), deal.PieceCID, err.Error())
242246
}
247+
log.Infof("removed direct deal for %s and deal ID %s", deal.PieceCID.String(), deal.ID.String())
243248
}
244249
return nil
245250
})
@@ -299,6 +304,7 @@ func (p *pdcleaner) CleanOnce(ctx context.Context) error {
299304
}
300305
log.Errorf("cleaning up dangling deal %s for piece %s: %s", deal.DealUuid, piece, err.Error())
301306
}
307+
log.Infof("removed dangling deal for %s and deal ID %s", piece.String(), deal.DealUuid)
302308
}
303309
}
304310
return nil

node/builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ func ConfigBoost(cfg *config.Boost) Option {
472472
Override(new(sealingpipeline.API), From(new(lotus_modules.MinerStorageService))),
473473

474474
Override(new(*sectorstatemgr.SectorStateMgr), sectorstatemgr.NewSectorStateMgr(cfg)),
475-
Override(new(*indexprovider.Wrapper), indexprovider.NewWrapper(cfg)),
475+
Override(new(*indexprovider.Wrapper), indexprovider.NewWrapper(walletMiner, cfg)),
476476
Override(new(storedask.StoredAsk), storedask.NewStoredAsk(cfg)),
477477

478478
Override(new(legacy.LegacyDealManager), modules.NewLegacyDealsManager),

piecedirectory/doctor.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"math/rand"
8+
"strings"
89
"time"
910

1011
"github.com/filecoin-project/boost/db"
@@ -13,11 +14,11 @@ import (
1314
"github.com/filecoin-project/boost/sectorstatemgr"
1415
"github.com/filecoin-project/go-address"
1516
"github.com/filecoin-project/go-state-types/abi"
17+
verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
1618
"github.com/filecoin-project/lotus/api"
1719
"github.com/filecoin-project/lotus/chain/types"
1820
"github.com/ipfs/go-cid"
1921
logging "github.com/ipfs/go-log/v2"
20-
verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
2122
)
2223

2324
var doclog = logging.Logger("piecedoc")
@@ -122,6 +123,18 @@ func (d *Doctor) checkPiece(ctx context.Context, pieceCid cid.Cid, lu *sectorsta
122123
// Check if piece belongs to an active sector
123124
md, err := d.store.GetPieceMetadata(ctx, pieceCid)
124125
if err != nil {
126+
// If piece is not found then it should be unflagged and removed from future tracking
127+
if strings.Contains(err.Error(), "not found") {
128+
serr := d.store.UnflagPiece(ctx, pieceCid, d.maddr)
129+
if serr != nil {
130+
return fmt.Errorf("failed to unflag the missing piece %s: %w", pieceCid.String(), serr)
131+
}
132+
serr = d.store.UntrackPiece(ctx, pieceCid, d.maddr)
133+
if serr != nil {
134+
return fmt.Errorf("failed to delete piece from tracker table %s: %w", pieceCid.String(), serr)
135+
}
136+
return nil
137+
}
125138
return fmt.Errorf("failed to get piece %s from local index directory: %w", pieceCid, err)
126139
}
127140

0 commit comments

Comments
 (0)