Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions chain/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ type DataSource struct {
diffPreCommitGroup singleflight.Group
}

func (t *DataSource) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error) {
return t.node.ChainReadObj(ctx, obj)
}

func (t *DataSource) ComputeBaseFee(ctx context.Context, ts *types.TipSet) (abi.TokenAmount, error) {
return t.node.ComputeBaseFee(ctx, ts)
}
Expand Down
174 changes: 174 additions & 0 deletions chain/indexer/feeder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package indexer

/*

const BitWidth = 8

type Feeder struct {
Strg model.Storage
}

func (f *Feeder) Index(ctx context.Context, path string) error {
fi, err := os.Open(path)
if err != nil {
return err
}
defer fi.Close()
start := time.Now()
mr, err := cborable.NewModelReader(ctx, fi)
if err != nil {
return err
}

for _, state := range mr.States() {
tipset := state.Current
parent := state.Executed

tasks, err := mr.ModelMetasForTipSet(tipset.Key())
if err != nil {
return err
}

transformer, consumer, err := f.startRouters(ctx, tasks,
[]transform.Handler{
raw.NewActorTransform(),
raw.NewActorStateTransform(),

miner.NewSectorInfoTransform(),
miner.NewPrecommitEventTransformer(),
miner.NewSectorEventTransformer(),
miner.NewSectorDealsTransformer(),
miner.NewPrecommitInfoTransformer(),

market.NewDealProposalTransformer(),

message.NewVMMessageTransform(),
message.NewMessageTransform(),
message.NewParsedMessageTransform(),
message.NewBlockMessageTransform(),
message.NewGasOutputTransform(),
economics.NewGasEconomyTransform(),
message.NewReceiptTransform(),

block.NewBlockHeaderTransform(),
block.NewBlockParentsTransform(),
block.NewDrandBlockEntryTransform(),
}, []load.Handler{
&persistable.PersistableResultConsumer{Strg: f.Strg},
})

// TODO handle the error case here, remove the panic in the goroutine
go func() {
for _, task := range tasks {
data, err := mr.GetModels(tipset.Key(), task)
if err != nil {
log.Errorw("getting models", "error", err)
panic(err)
}
if err := transformer.Route(ctx, &resultImpl{
task: task,
current: tipset,
executed: parent,
complete: true,
Task: task,
Error: nil,
Data: data,
StartedAt: time.Now(),
Duration: 0,
},
}); err != nil {
log.Errorw("routing models", "error", err)
panic(err)
}
}
if err := transformer.Stop(); err != nil {
log.Errorw("stopping transformer", "error", err)
}
}()

for res := range transformer.Results() {
if err := consumer.Route(ctx, res); err != nil {
return err
}
}
if err := consumer.Stop(); err != nil {
return err
}

}
log.Infow("index complete", "duration", time.Since(start))
return nil
}

type resultImpl struct {
task v2.ModelMeta
current *types.TipSet
executed *types.TipSet
complete bool
result *extract.StateResult
}

func (r *resultImpl) Error() error {
//TODO implement me
panic("implement me")
}

func (r *resultImpl) ExtractionState() interface{} {
//TODO implement me
panic("implement me")
}

func (r *resultImpl) Models() []v2.LilyModel {
//TODO implement me
panic("implement me")
}

func (r *resultImpl) Task() v2.ModelMeta {
return r.task
}

func (r *resultImpl) Current() *types.TipSet {
return r.current
}

func (r *resultImpl) Executed() *types.TipSet {
return r.executed
}

func (r *resultImpl) Complete() bool {
return r.complete
}

func (r *resultImpl) State() *extract.StateResult {
return r.result
}

type Transformer interface {
Route(ctx context.Context, data transform.IndexState) error
Results() chan transform.Result
Stop() error
}

type Loader interface {
Route(ctx context.Context, data transform.Result) error
Stop() error
}

func (f *Feeder) startRouters(ctx context.Context, tasks []v2.ModelMeta, handlers []transform.Handler, consumers []load.Handler) (Transformer, Loader, error) {
tr, err := transform.NewRouter(tasks, handlers...)
if err != nil {
return nil, nil, err
}
tr.Start(ctx)

lr, err := load.NewRouter(consumers...)
if err != nil {
return nil, nil, err
}
lr.Start(ctx)

return tr, lr, nil
}


*/
3 changes: 2 additions & 1 deletion chain/indexer/integrated/processor/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (sp *StateProcessor) State(ctx context.Context, current, executed *types.Ti
ctx, span := otel.Tracer("").Start(ctx, "StateProcessor.State")

num := len(sp.tipsetProcessors) + len(sp.actorProcessors) + len(sp.tipsetsProcessors) + len(sp.builtinProcessors)
results := make(chan *Result, num)
results := make(chan *Result, 1)
taskNames := make([]string, 0, num)

taskNames = append(taskNames, sp.startReport(ctx, current, results)...)
Expand Down Expand Up @@ -400,6 +400,7 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces
ActorProcessors: make(map[string]ActorProcessor),
ReportProcessors: make(map[string]ReportProcessor),
}

for _, t := range indexerTasks {
switch t {
//
Expand Down
3 changes: 3 additions & 0 deletions chain/indexer/integrated/tipset/tipset.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/filecoin-project/lily/chain/indexer/tasktype"
"github.com/filecoin-project/lily/metrics"
"github.com/filecoin-project/lily/model"
v2 "github.com/filecoin-project/lily/model/v2"
visormodel "github.com/filecoin-project/lily/model/visor"
taskapi "github.com/filecoin-project/lily/tasks"
)
Expand Down Expand Up @@ -55,6 +56,8 @@ type Result struct {
Data model.Persistable
// Report containing details of task execution success and duration.
Report visormodel.ProcessingReportList

CborData []v2.LilyModel
}

// TipSet keeps no internal state and asynchronously indexes `ts` returning Result's as they extracted.
Expand Down
4 changes: 2 additions & 2 deletions chain/indexer/tasktype/table_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var AllTableTasks = []string{
DrandBlockEntrie,
MinerSectorDeal,
MinerSectorInfoV7,
MinerSectorInfoV1_6,
//MinerSectorInfoV1_6,
MinerSectorPost,
MinerPreCommitInfo,
MinerSectorEvent,
Expand Down Expand Up @@ -74,7 +74,7 @@ var AllTableTasks = []string{
GasOutputs,
ChainEconomics,
ChainConsensus,
MultisigApproval,
//MultisigApproval,
VerifiedRegistryVerifier,
VerifiedRegistryVerifiedClient,
}
Expand Down
33 changes: 33 additions & 0 deletions chain/indexer/v2/bus/bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package bus

import (
evntbus "github.com/mustafaturan/bus/v3"
"github.com/mustafaturan/monoton/v2"
"github.com/mustafaturan/monoton/v2/sequencer"
)

func NewBus() (*Bus, error) {
// configure id generator (it doesn't have to be monoton)
node := uint64(1)
initialTime := uint64(1577865600000) // set 2020-01-01 PST as initial time
m, err := monoton.New(sequencer.NewMillisecond(), node, initialTime)
if err != nil {
return nil, err
}

// init an id generator
var idGenerator evntbus.Next = m.Next

// create a new bus instance
b, err := evntbus.NewBus(idGenerator)
if err != nil {
return nil, err
}
return &Bus{
Bus: b,
}, nil
}

type Bus struct {
Bus *evntbus.Bus
}
Loading