Skip to content

Commit ab09cca

Browse files
authored
Replace ExecutedAndBlockMessages with individual methods (#1040)
* refactor: remove block messages from TipSetMessages and refactor processors * chore: TipSetMessageReceipts implemented and wired up * chore: update miner post extractor * polish: TipSetMessageReceipts iterator * refactor: update msapproaval extractor * refactor: update parsed messages task * refactor: memoize TipSetMessageReceipts * refactor: update gas economy * refactor: update gas outs task * refactor: remove ExecutedAndBlockMessages method * address review feedback
1 parent da510c3 commit ab09cca

File tree

21 files changed

+831
-688
lines changed

21 files changed

+831
-688
lines changed

chain/datasource/datasource.go

Lines changed: 81 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,23 @@ import (
1212

1313
"github.com/filecoin-project/go-address"
1414
"github.com/filecoin-project/go-hamt-ipld/v3"
15+
"github.com/filecoin-project/go-state-types/abi"
1516
"github.com/filecoin-project/lotus/api"
1617
"github.com/filecoin-project/lotus/chain/state"
1718
"github.com/filecoin-project/lotus/chain/types"
19+
"github.com/filecoin-project/lotus/chain/vm"
20+
states0 "github.com/filecoin-project/specs-actors/actors/states"
21+
states2 "github.com/filecoin-project/specs-actors/v2/actors/states"
22+
states3 "github.com/filecoin-project/specs-actors/v3/actors/states"
23+
states4 "github.com/filecoin-project/specs-actors/v4/actors/states"
24+
states5 "github.com/filecoin-project/specs-actors/v5/actors/states"
1825
lru "github.com/hashicorp/golang-lru"
1926
"github.com/ipfs/go-cid"
2027
logging "github.com/ipfs/go-log/v2"
2128
"go.opentelemetry.io/otel"
2229
"go.opentelemetry.io/otel/attribute"
2330
"golang.org/x/sync/singleflight"
2431

25-
states0 "github.com/filecoin-project/specs-actors/actors/states"
26-
states2 "github.com/filecoin-project/specs-actors/v2/actors/states"
27-
states3 "github.com/filecoin-project/specs-actors/v3/actors/states"
28-
states4 "github.com/filecoin-project/specs-actors/v4/actors/states"
29-
states5 "github.com/filecoin-project/specs-actors/v5/actors/states"
30-
3132
"github.com/filecoin-project/lily/chain/actors/adt"
3233
"github.com/filecoin-project/lily/chain/actors/adt/diff"
3334
"github.com/filecoin-project/lily/chain/actors/builtin/miner"
@@ -37,28 +38,28 @@ import (
3738
)
3839

3940
var (
40-
executedBlkMsgCacheSize int
41-
executedTsCacheSize int
42-
diffPreCommitCacheSize int
43-
diffSectorCacheSize int
44-
45-
executedBlkMsgCacheSizeEnv = "LILY_EXECUTED_BLK_MSG_CACHE_SIZE"
46-
executedTsCacheSizeEnv = "LILY_EXECUTED_TS_CACHE_SIZE"
47-
diffPreCommitCacheSizeEnv = "LILY_DIFF_PRECOMMIT_CACHE_SIZE"
48-
diffSectorCacheSizeEnv = "LILY_DIFF_SECTORS_CACHE_SIZE"
41+
tipsetMessageReceiptCacheSize int
42+
executedTsCacheSize int
43+
diffPreCommitCacheSize int
44+
diffSectorCacheSize int
45+
46+
tipsetMessageReceiptSizeEnv = "LILY_TIPSET_MSG_RECEIPT_CACHE_SIZE"
47+
executedTsCacheSizeEnv = "LILY_EXECUTED_TS_CACHE_SIZE"
48+
diffPreCommitCacheSizeEnv = "LILY_DIFF_PRECOMMIT_CACHE_SIZE"
49+
diffSectorCacheSizeEnv = "LILY_DIFF_SECTORS_CACHE_SIZE"
4950
)
5051

5152
func init() {
52-
executedBlkMsgCacheSize = 4
53+
tipsetMessageReceiptCacheSize = 4
5354
executedTsCacheSize = 4
5455
diffPreCommitCacheSize = 500
5556
diffSectorCacheSize = 500
56-
if s := os.Getenv(executedBlkMsgCacheSizeEnv); s != "" {
57+
if s := os.Getenv(tipsetMessageReceiptSizeEnv); s != "" {
5758
v, err := strconv.ParseInt(s, 10, 64)
5859
if err == nil {
59-
executedBlkMsgCacheSize = int(v)
60+
tipsetMessageReceiptCacheSize = int(v)
6061
} else {
61-
log.Warnf("invalid value (%s) for %s defaulting to %d: %s", s, executedBlkMsgCacheSizeEnv, executedBlkMsgCacheSize, err)
62+
log.Warnf("invalid value (%s) for %s defaulting to %d: %s", s, tipsetMessageReceiptSizeEnv, tipsetMessageReceiptCacheSize, err)
6263
}
6364
}
6465
if s := os.Getenv(executedTsCacheSizeEnv); s != "" {
@@ -92,28 +93,12 @@ var _ tasks.DataSource = (*DataSource)(nil)
9293

9394
var log = logging.Logger("lily/datasource")
9495

95-
type DataSource struct {
96-
node lens.API
97-
98-
executedBlkMsgCache *lru.Cache
99-
executedBlkMsgGroup singleflight.Group
100-
101-
executedTsCache *lru.Cache
102-
executedTsGroup singleflight.Group
103-
104-
diffSectorsCache *lru.Cache
105-
diffSectorsGroup singleflight.Group
106-
107-
diffPreCommitCache *lru.Cache
108-
diffPreCommitGroup singleflight.Group
109-
}
110-
11196
func NewDataSource(node lens.API) (*DataSource, error) {
11297
t := &DataSource{
11398
node: node,
11499
}
115100
var err error
116-
t.executedBlkMsgCache, err = lru.New(executedBlkMsgCacheSize)
101+
t.tsBlkMsgRecCache, err = lru.New(tipsetMessageReceiptCacheSize)
117102
if err != nil {
118103
return nil, err
119104
}
@@ -137,6 +122,56 @@ func NewDataSource(node lens.API) (*DataSource, error) {
137122
return t, nil
138123
}
139124

125+
type DataSource struct {
126+
node lens.API
127+
128+
executedTsCache *lru.Cache
129+
executedTsGroup singleflight.Group
130+
131+
tsBlkMsgRecCache *lru.Cache
132+
tsBlkMsgRecGroup singleflight.Group
133+
134+
diffSectorsCache *lru.Cache
135+
diffSectorsGroup singleflight.Group
136+
137+
diffPreCommitCache *lru.Cache
138+
diffPreCommitGroup singleflight.Group
139+
}
140+
141+
func (t *DataSource) ComputeBaseFee(ctx context.Context, ts *types.TipSet) (abi.TokenAmount, error) {
142+
return t.node.ComputeBaseFee(ctx, ts)
143+
}
144+
145+
func (t *DataSource) TipSetBlockMessages(ctx context.Context, ts *types.TipSet) ([]*lens.BlockMessages, error) {
146+
return t.node.MessagesForTipSetBlocks(ctx, ts)
147+
}
148+
149+
// TipSetMessageReceipts returns the blocks and messages in `pts` and their corresponding receipts from `ts` matching block order in tipset (`pts`).
150+
// TODO replace with lotus chainstore method when https://github.com/filecoin-project/lotus/pull/9186 lands
151+
func (t *DataSource) TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*lens.BlockMessageReceipts, error) {
152+
key, err := asKey(ts, pts)
153+
if err != nil {
154+
return nil, err
155+
}
156+
value, found := t.tsBlkMsgRecCache.Get(key)
157+
if found {
158+
return value.([]*lens.BlockMessageReceipts), nil
159+
}
160+
161+
value, err, _ = t.tsBlkMsgRecGroup.Do(key, func() (interface{}, error) {
162+
data, innerErr := t.node.TipSetMessageReceipts(ctx, ts, pts)
163+
if innerErr == nil {
164+
t.tsBlkMsgRecCache.Add(key, data)
165+
}
166+
return data, innerErr
167+
})
168+
if err != nil {
169+
return nil, err
170+
}
171+
172+
return value.([]*lens.BlockMessageReceipts), nil
173+
}
174+
140175
func (t *DataSource) TipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
141176
ctx, span := otel.Tracer("").Start(ctx, "DataSource.TipSet")
142177
if span.IsRecording() {
@@ -239,45 +274,20 @@ func (t *DataSource) MessageExecutions(ctx context.Context, ts, pts *types.TipSe
239274
return value.([]*lens.MessageExecution), nil
240275
}
241276

242-
func (t *DataSource) ExecutedAndBlockMessages(ctx context.Context, ts, pts *types.TipSet) (*lens.TipSetMessages, error) {
243-
metrics.RecordInc(ctx, metrics.DataSourceExecutedAndBlockMessagesRead)
244-
ctx, span := otel.Tracer("").Start(ctx, "DataSource.ExecutedAndBlockMessages")
245-
if span.IsRecording() {
246-
span.SetAttributes(attribute.String("tipset", ts.Key().String()))
247-
span.SetAttributes(attribute.String("parent", pts.Key().String()))
248-
}
249-
defer span.End()
250-
251-
key, err := asKey(ts, pts)
252-
if err != nil {
253-
return nil, err
254-
}
255-
value, found := t.executedBlkMsgCache.Get(key)
256-
if found {
257-
metrics.RecordInc(ctx, metrics.DataSourceExecutedAndBlockMessagesCacheHit)
258-
return value.(*lens.TipSetMessages), nil
259-
}
260-
261-
value, err, shared := t.executedBlkMsgGroup.Do(key, func() (interface{}, error) {
262-
data, innerErr := t.node.GetExecutedAndBlockMessagesForTipset(ctx, ts, pts)
263-
if innerErr == nil {
264-
t.executedBlkMsgCache.Add(key, data)
265-
}
277+
func (t *DataSource) MinerLoad(store adt.Store, act *types.Actor) (miner.State, error) {
278+
return miner.Load(store, act)
279+
}
266280

267-
return data, innerErr
268-
})
281+
func (t *DataSource) ShouldBurnFn(ctx context.Context, ts *types.TipSet) (lens.ShouldBurnFn, error) {
282+
return t.node.BurnFundsFn(ctx, ts)
283+
}
269284

270-
if span.IsRecording() {
271-
span.SetAttributes(attribute.Bool("shared", shared))
272-
}
285+
func ComputeGasOutputs(ctx context.Context, block *types.BlockHeader, message *types.Message, receipt *types.MessageReceipt, shouldBurnFn lens.ShouldBurnFn) (vm.GasOutputs, error) {
286+
burn, err := shouldBurnFn(ctx, message, receipt.ExitCode)
273287
if err != nil {
274-
return nil, err
288+
return vm.GasOutputs{}, err
275289
}
276-
return value.(*lens.TipSetMessages), nil
277-
}
278-
279-
func (t *DataSource) MinerLoad(store adt.Store, act *types.Actor) (miner.State, error) {
280-
return miner.Load(store, act)
290+
return vm.ComputeGasOutputs(receipt.GasUsed, message.GasLimit, block.ParentBaseFee, message.GasFeeCap, message.GasPremium, burn), nil
281291
}
282292

283293
func GetActorStateChanges(ctx context.Context, store adt.Store, current, executed *types.TipSet) (tasks.ActorStateChangeDiff, error) {

chain/indexer/integrated/processor/state.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -541,11 +541,14 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces
541541
// Messages
542542
//
543543
case tasktype.Message:
544-
out.TipsetsProcessors[t] = messagetask.NewTask(api)
544+
out.TipsetProcessors[t] = messagetask.NewTask(api)
545+
case tasktype.BlockMessage:
546+
out.TipsetProcessors[t] = bmtask.NewTask(api)
547+
case tasktype.MessageGasEconomy:
548+
out.TipsetProcessors[t] = gasecontask.NewTask(api)
549+
545550
case tasktype.GasOutputs:
546551
out.TipsetsProcessors[t] = gasouttask.NewTask(api)
547-
case tasktype.BlockMessage:
548-
out.TipsetsProcessors[t] = bmtask.NewTask(api)
549552
case tasktype.ParsedMessage:
550553
out.TipsetsProcessors[t] = parentmessagetask.NewTask(api)
551554
case tasktype.Receipt:
@@ -554,8 +557,6 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces
554557
out.TipsetsProcessors[t] = imtask.NewTask(api)
555558
case tasktype.InternalParsedMessage:
556559
out.TipsetsProcessors[t] = ipmtask.NewTask(api)
557-
case tasktype.MessageGasEconomy:
558-
out.TipsetsProcessors[t] = gasecontask.NewTask(api)
559560
case tasktype.MultisigApproval:
560561
out.TipsetsProcessors[t] = msapprovaltask.NewTask(api)
561562
case tasktype.VmMessage:

chain/indexer/integrated/processor/state_internal_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ import (
1616
"github.com/filecoin-project/lily/chain/actors/builtin/verifreg"
1717
"github.com/filecoin-project/lily/chain/indexer/tasktype"
1818
"github.com/filecoin-project/lily/tasks/messageexecutions/vm"
19+
"github.com/filecoin-project/lily/tasks/messages/blockmessage"
20+
"github.com/filecoin-project/lily/tasks/messages/gaseconomy"
21+
"github.com/filecoin-project/lily/tasks/messages/message"
1922

2023
"github.com/filecoin-project/lily/tasks/actorstate"
2124
inittask "github.com/filecoin-project/lily/tasks/actorstate/init_"
@@ -34,10 +37,7 @@ import (
3437
"github.com/filecoin-project/lily/tasks/consensus"
3538
"github.com/filecoin-project/lily/tasks/messageexecutions/internalmessage"
3639
"github.com/filecoin-project/lily/tasks/messageexecutions/internalparsedmessage"
37-
"github.com/filecoin-project/lily/tasks/messages/blockmessage"
38-
"github.com/filecoin-project/lily/tasks/messages/gaseconomy"
3940
"github.com/filecoin-project/lily/tasks/messages/gasoutput"
40-
"github.com/filecoin-project/lily/tasks/messages/message"
4141
"github.com/filecoin-project/lily/tasks/messages/parsedmessage"
4242
"github.com/filecoin-project/lily/tasks/messages/receipt"
4343
"github.com/filecoin-project/lily/tasks/msapprovals"
@@ -48,26 +48,26 @@ func TestNewProcessor(t *testing.T) {
4848
require.NoError(t, err)
4949
require.Equal(t, t.Name(), proc.name)
5050
require.Len(t, proc.actorProcessors, 21)
51-
require.Len(t, proc.tipsetProcessors, 5)
52-
require.Len(t, proc.tipsetsProcessors, 10)
51+
require.Len(t, proc.tipsetProcessors, 8)
52+
require.Len(t, proc.tipsetsProcessors, 7)
5353
require.Len(t, proc.builtinProcessors, 1)
5454

55-
require.Equal(t, message.NewTask(nil), proc.tipsetsProcessors[tasktype.Message])
5655
require.Equal(t, gasoutput.NewTask(nil), proc.tipsetsProcessors[tasktype.GasOutputs])
57-
require.Equal(t, blockmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.BlockMessage])
5856
require.Equal(t, parsedmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.ParsedMessage])
5957
require.Equal(t, receipt.NewTask(nil), proc.tipsetsProcessors[tasktype.Receipt])
6058
require.Equal(t, internalmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.InternalMessage])
6159
require.Equal(t, internalparsedmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.InternalParsedMessage])
62-
require.Equal(t, gaseconomy.NewTask(nil), proc.tipsetsProcessors[tasktype.MessageGasEconomy])
6360
require.Equal(t, msapprovals.NewTask(nil), proc.tipsetsProcessors[tasktype.MultisigApproval])
6461
require.Equal(t, vm.NewTask(nil), proc.tipsetsProcessors[tasktype.VmMessage])
6562

63+
require.Equal(t, message.NewTask(nil), proc.tipsetProcessors[tasktype.Message])
64+
require.Equal(t, blockmessage.NewTask(nil), proc.tipsetProcessors[tasktype.BlockMessage])
6665
require.Equal(t, headers.NewTask(), proc.tipsetProcessors[tasktype.BlockHeader])
6766
require.Equal(t, parents.NewTask(), proc.tipsetProcessors[tasktype.BlockParent])
6867
require.Equal(t, drand.NewTask(), proc.tipsetProcessors[tasktype.DrandBlockEntrie])
6968
require.Equal(t, chaineconomics.NewTask(nil), proc.tipsetProcessors[tasktype.ChainEconomics])
7069
require.Equal(t, consensus.NewTask(nil), proc.tipsetProcessors[tasktype.ChainConsensus])
70+
require.Equal(t, gaseconomy.NewTask(nil), proc.tipsetProcessors[tasktype.MessageGasEconomy])
7171

7272
require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.DeadlineInfoExtractor{})), proc.actorProcessors[tasktype.MinerCurrentDeadlineInfo])
7373
require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.FeeDebtExtractor{})), proc.actorProcessors[tasktype.MinerFeeDebt])

chain/indexer/integrated/processor/state_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -269,47 +269,47 @@ func TestMakeProcessorsActors(t *testing.T) {
269269

270270
func TestMakeProcessorsTipSet(t *testing.T) {
271271
tasks := []string{
272+
tasktype.Message,
273+
tasktype.BlockMessage,
272274
tasktype.BlockHeader,
273275
tasktype.BlockParent,
274276
tasktype.DrandBlockEntrie,
275277
tasktype.ChainEconomics,
276278
tasktype.ChainConsensus,
279+
tasktype.MessageGasEconomy,
277280
}
278281
proc, err := processor.MakeProcessors(nil, tasks)
279282
require.NoError(t, err)
280283
require.Len(t, proc.TipsetProcessors, len(tasks))
281284

285+
require.Equal(t, message.NewTask(nil), proc.TipsetProcessors[tasktype.Message])
286+
require.Equal(t, blockmessage.NewTask(nil), proc.TipsetProcessors[tasktype.BlockMessage])
282287
require.Equal(t, headers.NewTask(), proc.TipsetProcessors[tasktype.BlockHeader])
283288
require.Equal(t, parents.NewTask(), proc.TipsetProcessors[tasktype.BlockParent])
284289
require.Equal(t, drand.NewTask(), proc.TipsetProcessors[tasktype.DrandBlockEntrie])
285290
require.Equal(t, chaineconomics.NewTask(nil), proc.TipsetProcessors[tasktype.ChainEconomics])
286291
require.Equal(t, consensus.NewTask(nil), proc.TipsetProcessors[tasktype.ChainConsensus])
292+
require.Equal(t, gaseconomy.NewTask(nil), proc.TipsetProcessors[tasktype.MessageGasEconomy])
287293
}
288294

289295
func TestMakeProcessorsTipSets(t *testing.T) {
290296
tasks := []string{
291-
tasktype.Message,
292297
tasktype.GasOutputs,
293-
tasktype.BlockMessage,
294298
tasktype.ParsedMessage,
295299
tasktype.Receipt,
296300
tasktype.InternalMessage,
297301
tasktype.InternalParsedMessage,
298-
tasktype.MessageGasEconomy,
299302
tasktype.MultisigApproval,
300303
}
301304
proc, err := processor.MakeProcessors(nil, tasks)
302305
require.NoError(t, err)
303306
require.Len(t, proc.TipsetsProcessors, len(tasks))
304307

305-
require.Equal(t, message.NewTask(nil), proc.TipsetsProcessors[tasktype.Message])
306308
require.Equal(t, gasoutput.NewTask(nil), proc.TipsetsProcessors[tasktype.GasOutputs])
307-
require.Equal(t, blockmessage.NewTask(nil), proc.TipsetsProcessors[tasktype.BlockMessage])
308309
require.Equal(t, parsedmessage.NewTask(nil), proc.TipsetsProcessors[tasktype.ParsedMessage])
309310
require.Equal(t, receipt.NewTask(nil), proc.TipsetsProcessors[tasktype.Receipt])
310311
require.Equal(t, internalmessage.NewTask(nil), proc.TipsetsProcessors[tasktype.InternalMessage])
311312
require.Equal(t, internalparsedmessage.NewTask(nil), proc.TipsetsProcessors[tasktype.InternalParsedMessage])
312-
require.Equal(t, gaseconomy.NewTask(nil), proc.TipsetsProcessors[tasktype.MessageGasEconomy])
313313
require.Equal(t, msapprovals.NewTask(nil), proc.TipsetsProcessors[tasktype.MultisigApproval])
314314
}
315315

@@ -346,7 +346,7 @@ func TestMakeProcessorsAllTasks(t *testing.T) {
346346
proc, err := processor.MakeProcessors(nil, append(tasktype.AllTableTasks, processor.BuiltinTaskName))
347347
require.NoError(t, err)
348348
require.Len(t, proc.ActorProcessors, 21)
349-
require.Len(t, proc.TipsetProcessors, 5)
350-
require.Len(t, proc.TipsetsProcessors, 10)
349+
require.Len(t, proc.TipsetProcessors, 8)
350+
require.Len(t, proc.TipsetsProcessors, 7)
351351
require.Len(t, proc.ReportProcessors, 1)
352352
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ require (
6767
github.com/filecoin-project/specs-actors/v8 v8.0.1
6868
github.com/hibiken/asynq v0.23.0
6969
github.com/hibiken/asynq/x v0.0.0-20220413130846-5c723f597e01
70+
github.com/ipfs/go-ipld-format v0.4.0
7071
github.com/jedib0t/go-pretty/v6 v6.2.7
7172
go.opentelemetry.io/otel/trace v1.3.0
7273
go.uber.org/atomic v1.9.0
@@ -196,7 +197,6 @@ require (
196197
github.com/ipfs/go-ipfs-pq v0.0.2 // indirect
197198
github.com/ipfs/go-ipfs-routing v0.2.1 // indirect
198199
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
199-
github.com/ipfs/go-ipld-format v0.4.0 // indirect
200200
github.com/ipfs/go-ipld-legacy v0.1.1 // indirect
201201
github.com/ipfs/go-ipns v0.1.2 // indirect
202202
github.com/ipfs/go-log v1.0.5 // indirect

0 commit comments

Comments
 (0)