Skip to content

Commit 21d0f99

Browse files
authored
feat: implement miner protocol survey task (#1048)
* feat: implement miner protocol survey task - closes #989 * polish: better logging * feedback: remove error and reachable cols, add env config * fix: race in tipset head - add logging for why miners fail - add logging for task duration
1 parent de20617 commit 21d0f99

File tree

6 files changed

+312
-16
lines changed

6 files changed

+312
-16
lines changed

commands/job/survey.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package job
22

33
import (
4-
"fmt"
54
"os"
65
"time"
76

@@ -10,7 +9,6 @@ import (
109

1110
"github.com/filecoin-project/lily/commands"
1211
"github.com/filecoin-project/lily/lens/lily"
13-
"github.com/filecoin-project/lily/network"
1412
)
1513

1614
var surveyFlags struct {
@@ -31,16 +29,6 @@ var SurveyCmd = &cli.Command{
3129
Destination: &surveyFlags.interval,
3230
},
3331
},
34-
Before: func(cctx *cli.Context) error {
35-
tasks := RunFlags.Tasks.Value()
36-
if len(tasks) != 1 {
37-
return fmt.Errorf("survey accepts single task type: '%s'", network.PeerAgentsTask)
38-
}
39-
if tasks[0] != network.PeerAgentsTask {
40-
return fmt.Errorf("unknown task: %s", tasks[0])
41-
}
42-
return nil
43-
},
4432
Action: func(cctx *cli.Context) error {
4533
ctx := lotuscli.ReqContext(cctx)
4634

lens/lily/impl.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/ipfs/go-cid"
2525
ipld "github.com/ipfs/go-ipld-format"
2626
logging "github.com/ipfs/go-log/v2"
27+
"github.com/libp2p/go-libp2p-core/host"
2728
"go.uber.org/fx"
2829

2930
"github.com/filecoin-project/lily/chain/datasource"
@@ -68,9 +69,8 @@ type LilyNodeAPI struct {
6869
actorStoreInit sync.Once
6970
}
7071

71-
type vmWrapper struct {
72-
vm vm.Interface
73-
st *state.StateTree
72+
// Host exposes the node's RawHost as a host.Host so callers can reach the
// libp2p host through the LilyNodeAPI.
func (m *LilyNodeAPI) Host() host.Host {
73+
return m.RawHost
7474
}
7575

7676
func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorkerConfig) (*schedule.JobSubmitResult, error) {
@@ -668,6 +668,11 @@ func (m *LilyNodeAPI) TipSetMessageReceipts(ctx context.Context, ts, pts *types.
668668
return out, nil
669669
}
670670

671+
// vmWrapper bundles a VM with a state tree. Its ShouldBurn method passes st
// to the VM when vm is a *vm.LegacyVM.
type vmWrapper struct {
672+
vm vm.Interface
673+
st *state.StateTree
674+
}
675+
671676
func (v *vmWrapper) ShouldBurn(ctx context.Context, msg *types.Message, errcode exitcode.ExitCode) (bool, error) {
672677
if lvmi, ok := v.vm.(*vm.LegacyVM); ok {
673678
return lvmi.ShouldBurn(ctx, v.st, msg, errcode)

model/surveyed/minerprotocols.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package observed
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"go.opencensus.io/tag"
8+
"go.opentelemetry.io/otel"
9+
"go.opentelemetry.io/otel/attribute"
10+
11+
"github.com/filecoin-project/lily/metrics"
12+
"github.com/filecoin-project/lily/model"
13+
)
14+
15+
// MinerProtocol is a single observation of a miner's peer agent and supported
// protocols. It maps to the surveyed_miner_protocols table, where ObservedAt
// and MinerID together form the primary key.
type MinerProtocol struct {
16+
tableName struct{} `pg:"surveyed_miner_protocols"` // nolint: structcheck
17+
18+
// ObservedAt is the time the observation was made.
19+
ObservedAt time.Time `pg:",pk,notnull"`
20+
21+
// MinerID is the address of the miner observed.
22+
MinerID string `pg:",pk,notnull"`
23+
24+
// PeerID is the peerID of the miner observed.
25+
PeerID string
26+
27+
// Agent is the raw peer agent string of the miner.
28+
Agent string
29+
30+
// Protocols is the list of protocols supported by the miner.
31+
Protocols []string
32+
}
33+
34+
func (m *MinerProtocol) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
35+
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "surveyed_miner_protocols"))
36+
stop := metrics.Timer(ctx, metrics.PersistDuration)
37+
defer stop()
38+
39+
return s.PersistModel(ctx, m)
40+
}
41+
42+
type MinerProtocolList []*MinerProtocol
43+
44+
func (m MinerProtocolList) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
45+
if len(m) == 0 {
46+
return nil
47+
}
48+
ctx, span := otel.Tracer("").Start(ctx, "MinerProtocolList.Persist")
49+
if span.IsRecording() {
50+
span.SetAttributes(attribute.Int("count", len(m)))
51+
}
52+
defer span.End()
53+
54+
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "surveyed_miner_protocols"))
55+
stop := metrics.Timer(ctx, metrics.PersistDuration)
56+
defer stop()
57+
58+
return s.PersistModel(ctx, m)
59+
}

