@@ -127,7 +127,8 @@ func (t *TaskClientPoll) CanAccept(ids []harmonytask.TaskID, engine *harmonytask
127
127
128
128
// Do implements harmonytask.TaskInterface.
129
129
func (t * TaskClientPoll ) Do (taskID harmonytask.TaskID , stillOwned func () bool ) (done bool , err error ) {
130
- ctx := context .Background ()
130
+ ctx , cancel := context .WithCancel (context .Background ())
131
+ defer cancel ()
131
132
132
133
var clientRequest ClientRequest
133
134
err = t .db .QueryRow (ctx , `
@@ -148,10 +149,28 @@ func (t *TaskClientPoll) Do(taskID harmonytask.TaskID, stillOwned func() bool) (
148
149
return false , nil
149
150
}
150
151
152
+ pollCtx , ownedCancel := context .WithCancel (ctx )
153
+ go func () {
154
+ const pollInterval = 10 * time .Second
155
+ defer ownedCancel ()
156
+
157
+ for {
158
+ select {
159
+ case <- ctx .Done ():
160
+ return
161
+ case <- time .After (pollInterval ):
162
+ if ! stillOwned () {
163
+ // close the owned context
164
+ return
165
+ }
166
+ }
167
+ }
168
+ }()
169
+
151
170
var proof []byte
152
171
for {
153
172
var stateChanged bool
154
- stateChanged , proof , err = pollForProof (ctx , t .db , taskID , & clientRequest )
173
+ stateChanged , proof , err = pollForProof (pollCtx , t .db , taskID , & clientRequest )
155
174
if err != nil {
156
175
return false , xerrors .Errorf ("failed to poll for proof: %w" , err )
157
176
}
@@ -224,7 +243,7 @@ func pollForProof(ctx context.Context, db *harmonydb.DB, taskID harmonytask.Task
224
243
}
225
244
226
245
// Get proof status by CID
227
- proofResp , err := proofsvc .GetProofStatus (requestCid )
246
+ proofResp , err := proofsvc .GetProofStatus (ctx , requestCid )
228
247
if err != nil || proofResp .Proof == nil {
229
248
log .Infow ("proof not ready" , "taskID" , taskID , "spID" , clientRequest .SpID , "sectorNumber" , clientRequest .SectorNumber )
230
249
// Not ready yet, continue polling
0 commit comments