9
9
10
10
"github.com/filecoin-project/go-address"
11
11
"github.com/filecoin-project/go-bitfield"
12
+ "github.com/filecoin-project/go-state-types/abi"
12
13
13
14
"github.com/filecoin-project/lotus/chain/actors/adt"
14
15
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
@@ -61,6 +62,10 @@ type PipelineTask struct {
61
62
FailedReason string `db:"failed_reason"`
62
63
}
63
64
65
+ func (pt PipelineTask ) sectorID () abi.SectorID {
66
+ return abi.SectorID {Miner : abi .ActorID (pt .SpID ), Number : abi .SectorNumber (pt .SectorNumber )}
67
+ }
68
+
64
69
type sectorListEntry struct {
65
70
PipelineTask
66
71
@@ -69,6 +74,9 @@ type sectorListEntry struct {
69
74
AfterSeed bool
70
75
71
76
ChainAlloc , ChainSector , ChainActive , ChainUnproven , ChainFaulty bool
77
+
78
+ MissingTasks []int64
79
+ AllTasks []int64
72
80
}
73
81
74
82
type minerBitfields struct {
@@ -99,6 +107,16 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e
99
107
return nil , xerrors .Errorf ("failed to fetch pipeline tasks: %w" , err )
100
108
}
101
109
110
+ missingTasks , err := a .pipelinePorepMissingTasks (ctx )
111
+ if err != nil {
112
+ return nil , xerrors .Errorf ("failed to fetch missing tasks: %w" , err )
113
+ }
114
+
115
+ missingTasksMap := make (map [abi.SectorID ]porepMissingTask )
116
+ for _ , mt := range missingTasks {
117
+ missingTasksMap [mt .sectorID ()] = mt
118
+ }
119
+
102
120
head , err := a .deps .Chain .ChainHead (ctx )
103
121
if err != nil {
104
122
return nil , xerrors .Errorf ("failed to fetch chain head: %w" , err )
@@ -129,6 +147,12 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e
129
147
130
148
afterSeed := task .SeedEpoch != nil && * task .SeedEpoch <= int64 (epoch )
131
149
150
+ var missingTasks , allTasks []int64
151
+ if mt , ok := missingTasksMap [task .sectorID ()]; ok {
152
+ missingTasks = mt .MissingTaskIDs
153
+ allTasks = mt .AllTaskIDs
154
+ }
155
+
132
156
sectorList = append (sectorList , sectorListEntry {
133
157
PipelineTask : task ,
134
158
Address : addr ,
@@ -140,6 +164,9 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e
140
164
ChainActive : must .One (mbf .active .IsSet (uint64 (task .SectorNumber ))),
141
165
ChainUnproven : must .One (mbf .unproven .IsSet (uint64 (task .SectorNumber ))),
142
166
ChainFaulty : must .One (mbf .faulty .IsSet (uint64 (task .SectorNumber ))),
167
+
168
+ MissingTasks : missingTasks ,
169
+ AllTasks : allTasks ,
143
170
})
144
171
}
145
172
@@ -249,3 +276,87 @@ func (a *WebRPC) PorepPipelineSummary(ctx context.Context) ([]PorepPipelineSumma
249
276
}
250
277
return summaries , nil
251
278
}
279
+
280
+ func (a * WebRPC ) PipelinePorepRestartAll (ctx context.Context ) error {
281
+ missing , err := a .pipelinePorepMissingTasks (ctx )
282
+ if err != nil {
283
+ return err
284
+ }
285
+
286
+ for _ , mt := range missing {
287
+ if len (mt .AllTaskIDs ) != len (mt .MissingTaskIDs ) || len (mt .MissingTaskIDs ) == 0 {
288
+ continue
289
+ }
290
+
291
+ log .Infow ("Restarting sector" , "sector" , mt .sectorID (), "missing_tasks" , mt .MissingTasksCount )
292
+
293
+ if err := a .SectorResume (ctx , mt .SpID , mt .SectorNumber ); err != nil {
294
+ return err
295
+ }
296
+ }
297
+ return nil
298
+ }
299
+
300
// porepMissingTask is one result row of the query run by
// pipelinePorepMissingTasks. Each field is filled from the result column named
// in its `db` tag.
type porepMissingTask struct {
	// SpID and SectorNumber identify the sector the row describes.
	SpID         int64 `db:"sp_id"`
	SectorNumber int64 `db:"sector_number"`

	// AllTaskIDs is the query's task_ids array for the sector.
	AllTaskIDs []int64 `db:"all_task_ids"`
	// MissingTaskIDs is the query's missing_task_ids array, derived from the
	// task IDs that have no matching harmony_task row.
	MissingTaskIDs []int64 `db:"missing_task_ids"`
	// TotalTasks is array_length of the task_ids array.
	TotalTasks int `db:"total_tasks"`
	// MissingTasksCount is array_length of the missing_task_ids array.
	MissingTasksCount int `db:"missing_tasks_count"`
	// RestartStatus is the label computed by the query: either
	// 'All tasks missing' or 'Some tasks missing'.
	RestartStatus string `db:"restart_status"`
}
310
+
311
+ func (pmt porepMissingTask ) sectorID () abi.SectorID {
312
+ return abi.SectorID {Miner : abi .ActorID (pmt .SpID ), Number : abi .SectorNumber (pmt .SectorNumber )}
313
+ }
314
+
315
+ func (a * WebRPC ) pipelinePorepMissingTasks (ctx context.Context ) ([]porepMissingTask , error ) {
316
+ var tasks []porepMissingTask
317
+ err := a .deps .DB .Select (ctx , & tasks , `
318
+ WITH sector_tasks AS (
319
+ SELECT
320
+ sp.sp_id,
321
+ sp.sector_number,
322
+ get_sdr_pipeline_tasks(sp.sp_id, sp.sector_number) AS task_ids
323
+ FROM
324
+ sectors_sdr_pipeline sp
325
+ ),
326
+ missing_tasks AS (
327
+ SELECT
328
+ st.sp_id,
329
+ st.sector_number,
330
+ st.task_ids,
331
+ array_agg(CASE WHEN ht.id IS NULL THEN task_id ELSE NULL END) AS missing_task_ids
332
+ FROM
333
+ sector_tasks st
334
+ CROSS JOIN UNNEST(st.task_ids) WITH ORDINALITY AS t(task_id, task_order)
335
+ LEFT JOIN harmony_task ht ON ht.id = task_id
336
+ GROUP BY
337
+ st.sp_id, st.sector_number, st.task_ids
338
+ )
339
+ SELECT
340
+ mt.sp_id,
341
+ mt.sector_number,
342
+ mt.task_ids AS all_task_ids,
343
+ mt.missing_task_ids,
344
+ array_length(mt.task_ids, 1) AS total_tasks,
345
+ array_length(mt.missing_task_ids, 1) AS missing_tasks_count,
346
+ CASE
347
+ WHEN array_length(mt.task_ids, 1) = array_length(mt.missing_task_ids, 1) THEN 'All tasks missing'
348
+ ELSE 'Some tasks missing'
349
+ END AS restart_status
350
+ FROM
351
+ missing_tasks mt
352
+ WHERE
353
+ array_length(mt.task_ids, 1) > 0 -- Has at least one task
354
+ AND array_length(array_remove(mt.missing_task_ids, NULL), 1) > 0 -- At least one task is missing
355
+ ORDER BY
356
+ mt.sp_id, mt.sector_number;` )
357
+ if err != nil {
358
+ return nil , xerrors .Errorf ("failed to fetch missing tasks: %w" , err )
359
+ }
360
+
361
+ return tasks , nil
362
+ }
0 commit comments