network/surveyer.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,19 @@ import (
1313

1414
"github.com/filecoin-project/lily/metrics"
1515
"github.com/filecoin-project/lily/model"
16+
"github.com/filecoin-project/lily/tasks/survey/minerprotocols"
1617
"github.com/filecoin-project/lily/tasks/survey/peeragents"
1718
)
1819

1920
const (
20-
PeerAgentsTask = "peeragents" // task that observes connected peer agents
21+
MinerProtocolsTask = "minerprotocols" // task that observes the supported protocols of miners on the Filecoin network.
22+
PeerAgentsTask = "peeragents" // task that observes connected peer agents
2123
)
2224

2325
var log = logging.Logger("lily/network")
2426

2527
// API is the combined set of methods required by the survey tasks this
// package constructs: the minerprotocols task and the peeragents task.
type API interface {
28+
minerprotocols.API
2629
peeragents.API
2730
}
2831

@@ -42,6 +45,8 @@ func NewSurveyer(api API, storage model.Storage, interval time.Duration, name st
4245
switch task {
4346
case PeerAgentsTask:
4447
obs.tasks[PeerAgentsTask] = peeragents.NewTask(api)
48+
case MinerProtocolsTask:
49+
obs.tasks[MinerProtocolsTask] = minerprotocols.NewTask(api)
4550
default:
4651
return nil, fmt.Errorf("unknown task: %s", task)
4752
}

schemas/v1/9_miner_protocols.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package v1
2+
3+
// Schema patch 9 adds surveyed miner protocols and agents
4+
5+
// init registers schema patch 9, which creates the surveyed_miner_protocols
// table, adds its (observed_at, miner_id) primary key and attaches table and
// column comments.
// NOTE(review): the SQL header names the model surveyed.MinerProtocols while
// the Go type in model/surveyed is MinerProtocol — confirm which is intended.
func init() {
6+
patches.Register(
7+
9,
8+
`
9+
-- ----------------------------------------------------------------
10+
-- Name: surveyed_miner_protocols
11+
-- Model: surveyed.MinerProtocols
12+
-- Growth: N/A
13+
-- ----------------------------------------------------------------
14+
15+
CREATE TABLE {{ .SchemaName | default "public"}}.surveyed_miner_protocols (
16+
observed_at timestamp with time zone NOT NULL,
17+
miner_id text NOT NULL,
18+
peer_id text,
19+
agent text,
20+
protocols jsonb
21+
);
22+
ALTER TABLE ONLY {{ .SchemaName | default "public"}}.surveyed_miner_protocols ADD CONSTRAINT surveyed_miner_protocols_pkey PRIMARY KEY (observed_at, miner_id);
23+
24+
COMMENT ON TABLE {{ .SchemaName | default "public"}}.surveyed_miner_protocols IS 'Observations of Filecoin storage provider supported protocols and agents over time.';
25+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.surveyed_miner_protocols.observed_at IS 'Timestamp of the observation.';
26+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.surveyed_miner_protocols.miner_id IS 'Address (ActorID) of the miner.';
27+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.surveyed_miner_protocols.peer_id IS 'PeerID of the miner advertised in on-chain MinerInfo structure.';
28+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.surveyed_miner_protocols.agent IS 'Agent string as reported by the peer.';
29+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.surveyed_miner_protocols.protocols IS 'List of supported protocol strings supported by the peer.';
30+
`)
31+
}
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
package minerprotocols
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"strconv"
8+
"time"
9+
10+
"github.com/filecoin-project/go-address"
11+
lapi "github.com/filecoin-project/lotus/api"
12+
"github.com/filecoin-project/lotus/chain/types"
13+
"github.com/gammazero/workerpool"
14+
logging "github.com/ipfs/go-log/v2"
15+
"github.com/libp2p/go-libp2p-core/host"
16+
"github.com/libp2p/go-libp2p-core/peer"
17+
"github.com/multiformats/go-multiaddr"
18+
19+
"github.com/filecoin-project/lily/model"
20+
observed "github.com/filecoin-project/lily/model/surveyed"
21+
)
22+
23+
var log = logging.Logger("lily/tasks/minerproto")
24+
25+
// Tunables for the miner protocol survey. Each default may be overridden at
// process start through the environment variable named next to it.
var (
	// resultBufferSize is the capacity of the channel workers send results on.
	resultBufferEnv  = "LILY_SURVEY_MINER_PROTOCOL_BUFFER"
	resultBufferSize = 50

	// workerPoolSize is the number of concurrent workers querying miners.
	workerPoolSizeEnv = "LILY_SURVEY_MINER_PROTOCOL_WORKERS"
	workerPoolSize    = 50

	// fetchTimeout is the per-miner query timeout, in seconds.
	fetchTimeoutEnv = "LILY_SURVEY_MINER_PROTOCOL_TIMEOUT_SECONDS"
	fetchTimeout    = 30
)

