@@ -12,6 +12,7 @@ import (
12
12
13
13
"github.com/stretchr/testify/require"
14
14
15
+ "github.com/filecoin-project/go-address"
15
16
"github.com/filecoin-project/go-bitfield"
16
17
"github.com/filecoin-project/go-jsonrpc"
17
18
"github.com/filecoin-project/go-state-types/abi"
@@ -20,6 +21,7 @@ import (
20
21
"github.com/filecoin-project/go-state-types/dline"
21
22
22
23
"github.com/filecoin-project/lotus/api"
24
+ "github.com/filecoin-project/lotus/api/v1api"
23
25
"github.com/filecoin-project/lotus/chain/types"
24
26
"github.com/filecoin-project/lotus/miner"
25
27
)
@@ -29,11 +31,13 @@ type BlockMiner struct {
29
31
t * testing.T
30
32
miner * TestMiner
31
33
32
- nextNulls int64
33
- pause chan struct {}
34
- unpause chan struct {}
35
- wg sync.WaitGroup
36
- cancel context.CancelFunc
34
+ nextNulls int64
35
+ postWatchMiners []address.Address
36
+ postWatchMinersLk sync.Mutex
37
+ pause chan struct {}
38
+ unpause chan struct {}
39
+ wg sync.WaitGroup
40
+ cancel context.CancelFunc
37
41
}
38
42
39
43
func NewBlockMiner (t * testing.T , miner * TestMiner ) * BlockMiner {
@@ -46,19 +50,58 @@ func NewBlockMiner(t *testing.T, miner *TestMiner) *BlockMiner {
46
50
}
47
51
}
48
52
53
+ type minerDeadline struct {
54
+ addr address.Address
55
+ deadline dline.Info
56
+ }
57
+
58
+ type minerDeadlines []minerDeadline
59
+
60
+ func (mds minerDeadlines ) CloseList () []abi.ChainEpoch {
61
+ var ret []abi.ChainEpoch
62
+ for _ , md := range mds {
63
+ ret = append (ret , md .deadline .Last ())
64
+ }
65
+ return ret
66
+ }
67
+
68
+ func (mds minerDeadlines ) MinerStringList () []string {
69
+ var ret []string
70
+ for _ , md := range mds {
71
+ ret = append (ret , md .addr .String ())
72
+ }
73
+ return ret
74
+ }
75
+
76
+ // FilterByLast returns a new minerDeadlines with only the deadlines that have a Last() epoch
77
+ // greater than or equal to last.
78
+ func (mds minerDeadlines ) FilterByLast (last abi.ChainEpoch ) minerDeadlines {
79
+ var ret minerDeadlines
80
+ for _ , md := range mds {
81
+ if last >= md .deadline .Last () {
82
+ ret = append (ret , md )
83
+ }
84
+ }
85
+ return ret
86
+ }
87
+
49
88
type partitionTracker struct {
89
+ minerAddr address.Address
50
90
partitions []api.Partition
51
91
posted bitfield.BitField
52
92
}
53
93
54
- func newPartitionTracker (ctx context.Context , dlIdx uint64 , bm * BlockMiner ) * partitionTracker {
55
- dlines , err := bm .miner .FullNode .StateMinerDeadlines (ctx , bm .miner .ActorAddr , types .EmptyTSK )
56
- require .NoError (bm .t , err )
94
+ // newPartitionTracker creates a new partitionTracker that tracks the deadline index dlIdx for the
95
+ // given minerAddr. It uses the BlockMiner bm to interact with the chain.
96
+ func newPartitionTracker (ctx context.Context , t * testing.T , client v1api.FullNode , minerAddr address.Address , dlIdx uint64 ) * partitionTracker {
97
+ dlines , err := client .StateMinerDeadlines (ctx , minerAddr , types .EmptyTSK )
98
+ require .NoError (t , err )
57
99
dl := dlines [dlIdx ]
58
100
59
- parts , err := bm . miner . FullNode . StateMinerPartitions (ctx , bm . miner . ActorAddr , dlIdx , types .EmptyTSK )
60
- require .NoError (bm . t , err )
101
+ parts , err := client . StateMinerPartitions (ctx , minerAddr , dlIdx , types .EmptyTSK )
102
+ require .NoError (t , err )
61
103
return & partitionTracker {
104
+ minerAddr : minerAddr ,
62
105
partitions : parts ,
63
106
posted : dl .PostSubmissions ,
64
107
}
@@ -74,11 +117,11 @@ func (p *partitionTracker) done(t *testing.T) bool {
74
117
return uint64 (len (p .partitions )) == p .count (t )
75
118
}
76
119
77
- func (p * partitionTracker ) recordIfPost (t * testing.T , bm * BlockMiner , msg * types.Message ) (ret bool ) {
120
+ func (p * partitionTracker ) recordIfPost (t * testing.T , msg * types.Message ) (ret bool ) {
78
121
defer func () {
79
122
ret = p .done (t )
80
123
}()
81
- if ! (msg .To == bm . miner . ActorAddr ) {
124
+ if ! (msg .To == p . minerAddr ) {
82
125
return
83
126
}
84
127
if msg .Method != builtin .MethodsMiner .SubmitWindowedPoSt {
@@ -92,19 +135,18 @@ func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, msg *types
92
135
return
93
136
}
94
137
95
- func (bm * BlockMiner ) forcePoSt (ctx context.Context , ts * types.TipSet , dlinfo * dline.Info ) {
96
-
97
- tracker := newPartitionTracker (ctx , dlinfo .Index , bm )
138
+ func (bm * BlockMiner ) forcePoSt (ctx context.Context , ts * types.TipSet , minerAddr address.Address , dlinfo dline.Info ) {
139
+ tracker := newPartitionTracker (ctx , bm .t , bm .miner .FullNode , minerAddr , dlinfo .Index )
98
140
if ! tracker .done (bm .t ) { // need to wait for post
99
141
bm .t .Logf ("expect %d partitions proved but only see %d" , len (tracker .partitions ), tracker .count (bm .t ))
100
- poolEvts , err := bm .miner .FullNode .MpoolSub (ctx ) //subscribe before checking pending so we don't miss any events
142
+ poolEvts , err := bm .miner .FullNode .MpoolSub (ctx ) // subscribe before checking pending so we don't miss any events
101
143
require .NoError (bm .t , err )
102
144
103
145
// First check pending messages we'll mine this epoch
104
146
msgs , err := bm .miner .FullNode .MpoolPending (ctx , types .EmptyTSK )
105
147
require .NoError (bm .t , err )
106
148
for _ , msg := range msgs {
107
- if tracker .recordIfPost (bm .t , bm , & msg .Message ) {
149
+ if tracker .recordIfPost (bm .t , & msg .Message ) {
108
150
fmt .Printf ("found post in mempool pending\n " )
109
151
}
110
152
}
@@ -114,13 +156,13 @@ func (bm *BlockMiner) forcePoSt(ctx context.Context, ts *types.TipSet, dlinfo *d
114
156
msgs , err := bm .miner .FullNode .ChainGetBlockMessages (ctx , bc )
115
157
require .NoError (bm .t , err )
116
158
for _ , msg := range msgs .BlsMessages {
117
- if tracker .recordIfPost (bm .t , bm , msg ) {
159
+ if tracker .recordIfPost (bm .t , msg ) {
118
160
fmt .Printf ("found post in message of prev tipset\n " )
119
161
}
120
162
121
163
}
122
164
for _ , msg := range msgs .SecpkMessages {
123
- if tracker .recordIfPost (bm .t , bm , & msg .Message ) {
165
+ if tracker .recordIfPost (bm .t , & msg .Message ) {
124
166
fmt .Printf ("found post in message of prev tipset\n " )
125
167
}
126
168
}
@@ -139,7 +181,7 @@ func (bm *BlockMiner) forcePoSt(ctx context.Context, ts *types.TipSet, dlinfo *d
139
181
bm .t .Logf ("pool event: %d" , evt .Type )
140
182
if evt .Type == api .MpoolAdd {
141
183
bm .t .Logf ("incoming message %v" , evt .Message )
142
- if tracker .recordIfPost (bm .t , bm , & evt .Message .Message ) {
184
+ if tracker .recordIfPost (bm .t , & evt .Message .Message ) {
143
185
fmt .Printf ("found post in mempool evt\n " )
144
186
break POOL
145
187
}
@@ -151,11 +193,24 @@ func (bm *BlockMiner) forcePoSt(ctx context.Context, ts *types.TipSet, dlinfo *d
151
193
}
152
194
}
153
195
196
+ // WatchMinerForPost adds a miner to the list of miners that the BlockMiner will watch for window
197
+ // post submissions when using MineBlocksMustPost. This is useful when we have more than just the
198
+ // BlockMiner submitting posts, particularly in the case of UnmanagedMiners which don't participate
199
+ // in block mining.
200
+ func (bm * BlockMiner ) WatchMinerForPost (minerAddr address.Address ) {
201
+ bm .postWatchMinersLk .Lock ()
202
+ bm .postWatchMiners = append (bm .postWatchMiners , minerAddr )
203
+ bm .postWatchMinersLk .Unlock ()
204
+ }
205
+
154
206
// Like MineBlocks but refuses to mine until the window post scheduler has wdpost messages in the mempool
155
207
// and everything shuts down if a post fails. It also enforces that every block mined succeeds
156
208
func (bm * BlockMiner ) MineBlocksMustPost (ctx context.Context , blocktime time.Duration ) {
157
209
time .Sleep (time .Second )
158
210
211
+ // watch for our own window posts
212
+ bm .WatchMinerForPost (bm .miner .ActorAddr )
213
+
159
214
// wrap context in a cancellable context.
160
215
ctx , bm .cancel = context .WithCancel (ctx )
161
216
bm .wg .Add (1 )
@@ -182,11 +237,25 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
182
237
ts , err := bm .miner .FullNode .ChainHead (ctx )
183
238
require .NoError (bm .t , err )
184
239
185
- dlinfo , err := bm .miner .FullNode .StateMinerProvingDeadline (ctx , bm .miner .ActorAddr , ts .Key ())
186
- require .NoError (bm .t , err )
187
- if ts .Height ()+ 5 + abi .ChainEpoch (nulls ) >= dlinfo .Last () { // Next block brings us past the last epoch in dline, we need to wait for miner to post
188
- bm .t .Logf ("forcing post to get in before deadline closes at %d" , dlinfo .Last ())
189
- bm .forcePoSt (ctx , ts , dlinfo )
240
+ // Get current deadline information for all miners, then filter by the ones that are about to
241
+ // close so we can force a post for them.
242
+ bm .postWatchMinersLk .Lock ()
243
+ var impendingDeadlines minerDeadlines
244
+ for _ , minerAddr := range bm .postWatchMiners {
245
+ dlinfo , err := bm .miner .FullNode .StateMinerProvingDeadline (ctx , minerAddr , ts .Key ())
246
+ require .NoError (bm .t , err )
247
+ require .NotNil (bm .t , dlinfo , "no deadline info for miner %s" , minerAddr )
248
+ impendingDeadlines = append (impendingDeadlines , minerDeadline {addr : minerAddr , deadline : * dlinfo })
249
+ }
250
+ bm .postWatchMinersLk .Unlock ()
251
+ impendingDeadlines = impendingDeadlines .FilterByLast (ts .Height () + 5 + abi .ChainEpoch (nulls ))
252
+
253
+ if len (impendingDeadlines ) > 0 {
254
+ // Next block brings us too close for at least one deadline, we need to wait for miners to post
255
+ bm .t .Logf ("forcing post to get in if due before deadline closes at %v for %v" , impendingDeadlines .CloseList (), impendingDeadlines .MinerStringList ())
256
+ for _ , md := range impendingDeadlines {
257
+ bm .forcePoSt (ctx , ts , md .addr , md .deadline )
258
+ }
190
259
}
191
260
192
261
var target abi.ChainEpoch
@@ -216,10 +285,13 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
216
285
return
217
286
}
218
287
if ! success {
219
- // if we are mining a new null block and it brings us past deadline boundary we need to wait for miner to post
220
- if ts .Height ()+ 5 + abi .ChainEpoch (nulls + i ) >= dlinfo .Last () {
221
- bm .t .Logf ("forcing post to get in before deadline closes at %d" , dlinfo .Last ())
222
- bm .forcePoSt (ctx , ts , dlinfo )
288
+ // if we are mining a new null block and it brings us past deadline boundary we need to wait for miners to post
289
+ impendingDeadlines = impendingDeadlines .FilterByLast (ts .Height () + 5 + abi .ChainEpoch (nulls + i ))
290
+ if len (impendingDeadlines ) > 0 {
291
+ bm .t .Logf ("forcing post to get in if due before deadline closes at %v for %v" , impendingDeadlines .CloseList (), impendingDeadlines .MinerStringList ())
292
+ for _ , md := range impendingDeadlines {
293
+ bm .forcePoSt (ctx , ts , md .addr , md .deadline )
294
+ }
223
295
}
224
296
}
225
297
}
@@ -378,4 +450,7 @@ func (bm *BlockMiner) Stop() {
378
450
close (bm .pause )
379
451
bm .pause = nil
380
452
}
453
+ bm .postWatchMinersLk .Lock ()
454
+ bm .postWatchMiners = nil
455
+ bm .postWatchMinersLk .Unlock ()
381
456
}
0 commit comments