Skip to content

Commit 292d776

Browse files
authored
chore: move task implementations out of chain package (#399)
* chore: move task implementations out of chain package * fix comment grammar
1 parent ec0a276 commit 292d776

File tree

6 files changed

+59
-55
lines changed

6 files changed

+59
-55
lines changed

chain/indexer.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ import (
1818
"github.com/filecoin-project/sentinel-visor/metrics"
1919
"github.com/filecoin-project/sentinel-visor/model"
2020
visormodel "github.com/filecoin-project/sentinel-visor/model/visor"
21+
"github.com/filecoin-project/sentinel-visor/tasks/actorstate"
22+
"github.com/filecoin-project/sentinel-visor/tasks/blocks"
23+
"github.com/filecoin-project/sentinel-visor/tasks/chaineconomics"
2124
"github.com/filecoin-project/sentinel-visor/tasks/messages"
2225
"github.com/filecoin-project/sentinel-visor/tasks/msapprovals"
2326
)
@@ -83,45 +86,45 @@ func NewTipSetIndexer(o lens.APIOpener, d model.Storage, window time.Duration, n
8386
for _, task := range tasks {
8487
switch task {
8588
case BlocksTask:
86-
tsi.processors[BlocksTask] = NewBlockProcessor()
89+
tsi.processors[BlocksTask] = blocks.NewTask()
8790
case MessagesTask:
8891
tsi.messageProcessors[MessagesTask] = messages.NewTask(o)
8992
case ChainEconomicsTask:
90-
tsi.processors[ChainEconomicsTask] = NewChainEconomicsProcessor(o)
93+
tsi.processors[ChainEconomicsTask] = chaineconomics.NewTask(o)
9194
case ActorStatesRawTask:
92-
tsi.actorProcessors[ActorStatesRawTask] = NewActorStateProcessor(o, &RawActorExtractorMap{})
95+
tsi.actorProcessors[ActorStatesRawTask] = actorstate.NewTask(o, &actorstate.RawActorExtractorMap{})
9396
case ActorStatesPowerTask:
94-
tsi.actorProcessors[ActorStatesPowerTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{
97+
tsi.actorProcessors[ActorStatesPowerTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{
9598
CodeV1: sa0builtin.StoragePowerActorCodeID,
9699
CodeV2: sa2builtin.StoragePowerActorCodeID,
97100
CodeV3: sa3builtin.StoragePowerActorCodeID,
98101
})
99102
case ActorStatesRewardTask:
100-
tsi.actorProcessors[ActorStatesRewardTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{
103+
tsi.actorProcessors[ActorStatesRewardTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{
101104
CodeV1: sa0builtin.RewardActorCodeID,
102105
CodeV2: sa2builtin.RewardActorCodeID,
103106
CodeV3: sa3builtin.RewardActorCodeID,
104107
})
105108
case ActorStatesMinerTask:
106-
tsi.actorProcessors[ActorStatesMinerTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{
109+
tsi.actorProcessors[ActorStatesMinerTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{
107110
CodeV1: sa0builtin.StorageMinerActorCodeID,
108111
CodeV2: sa2builtin.StorageMinerActorCodeID,
109112
CodeV3: sa3builtin.StorageMinerActorCodeID,
110113
})
111114
case ActorStatesInitTask:
112-
tsi.actorProcessors[ActorStatesInitTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{
115+
tsi.actorProcessors[ActorStatesInitTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{
113116
CodeV1: sa0builtin.InitActorCodeID,
114117
CodeV2: sa2builtin.InitActorCodeID,
115118
CodeV3: sa3builtin.InitActorCodeID,
116119
})
117120
case ActorStatesMarketTask:
118-
tsi.actorProcessors[ActorStatesMarketTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{
121+
tsi.actorProcessors[ActorStatesMarketTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{
119122
CodeV1: sa0builtin.StorageMarketActorCodeID,
120123
CodeV2: sa2builtin.StorageMarketActorCodeID,
121124
CodeV3: sa3builtin.StorageMarketActorCodeID,
122125
})
123126
case ActorStatesMultisigTask:
124-
tsi.actorProcessors[ActorStatesMultisigTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{
127+
tsi.actorProcessors[ActorStatesMultisigTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{
125128
CodeV1: sa0builtin.MultisigActorCodeID,
126129
CodeV2: sa2builtin.MultisigActorCodeID,
127130
CodeV3: sa3builtin.MultisigActorCodeID,

chain/actor.go renamed to tasks/actorstate/task.go

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package chain
1+
package actorstate
22

33
import (
44
"context"
@@ -15,33 +15,32 @@ import (
1515
"github.com/filecoin-project/sentinel-visor/metrics"
1616
"github.com/filecoin-project/sentinel-visor/model"
1717
visormodel "github.com/filecoin-project/sentinel-visor/model/visor"
18-
"github.com/filecoin-project/sentinel-visor/tasks/actorstate"
1918
)
2019

21-
// An ActorStateProcessor processes the extraction of actor state according the allowed types in its extracter map.
22-
type ActorStateProcessor struct {
20+
// A Task processes the extraction of actor state according the allowed types in its extracter map.
21+
type Task struct {
2322
node lens.API
2423
opener lens.APIOpener
2524
closer lens.APICloser
2625
extracterMap ActorExtractorMap
2726
}
2827

29-
func NewActorStateProcessor(opener lens.APIOpener, extracterMap ActorExtractorMap) *ActorStateProcessor {
30-
p := &ActorStateProcessor{
28+
func NewTask(opener lens.APIOpener, extracterMap ActorExtractorMap) *Task {
29+
p := &Task{
3130
opener: opener,
3231
extracterMap: extracterMap,
3332
}
3433
return p
3534
}
3635

37-
func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSet, pts *types.TipSet, candidates map[string]types.Actor) (model.Persistable, *visormodel.ProcessingReport, error) {
38-
if p.node == nil {
39-
node, closer, err := p.opener.Open(ctx)
36+
func (t *Task) ProcessActors(ctx context.Context, ts *types.TipSet, pts *types.TipSet, candidates map[string]types.Actor) (model.Persistable, *visormodel.ProcessingReport, error) {
37+
if t.node == nil {
38+
node, closer, err := t.opener.Open(ctx)
4039
if err != nil {
4140
return nil, nil, xerrors.Errorf("unable to open lens: %w", err)
4241
}
43-
p.node = node
44-
p.closer = closer
42+
t.node = node
43+
t.closer = closer
4544
}
4645

4746
log.Debugw("processing actor state changes", "height", ts.Height(), "parent_height", pts.Height())
@@ -57,7 +56,7 @@ func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSe
5756
// Filter to just allowed actors
5857
actors := map[string]types.Actor{}
5958
for addr, act := range candidates {
60-
if p.extracterMap.Allow(act.Code) {
59+
if t.extracterMap.Allow(act.Code) {
6160
actors[addr] = act
6261
}
6362
}
@@ -77,7 +76,7 @@ func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSe
7776
// Run each task concurrently
7877
results := make(chan *ActorStateResult, len(actors))
7978
for addr, act := range actors {
80-
go p.runActorStateExtraction(ctx, ts, pts, addr, act, results)
79+
go t.runActorStateExtraction(ctx, ts, pts, addr, act, results)
8180
}
8281

8382
// Gather results
@@ -86,13 +85,13 @@ func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSe
8685
res := <-results
8786
inFlight--
8887
elapsed := time.Since(start)
89-
lla := log.With("height", int64(ts.Height()), "actor", actorstate.ActorNameByCode(res.Code), "address", res.Address)
88+
lla := log.With("height", int64(ts.Height()), "actor", ActorNameByCode(res.Code), "address", res.Address)
9089

9190
if res.Error != nil {
9291
lla.Errorw("actor returned with error", "error", res.Error.Error())
9392
report.ErrorsDetected = append(errorsDetected, &ActorStateError{
9493
Code: res.Code.String(),
95-
Name: actorstate.ActorNameByCode(res.Code),
94+
Name: ActorNameByCode(res.Code),
9695
Head: res.Head.String(),
9796
Address: res.Address,
9897
Error: res.Error.Error(),
@@ -120,8 +119,8 @@ func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSe
120119
return data, report, nil
121120
}
122121

123-
func (p *ActorStateProcessor) runActorStateExtraction(ctx context.Context, ts *types.TipSet, pts *types.TipSet, addrStr string, act types.Actor, results chan *ActorStateResult) {
124-
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ActorCode, actorstate.ActorNameByCode(act.Code)))
122+
func (t *Task) runActorStateExtraction(ctx context.Context, ts *types.TipSet, pts *types.TipSet, addrStr string, act types.Actor, results chan *ActorStateResult) {
123+
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ActorCode, ActorNameByCode(act.Code)))
125124

126125
res := &ActorStateResult{
127126
Code: act.Code,
@@ -138,7 +137,7 @@ func (p *ActorStateProcessor) runActorStateExtraction(ctx context.Context, ts *t
138137
return
139138
}
140139

141-
info := actorstate.ActorInfo{
140+
info := ActorInfo{
142141
Actor: act,
143142
Address: addr,
144143
ParentStateRoot: pts.ParentState(),
@@ -147,12 +146,12 @@ func (p *ActorStateProcessor) runActorStateExtraction(ctx context.Context, ts *t
147146
ParentTipSet: pts.Parents(),
148147
}
149148

150-
extracter, ok := p.extracterMap.GetExtractor(act.Code)
149+
extracter, ok := t.extracterMap.GetExtractor(act.Code)
151150
if !ok {
152151
res.SkippedParse = true
153152
} else {
154153
// Parse state
155-
data, err := extracter.Extract(ctx, info, p.node)
154+
data, err := extracter.Extract(ctx, info, t.node)
156155
if err != nil {
157156
res.Error = xerrors.Errorf("failed to extract parsed actor state: %w", err)
158157
return
@@ -161,12 +160,12 @@ func (p *ActorStateProcessor) runActorStateExtraction(ctx context.Context, ts *t
161160
}
162161
}
163162

164-
func (p *ActorStateProcessor) Close() error {
165-
if p.closer != nil {
166-
p.closer()
167-
p.closer = nil
163+
func (t *Task) Close() error {
164+
if t.closer != nil {
165+
t.closer()
166+
t.closer = nil
168167
}
169-
p.node = nil
168+
t.node = nil
170169
return nil
171170
}
172171

@@ -190,7 +189,7 @@ type ActorStateError struct {
190189
// An ActorExtractorMap controls which actor types may be extracted.
191190
type ActorExtractorMap interface {
192191
Allow(code cid.Cid) bool
193-
GetExtractor(code cid.Cid) (actorstate.ActorStateExtractor, bool)
192+
GetExtractor(code cid.Cid) (ActorStateExtractor, bool)
194193
}
195194

196195
type ActorExtractorFilter interface {
@@ -204,8 +203,8 @@ func (RawActorExtractorMap) Allow(code cid.Cid) bool {
204203
return true
205204
}
206205

207-
func (RawActorExtractorMap) GetExtractor(code cid.Cid) (actorstate.ActorStateExtractor, bool) {
208-
return actorstate.ActorExtractor{}, true
206+
func (RawActorExtractorMap) GetExtractor(code cid.Cid) (ActorStateExtractor, bool) {
207+
return ActorExtractor{}, true
209208
}
210209

211210
// A TypedActorExtractorMap extracts a single type of actor using full parsing of actor state
@@ -220,9 +219,9 @@ func (t *TypedActorExtractorMap) Allow(code cid.Cid) bool {
220219
return code == t.CodeV1 || code == t.CodeV2 || code == t.CodeV3
221220
}
222221

223-
func (t *TypedActorExtractorMap) GetExtractor(code cid.Cid) (actorstate.ActorStateExtractor, bool) {
222+
func (t *TypedActorExtractorMap) GetExtractor(code cid.Cid) (ActorStateExtractor, bool) {
224223
if !t.Allow(code) {
225224
return nil, false
226225
}
227-
return actorstate.GetActorStateExtractor(code)
226+
return GetActorStateExtractor(code)
228227
}

chain/block.go renamed to tasks/blocks/task.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package chain
1+
package blocks
22

33
import (
44
"context"
@@ -10,14 +10,14 @@ import (
1010
visormodel "github.com/filecoin-project/sentinel-visor/model/visor"
1111
)
1212

13-
type BlockProcessor struct {
13+
type Task struct {
1414
}
1515

16-
func NewBlockProcessor() *BlockProcessor {
17-
return &BlockProcessor{}
16+
func NewTask() *Task {
17+
return &Task{}
1818
}
1919

20-
func (p *BlockProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) {
20+
func (p *Task) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) {
2121
var pl model.PersistableList
2222
for _, bh := range ts.Blocks() {
2323
select {
@@ -39,6 +39,6 @@ func (p *BlockProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (m
3939
return pl, report, nil
4040
}
4141

42-
func (p *BlockProcessor) Close() error {
42+
func (p *Task) Close() error {
4343
return nil
4444
}

tasks/chain/economics.go renamed to tasks/chaineconomics/economics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package chain
1+
package chaineconomics
22

33
import (
44
"context"

tasks/chain/economics_test.go renamed to tasks/chaineconomics/economics_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package chain
1+
package chaineconomics
22

33
import (
44
"context"

chain/economics.go renamed to tasks/chaineconomics/task.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,32 @@
1-
package chain
1+
package chaineconomics
22

33
import (
44
"context"
55

66
"github.com/filecoin-project/lotus/chain/types"
7+
logging "github.com/ipfs/go-log/v2"
78
"golang.org/x/xerrors"
89

910
"github.com/filecoin-project/sentinel-visor/lens"
1011
"github.com/filecoin-project/sentinel-visor/model"
1112
visormodel "github.com/filecoin-project/sentinel-visor/model/visor"
12-
"github.com/filecoin-project/sentinel-visor/tasks/chain"
1313
)
1414

15-
type ChainEconomicsProcessor struct {
15+
var log = logging.Logger("chaineconomics")
16+
17+
type Task struct {
1618
node lens.API
1719
opener lens.APIOpener
1820
closer lens.APICloser
1921
}
2022

21-
func NewChainEconomicsProcessor(opener lens.APIOpener) *ChainEconomicsProcessor {
22-
return &ChainEconomicsProcessor{
23+
func NewTask(opener lens.APIOpener) *Task {
24+
return &Task{
2325
opener: opener,
2426
}
2527
}
2628

27-
func (p *ChainEconomicsProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) {
29+
func (p *Task) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) {
2830
if p.node == nil {
2931
node, closer, err := p.opener.Open(ctx)
3032
if err != nil {
@@ -40,7 +42,7 @@ func (p *ChainEconomicsProcessor) ProcessTipSet(ctx context.Context, ts *types.T
4042
StateRoot: ts.ParentState().String(),
4143
}
4244

43-
ce, err := chain.ExtractChainEconomicsModel(ctx, p.node, ts)
45+
ce, err := ExtractChainEconomicsModel(ctx, p.node, ts)
4446
if err != nil {
4547
log.Errorw("error received while extracting chain economics, closing lens", "error", err)
4648
if cerr := p.Close(); cerr != nil {
@@ -52,7 +54,7 @@ func (p *ChainEconomicsProcessor) ProcessTipSet(ctx context.Context, ts *types.T
5254
return ce, report, nil
5355
}
5456

55-
func (p *ChainEconomicsProcessor) Close() error {
57+
func (p *Task) Close() error {
5658
if p.closer != nil {
5759
p.closer()
5860
p.closer = nil

0 commit comments

Comments
 (0)