@@ -18,6 +18,8 @@ import (
18
18
"github.com/filecoin-project/sentinel-visor/metrics"
19
19
"github.com/filecoin-project/sentinel-visor/model"
20
20
visormodel "github.com/filecoin-project/sentinel-visor/model/visor"
21
+ "github.com/filecoin-project/sentinel-visor/tasks/messages"
22
+ "github.com/filecoin-project/sentinel-visor/tasks/msapprovals"
21
23
)
22
24
23
25
const (
@@ -31,6 +33,7 @@ const (
31
33
BlocksTask = "blocks" // task that extracts block data
32
34
MessagesTask = "messages" // task that extracts message data
33
35
ChainEconomicsTask = "chaineconomics" // task that extracts chain economics data
36
+ MultisigApprovalsTask = "msapprovals" // task that extracts multisig actor approvals
34
37
)
35
38
36
39
var log = logging .Logger ("chain" )
@@ -39,16 +42,17 @@ var _ TipSetObserver = (*TipSetIndexer)(nil)
39
42
40
43
// A TipSetWatcher waits for tipsets and persists their block data into a database.
41
44
type TipSetIndexer struct {
42
- window time.Duration
43
- storage model.Storage
44
- processors map [string ]TipSetProcessor
45
- actorProcessors map [string ]ActorProcessor
46
- name string
47
- persistSlot chan struct {}
48
- lastTipSet * types.TipSet
49
- node lens.API
50
- opener lens.APIOpener
51
- closer lens.APICloser
45
+ window time.Duration
46
+ storage model.Storage
47
+ processors map [string ]TipSetProcessor
48
+ messageProcessors map [string ]MessageProcessor
49
+ actorProcessors map [string ]ActorProcessor
50
+ name string
51
+ persistSlot chan struct {}
52
+ lastTipSet * types.TipSet
53
+ node lens.API
54
+ opener lens.APIOpener
55
+ closer lens.APICloser
52
56
}
53
57
54
58
// A TipSetIndexer extracts block, message and actor state data from a tipset and persists it to storage. Extraction
@@ -57,21 +61,22 @@ type TipSetIndexer struct {
57
61
// indexer is used as the reporter in the visor_processing_reports table.
58
62
func NewTipSetIndexer (o lens.APIOpener , d model.Storage , window time.Duration , name string , tasks []string ) (* TipSetIndexer , error ) {
59
63
tsi := & TipSetIndexer {
60
- storage : d ,
61
- window : window ,
62
- name : name ,
63
- persistSlot : make (chan struct {}, 1 ), // allow one concurrent persistence job
64
- processors : map [string ]TipSetProcessor {},
65
- actorProcessors : map [string ]ActorProcessor {},
66
- opener : o ,
64
+ storage : d ,
65
+ window : window ,
66
+ name : name ,
67
+ persistSlot : make (chan struct {}, 1 ), // allow one concurrent persistence job
68
+ processors : map [string ]TipSetProcessor {},
69
+ messageProcessors : map [string ]MessageProcessor {},
70
+ actorProcessors : map [string ]ActorProcessor {},
71
+ opener : o ,
67
72
}
68
73
69
74
for _ , task := range tasks {
70
75
switch task {
71
76
case BlocksTask :
72
77
tsi .processors [BlocksTask ] = NewBlockProcessor ()
73
78
case MessagesTask :
74
- tsi .processors [MessagesTask ] = NewMessageProcessor (o )
79
+ tsi .messageProcessors [MessagesTask ] = messages . NewTask (o )
75
80
case ChainEconomicsTask :
76
81
tsi .processors [ChainEconomicsTask ] = NewChainEconomicsProcessor (o )
77
82
case ActorStatesRawTask :
@@ -112,6 +117,8 @@ func NewTipSetIndexer(o lens.APIOpener, d model.Storage, window time.Duration, n
112
117
CodeV2 : sa2builtin .MultisigActorCodeID ,
113
118
CodeV3 : sa3builtin .MultisigActorCodeID ,
114
119
})
120
+ case MultisigApprovalsTask :
121
+ tsi .messageProcessors [MultisigApprovalsTask ] = msapprovals .NewTask (o )
115
122
default :
116
123
return nil , xerrors .Errorf ("unknown task: %s" , task )
117
124
}
@@ -135,6 +142,8 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
135
142
}
136
143
defer cancel ()
137
144
145
+ ll := log .With ("height" , int64 (ts .Height ()))
146
+
138
147
start := time .Now ()
139
148
140
149
inFlight := 0
@@ -149,8 +158,8 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
149
158
go t .runProcessor (tctx , p , name , ts , results )
150
159
}
151
160
152
- // Run each actor processing task concurrently if we have any and we've seen a previous tipset to compare with
153
- if len (t .actorProcessors ) > 0 {
161
+ // Run each actor or message processing task concurrently if we have any and we've seen a previous tipset to compare with
162
+ if len (t .actorProcessors ) > 0 || len ( t . messageProcessors ) > 0 {
154
163
155
164
// Actor processors perform a diff between two tipsets so we need to keep track of parent and child
156
165
var parent , child * types.TipSet
@@ -181,36 +190,65 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
181
190
t .closer = closer
182
191
}
183
192
184
- changes , err := t .node .StateChangedActors (tctx , parent .ParentState (), child .ParentState ())
185
- if err != nil {
186
-
187
- terr := xerrors .Errorf ("failed to extract actor changes: %w" , err )
188
- // We need to report that all actor tasks failed
189
- for name := range t .actorProcessors {
190
- report := & visormodel.ProcessingReport {
191
- Height : int64 (ts .Height ()),
192
- StateRoot : ts .ParentState ().String (),
193
- Reporter : t .name ,
194
- Task : name ,
195
- StartedAt : start ,
196
- CompletedAt : time .Now (),
197
- Status : visormodel .ProcessingStatusError ,
198
- ErrorsDetected : terr ,
193
+ // If we have message processors then extract the messages and receipts
194
+ if len (t .messageProcessors ) > 0 {
195
+ emsgs , err := t .node .GetExecutedMessagesForTipset (ctx , child , parent )
196
+ if err == nil {
197
+ // Start all the message processors
198
+ for name , p := range t .messageProcessors {
199
+ inFlight ++
200
+ go t .runMessageProcessor (tctx , p , name , child , parent , emsgs , results )
201
+ }
202
+ } else {
203
+ ll .Errorw ("failed to extract messages" , "error" , err )
204
+ terr := xerrors .Errorf ("failed to extract messages: %w" , err )
205
+ // We need to report that all message tasks failed
206
+ for name := range t .messageProcessors {
207
+ report := & visormodel.ProcessingReport {
208
+ Height : int64 (ts .Height ()),
209
+ StateRoot : ts .ParentState ().String (),
210
+ Reporter : t .name ,
211
+ Task : name ,
212
+ StartedAt : start ,
213
+ CompletedAt : time .Now (),
214
+ Status : visormodel .ProcessingStatusError ,
215
+ ErrorsDetected : terr ,
216
+ }
217
+ taskOutputs [name ] = model.PersistableList {report }
199
218
}
200
- taskOutputs [name ] = model.PersistableList {report }
201
219
}
202
- return terr
203
220
}
204
221
205
- for name , p := range t .actorProcessors {
206
- inFlight ++
207
- go t .runActorProcessor (tctx , p , name , child , parent , changes , results )
222
+ // If we have actor processors then find actors that have changed state
223
+ if len (t .actorProcessors ) > 0 {
224
+ changes , err := t .node .StateChangedActors (tctx , parent .ParentState (), child .ParentState ())
225
+ if err == nil {
226
+ for name , p := range t .actorProcessors {
227
+ inFlight ++
228
+ go t .runActorProcessor (tctx , p , name , child , parent , changes , results )
229
+ }
230
+ } else {
231
+ ll .Errorw ("failed to extract actor changes" , "error" , err )
232
+ terr := xerrors .Errorf ("failed to extract actor changes: %w" , err )
233
+ // We need to report that all actor tasks failed
234
+ for name := range t .actorProcessors {
235
+ report := & visormodel.ProcessingReport {
236
+ Height : int64 (ts .Height ()),
237
+ StateRoot : ts .ParentState ().String (),
238
+ Reporter : t .name ,
239
+ Task : name ,
240
+ StartedAt : start ,
241
+ CompletedAt : time .Now (),
242
+ Status : visormodel .ProcessingStatusError ,
243
+ ErrorsDetected : terr ,
244
+ }
245
+ taskOutputs [name ] = model.PersistableList {report }
246
+ }
247
+ }
208
248
}
209
249
}
210
250
}
211
251
212
- ll := log .With ("height" , int64 (ts .Height ()))
213
-
214
252
// Wait for all tasks to complete
215
253
for inFlight > 0 {
216
254
res := <- results
@@ -326,6 +364,28 @@ func (t *TipSetIndexer) runProcessor(ctx context.Context, p TipSetProcessor, nam
326
364
}
327
365
}
328
366
367
+ func (t * TipSetIndexer ) runMessageProcessor (ctx context.Context , p MessageProcessor , name string , ts , pts * types.TipSet , emsgs []* lens.ExecutedMessage , results chan * TaskResult ) {
368
+ ctx , _ = tag .New (ctx , tag .Upsert (metrics .TaskType , name ))
369
+ stats .Record (ctx , metrics .TipsetHeight .M (int64 (ts .Height ())))
370
+ stop := metrics .Timer (ctx , metrics .ProcessingDuration )
371
+ defer stop ()
372
+
373
+ data , report , err := p .ProcessMessages (ctx , ts , pts , emsgs )
374
+ if err != nil {
375
+ stats .Record (ctx , metrics .ProcessingFailure .M (1 ))
376
+ results <- & TaskResult {
377
+ Task : name ,
378
+ Error : err ,
379
+ }
380
+ return
381
+ }
382
+ results <- & TaskResult {
383
+ Task : name ,
384
+ Report : report ,
385
+ Data : data ,
386
+ }
387
+ }
388
+
329
389
func (t * TipSetIndexer ) runActorProcessor (ctx context.Context , p ActorProcessor , name string , ts , pts * types.TipSet , actors map [string ]types.Actor , results chan * TaskResult ) {
330
390
ctx , _ = tag .New (ctx , tag .Upsert (metrics .TaskType , name ))
331
391
stats .Record (ctx , metrics .TipsetHeight .M (int64 (ts .Height ())))
@@ -384,6 +444,14 @@ type TipSetProcessor interface {
384
444
Close () error
385
445
}
386
446
447
+ type MessageProcessor interface {
448
+ // ProcessMessages processes messages contained within a tipset. If error is non-nil then the processor encountered a fatal error.
449
+ // pts is the tipset containing the messages, ts is the tipset containing the receipts
450
+ // Any data returned must be accompanied by a processing report.
451
+ ProcessMessages (ctx context.Context , ts * types.TipSet , pts * types.TipSet , emsgs []* lens.ExecutedMessage ) (model.Persistable , * visormodel.ProcessingReport , error )
452
+ Close () error
453
+ }
454
+
387
455
type ActorProcessor interface {
388
456
// ProcessActor processes a set of actors. If error is non-nil then the processor encountered a fatal error.
389
457
// Any data returned must be accompanied by a processing report.
0 commit comments