Skip to content

Commit dcc5517

Browse files
frristiand
andauthored
Implement Visor Vector Builder & Executor (#370)
Co-authored-by: Ian Davis <[email protected]>
1 parent ed757da commit dcc5517

26 files changed

+1494
-9
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ visor
1919
sentinel-visor
2020

2121
build/.*
22+
vector/data/*.json

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ build/.update-modules:
3131

3232
.PHONY: deps
3333
deps: build/.update-modules
34+
cd ./vector; ./fetch_vectors.sh
3435

3536
# test starts dependencies and runs all tests
3637
.PHONY: test
@@ -77,6 +78,7 @@ docker-image:
7778

7879
clean:
7980
rm -rf $(CLEAN) $(BINS)
81+
rm ./vector/data/*json
8082
.PHONY: clean
8183

8284
dist-clean:

chain/actor.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,10 @@ type ActorExtractorMap interface {
193193
GetExtractor(code cid.Cid) (actorstate.ActorStateExtractor, bool)
194194
}
195195

196+
type ActorExtractorFilter interface {
197+
AllowAddress(addr string) bool
198+
}
199+
196200
// A RawActorExtractorMap extracts all types of actors using basic actor extraction which only parses shallow state.
197201
type RawActorExtractorMap struct{}
198202

chain/filter.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package chain
2+
3+
func NewAddressFilter(addr string) *AddressFilter {
4+
return &AddressFilter{address: addr}
5+
}
6+
7+
type AddressFilter struct {
8+
address string
9+
}
10+
11+
func (f *AddressFilter) Allow(addr string) bool {
12+
return f.address == addr
13+
}

chain/indexer.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,22 @@ type TipSetIndexer struct {
5353
node lens.API
5454
opener lens.APIOpener
5555
closer lens.APICloser
56+
addressFilter *AddressFilter
57+
}
58+
59+
type TipSetIndexerOpt func(t *TipSetIndexer)
60+
61+
func AddressFilterOpt(f *AddressFilter) TipSetIndexerOpt {
62+
return func(t *TipSetIndexer) {
63+
t.addressFilter = f
64+
}
5665
}
5766

5867
// A TipSetIndexer extracts block, message and actor state data from a tipset and persists it to storage. Extraction
5968
// and persistence are concurrent. Extraction of the a tipset can proceed while data from the previous extraction is
6069
// being persisted. The indexer may be given a time window in which to complete data extraction. The name of the
6170
// indexer is used as the reporter in the visor_processing_reports table.
62-
func NewTipSetIndexer(o lens.APIOpener, d model.Storage, window time.Duration, name string, tasks []string) (*TipSetIndexer, error) {
71+
func NewTipSetIndexer(o lens.APIOpener, d model.Storage, window time.Duration, name string, tasks []string, options ...TipSetIndexerOpt) (*TipSetIndexer, error) {
6372
tsi := &TipSetIndexer{
6473
storage: d,
6574
window: window,
@@ -123,6 +132,11 @@ func NewTipSetIndexer(o lens.APIOpener, d model.Storage, window time.Duration, n
123132
return nil, xerrors.Errorf("unknown task: %s", task)
124133
}
125134
}
135+
136+
for _, opt := range options {
137+
opt(tsi)
138+
}
139+
126140
return tsi, nil
127141
}
128142

@@ -223,6 +237,13 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
223237
if len(t.actorProcessors) > 0 {
224238
changes, err := t.node.StateChangedActors(tctx, parent.ParentState(), child.ParentState())
225239
if err == nil {
240+
if t.addressFilter != nil {
241+
for addr := range changes {
242+
if !t.addressFilter.Allow(addr) {
243+
delete(changes, addr)
244+
}
245+
}
246+
}
226247
for name, p := range t.actorProcessors {
227248
inFlight++
228249
go t.runActorProcessor(tctx, p, name, child, parent, changes, results)

commands/vector.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package commands
2+
3+
import (
4+
"context"
5+
"os"
6+
"os/signal"
7+
"strings"
8+
"syscall"
9+
10+
"github.com/urfave/cli/v2"
11+
"golang.org/x/xerrors"
12+
13+
"github.com/filecoin-project/sentinel-visor/chain"
14+
"github.com/filecoin-project/sentinel-visor/vector"
15+
)
16+
17+
var Vector = &cli.Command{
18+
Name: "vector",
19+
Usage: "Vector tooling for Visor.",
20+
Subcommands: []*cli.Command{
21+
BuildVector,
22+
ExecuteVector,
23+
},
24+
}
25+
26+
var BuildVector = &cli.Command{
27+
Name: "build",
28+
Usage: "Create a vector.",
29+
Flags: []cli.Flag{
30+
&cli.Int64Flag{
31+
Name: "from",
32+
Usage: "Limit actor and message processing to tipsets at or above `HEIGHT`",
33+
EnvVars: []string{"VISOR_HEIGHT_FROM"},
34+
},
35+
&cli.Int64Flag{
36+
Name: "to",
37+
Usage: "Limit actor and message processing to tipsets at or below `HEIGHT`",
38+
Value: estimateCurrentEpoch(),
39+
DefaultText: "current epoch",
40+
EnvVars: []string{"VISOR_HEIGHT_TO"},
41+
},
42+
&cli.StringFlag{
43+
Name: "tasks",
44+
Usage: "Comma separated list of tasks to build. Each task is reported separately in the database.",
45+
Value: strings.Join([]string{chain.BlocksTask}, ","),
46+
EnvVars: []string{"VISOR_VECTOR_TASKS"},
47+
},
48+
&cli.StringFlag{
49+
Name: "actor-address",
50+
Usage: "Address of an actor.",
51+
},
52+
&cli.StringFlag{
53+
Name: "vector-file",
54+
Usage: "Path of vector file.",
55+
Required: true,
56+
},
57+
&cli.StringFlag{
58+
Name: "vector-desc",
59+
Usage: "Short description of the test vector.",
60+
Required: true,
61+
},
62+
},
63+
Action: build,
64+
}
65+
66+
func build(cctx *cli.Context) error {
67+
// Set up a context that is canceled when the command is interrupted
68+
ctx, cancel := context.WithCancel(cctx.Context)
69+
70+
// Set up a signal handler to cancel the context
71+
go func() {
72+
interrupt := make(chan os.Signal, 1)
73+
signal.Notify(interrupt, syscall.SIGTERM, syscall.SIGINT)
74+
select {
75+
case <-interrupt:
76+
cancel()
77+
case <-ctx.Done():
78+
}
79+
}()
80+
81+
if err := setupLogging(cctx); err != nil {
82+
return xerrors.Errorf("setup logging: %w", err)
83+
}
84+
85+
builder, err := vector.NewBuilder(cctx)
86+
if err != nil {
87+
return err
88+
}
89+
90+
schema, err := builder.Build(ctx)
91+
if err != nil {
92+
return err
93+
}
94+
95+
return schema.Persist(cctx.String("vector-file"))
96+
}
97+
98+
var ExecuteVector = &cli.Command{
99+
Name: "execute",
100+
Usage: "execute a test vector",
101+
Flags: []cli.Flag{
102+
&cli.StringFlag{
103+
Name: "vector-file",
104+
Usage: "Path to vector file.",
105+
Required: true,
106+
},
107+
},
108+
Action: execute,
109+
}
110+
111+
func execute(cctx *cli.Context) error {
112+
// Set up a context that is canceled when the command is interrupted
113+
ctx, cancel := context.WithCancel(cctx.Context)
114+
115+
// Set up a signal handler to cancel the context
116+
go func() {
117+
interrupt := make(chan os.Signal, 1)
118+
signal.Notify(interrupt, syscall.SIGTERM, syscall.SIGINT)
119+
select {
120+
case <-interrupt:
121+
cancel()
122+
case <-ctx.Done():
123+
}
124+
}()
125+
126+
if err := setupLogging(cctx); err != nil {
127+
return xerrors.Errorf("setup logging: %w", err)
128+
}
129+
runner, err := vector.NewRunner(ctx, cctx.String("vector-file"), 0)
130+
if err != nil {
131+
return err
132+
}
133+
134+
err = runner.Run(ctx)
135+
if err != nil {
136+
return err
137+
}
138+
139+
return runner.Validate(ctx)
140+
}

go.mod

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,18 @@ require (
2020
github.com/go-pg/migrations/v8 v8.0.1
2121
github.com/go-pg/pg/v10 v10.3.1
2222
github.com/go-pg/pgext v0.1.4
23+
github.com/google/go-cmp v0.5.2
2324
github.com/hashicorp/golang-lru v0.5.4
2425
github.com/ipfs/go-block-format v0.0.2
26+
github.com/ipfs/go-blockservice v0.1.4
2527
github.com/ipfs/go-cid v0.0.7
2628
github.com/ipfs/go-datastore v0.4.5
29+
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
2730
github.com/ipfs/go-ipld-cbor v0.0.5
31+
github.com/ipfs/go-ipld-format v0.2.0
2832
github.com/ipfs/go-log/v2 v2.1.2-0.20200626104915-0016c0b4b3e4
33+
github.com/ipfs/go-merkledag v0.3.2
34+
github.com/ipld/go-car v0.1.1-0.20201119040415-11b6074b6d4d
2935
github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018
3036
github.com/jackc/pgx/v4 v4.9.0
3137
github.com/lib/pq v1.8.0

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1572,6 +1572,7 @@ github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/
15721572
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
15731573
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
15741574
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
1575+
github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY=
15751576
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
15761577
github.com/urfave/cli v1.22.2 h1:gsqYFH8bb9ekPA12kRo0hfjngWQjkJPlN9R0N78BoUo=
15771578
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=

lens/carrepo/carrepo.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,5 @@ func NewAPIOpener(c *cli.Context) (lens.APIOpener, lens.APICloser, error) {
3333
return &tsk, nil
3434
}
3535

36-
return util.NewAPIOpener(c, cacheDB, h)
36+
return util.NewAPIOpener(c.Context, cacheDB, h, c.Int("lens-cache-hint"))
3737
}

lens/interface.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package lens
22

33
import (
44
"context"
5+
"github.com/filecoin-project/lotus/node/modules/dtypes"
56

67
"github.com/filecoin-project/go-address"
78
"github.com/filecoin-project/go-bitfield"
@@ -55,6 +56,7 @@ type StateAPI interface {
5556
StateReadState(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*api.ActorState, error)
5657
StateGetReceipt(ctx context.Context, bcid cid.Cid, tsk types.TipSetKey) (*types.MessageReceipt, error)
5758
StateVMCirculatingSupplyInternal(context.Context, types.TipSetKey) (api.CirculatingSupply, error)
59+
StateNetworkName(context.Context) (dtypes.NetworkName, error)
5860
}
5961

6062
type APICloser func()

0 commit comments

Comments
 (0)