// init applies environment overrides to the survey tunables. Values that are
// unset, unparsable or below the allowed minimum leave the default in place:
// a negative buffer size would panic when the results channel is created, and
// a non-positive timeout would expire every fetch before it starts.
func init() {
	resultBufferSize = envIntAtLeast(resultBufferEnv, 0, resultBufferSize)
	workerPoolSize = envIntAtLeast(workerPoolSizeEnv, 1, workerPoolSize)
	fetchTimeout = envIntAtLeast(fetchTimeoutEnv, 1, fetchTimeout)
}

// envIntAtLeast returns the integer stored in environment variable name when
// it parses as a base-10 int that is >= min; otherwise it returns fallback.
func envIntAtLeast(name string, min, fallback int) int {
	raw := os.Getenv(name)
	if raw == "" {
		return fallback
	}
	v, err := strconv.Atoi(raw)
	if err != nil || v < min {
		return fallback
	}
	return v
}
56+
57+
// API is the node functionality the miner protocol survey relies on: the
// libp2p host used to dial miners and read their peerstore entries, plus the
// chain/state queries used to enumerate miners at the current head.
type API interface {
58+
Host() host.Host
59+
ChainHead(context.Context) (*types.TipSet, error)
60+
StateMinerInfo(ctx context.Context, addr address.Address, tsk types.TipSetKey) (lapi.MinerInfo, error)
61+
StateListMiners(ctx context.Context, tsk types.TipSetKey) ([]address.Address, error)
62+
StateMinerPower(context.Context, address.Address, types.TipSetKey) (*lapi.MinerPower, error)
63+
}
64+
65+
func NewTask(api API) *Task {
66+
return &Task{api: api}
67+
}
68+
69+
type Task struct {
70+
api API
71+
}
72+
73+
func (t *Task) Process(ctx context.Context) (model.Persistable, error) {
74+
headTs, err := t.api.ChainHead(ctx)
75+
if err != nil {
76+
return nil, fmt.Errorf("getting chain head: %w", err)
77+
}
78+
79+
miners, err := t.api.StateListMiners(ctx, headTs.Key())
80+
if err != nil {
81+
return nil, fmt.Errorf("listing miners: %w", err)
82+
}
83+
84+
queriedCount := uint64(0)
85+
start := time.Now()
86+
out := make(observed.MinerProtocolList, 0, len(miners))
87+
results := make(chan *observed.MinerProtocol, resultBufferSize)
88+
pool := workerpool.New(workerPoolSize)
89+
90+
for _, miner := range miners {
91+
select {
92+
case <-ctx.Done():
93+
pool.Stop()
94+
return nil, ctx.Err()
95+
default:
96+
}
97+
miner := miner
98+
99+
mpower, err := t.api.StateMinerPower(ctx, miner, headTs.Key())
100+
if err != nil {
101+
return nil, fmt.Errorf("getting miner %s power: %w", miner, err)
102+
}
103+
// don't process miners without min power
104+
if !mpower.HasMinPower {
105+
continue
106+
}
107+
108+
// find the miner, if DNE abort as this indicates an error in the API as a miner was returned from StateListMiners that DNE in state tree.
109+
minerInfo, err := t.api.StateMinerInfo(ctx, miner, headTs.Key())
110+
if err != nil {
111+
return nil, fmt.Errorf("getting miner %s info: %w", miner, err)
112+
}
113+
114+
// don't process miners without multiaddresses set
115+
if len(minerInfo.Multiaddrs) == 0 {
116+
continue
117+
}
118+
119+
queriedCount++
120+
log.Debugw("fetching miner protocol info", "miner", miner, "count", queriedCount)
121+
pool.Submit(func() {
122+
fetchCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(fetchTimeout))
123+
defer cancel()
124+
fetchMinerProtocolModel(fetchCtx, t.api, miner, minerInfo, start, results)
125+
})
126+
}
127+
128+
// wait for all workers to complete then close the results channel
129+
go func() {
130+
pool.StopWait()
131+
close(results)
132+
}()
133+
134+
// drain results until closed.
135+
for res := range results {
136+
log.Debugw("miner protocol result received", "miner", res.MinerID, "count", len(out))
137+
out = append(out, res)
138+
}
139+
log.Infow("miner protocol survey complete", "duration", time.Since(start), "results", len(out), "queried", queriedCount)
140+
return out, nil
141+
}
142+
143+
// Close is a no-op and always returns nil.
func (t *Task) Close() error {
144+
return nil
145+
}
146+
147+
func fetchMinerProtocolModel(ctx context.Context, api API, addr address.Address, minerInfo lapi.MinerInfo, start time.Time, results chan *observed.MinerProtocol) {
148+
// since miners may choose if their peerID is set in their info
149+
var peerID string
150+
if minerInfo.PeerId != nil {
151+
peerID = minerInfo.PeerId.String()
152+
}
153+
154+
// extract any multiaddresses the miner has set in their info, they may have none bail if that is the case.
155+
minerPeerInfo, err := getMinerAddrInfo(minerInfo)
156+
if err != nil {
157+
log.Debugw("failed getting miner address info", "miner", addr, "error", err)
158+
return
159+
}
160+
161+
// attempt to connect to miner
162+
if err := api.Host().Connect(ctx, *minerPeerInfo); err != nil {
163+
log.Debugw("failed connecting to miner", "miner", addr, "error", err)
164+
return
165+
}
166+
167+
// get protocols supported by miner
168+
protos, err := api.Host().Peerstore().GetProtocols(minerPeerInfo.ID)
169+
if err != nil {
170+
log.Debugw("failed getting miner protocols", "miner", addr, "error", err)
171+
return
172+
}
173+
174+
// find miners agent version
175+
agentVersionI, err := api.Host().Peerstore().Get(minerPeerInfo.ID, "AgentVersion")
176+
if err != nil {
177+
log.Debugw("failed getting miner agent", "miner", addr, "error", err)
178+
return
179+
}
180+
181+
// create the model we will export to storage
182+
results <- &observed.MinerProtocol{
183+
ObservedAt: start,
184+
MinerID: addr.String(),
185+
PeerID: peerID,
186+
Agent: agentVersionI.(string),
187+
Protocols: protos,
188+
}
189+
190+
}
191+
192+
func getMinerAddrInfo(info lapi.MinerInfo) (*peer.AddrInfo, error) {
193+
var maddrs []multiaddr.Multiaddr
194+
for _, m := range info.Multiaddrs {
195+
ma, err := multiaddr.NewMultiaddrBytes(m)
196+
if err != nil {
197+
return nil, fmt.Errorf("miner had invalid multiaddrs in their info: %w", err)
198+
}
199+
maddrs = append(maddrs, ma)
200+
}
201+
if len(maddrs) == 0 {
202+
return nil, fmt.Errorf("miner has no multiaddrs set on-chain")
203+
}
204+
return &peer.AddrInfo{
205+
ID: *info.PeerId,
206+
Addrs: maddrs,
207+
}, nil
208+
}

0 commit comments

Comments
 (0)