Skip to content

Commit c2ab08a

Browse files
committed
fix attempt number for reassigned task
1 parent c544a8e commit c2ab08a

File tree

5 files changed

+105
-64
lines changed

5 files changed

+105
-64
lines changed

coordinator/internal/logic/provertask/batch_prover_task.go

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -129,29 +129,32 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
129129
return nil, nil
130130
}
131131

132-
// Don't dispatch the same failing job to the same prover
133-
proverTasks, getFailedTaskError := bp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, 2)
134-
if getFailedTaskError != nil {
135-
log.Error("failed to get prover tasks", "proof type", message.ProofTypeBatch.String(), "task ID", tmpBatchTask.Hash, "error", getFailedTaskError)
136-
return nil, ErrCoordinatorInternalFailure
137-
}
138-
for i := 0; i < len(proverTasks); i++ {
139-
if proverTasks[i].ProverPublicKey == taskCtx.PublicKey ||
140-
taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) {
141-
log.Debug("get empty batch, the prover already failed this task", "height", getTaskParameter.ProverHeight, "task ID", tmpBatchTask.Hash, "prover name", taskCtx.ProverName, "prover public key", taskCtx.PublicKey)
142-
return nil, nil
132+
// we are simply pick the chunk which has been assigned, so don't bother to update attempts or check failed before
133+
if taskCtx.hasAssignedTask != nil {
134+
// Don't dispatch the same failing job to the same prover
135+
proverTasks, getFailedTaskError := bp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, 2)
136+
if getFailedTaskError != nil {
137+
log.Error("failed to get prover tasks", "proof type", message.ProofTypeBatch.String(), "task ID", tmpBatchTask.Hash, "error", getFailedTaskError)
138+
return nil, ErrCoordinatorInternalFailure
139+
}
140+
for i := 0; i < len(proverTasks); i++ {
141+
if proverTasks[i].ProverPublicKey == taskCtx.PublicKey ||
142+
taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) {
143+
log.Debug("get empty batch, the prover already failed this task", "height", getTaskParameter.ProverHeight, "task ID", tmpBatchTask.Hash, "prover name", taskCtx.ProverName, "prover public key", taskCtx.PublicKey)
144+
return nil, nil
145+
}
143146
}
144-
}
145147

146-
rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx.Copy(), tmpBatchTask.Index, tmpBatchTask.ActiveAttempts, tmpBatchTask.TotalAttempts)
147-
if updateAttemptsErr != nil {
148-
log.Error("failed to update batch attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
149-
return nil, ErrCoordinatorInternalFailure
150-
}
148+
rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx.Copy(), tmpBatchTask.Index, tmpBatchTask.ActiveAttempts, tmpBatchTask.TotalAttempts)
149+
if updateAttemptsErr != nil {
150+
log.Error("failed to update batch attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
151+
return nil, ErrCoordinatorInternalFailure
152+
}
151153

152-
if rowsAffected == 0 {
153-
time.Sleep(100 * time.Millisecond)
154-
continue
154+
if rowsAffected == 0 {
155+
time.Sleep(100 * time.Millisecond)
156+
continue
157+
}
155158
}
156159

157160
batchTask = tmpBatchTask
@@ -205,6 +208,11 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
205208
log.Error("insert batch prover task info fail", "task_id", batchTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
206209
return nil, ErrCoordinatorInternalFailure
207210
}
211+
} else {
212+
if err = bp.proverTaskOrm.UpdateProverTaskAssignedTime(ctx.Copy(), proverTask.UUID, utils.NowUTC()); err != nil {
213+
log.Error("update assigned batch prover task fail", "task_id", batchTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
214+
return nil, ErrCoordinatorInternalFailure
215+
}
208216
}
209217
// notice uuid is set as a side effect of InsertProverTask
210218
taskMsg.UUID = proverTask.UUID.String()

coordinator/internal/logic/provertask/bundle_prover_task.go

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -127,31 +127,33 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
127127
return nil, nil
128128
}
129129

130-
// Don't dispatch the same failing job to the same prover
131-
proverTasks, getTaskError := bp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeBundle, tmpBundleTask.Hash, 2)
132-
if getTaskError != nil {
133-
log.Error("failed to get prover tasks", "proof type", message.ProofTypeBundle.String(), "task ID", tmpBundleTask.Hash, "error", getTaskError)
134-
return nil, ErrCoordinatorInternalFailure
135-
}
136-
for i := 0; i < len(proverTasks); i++ {
137-
if proverTasks[i].ProverPublicKey == taskCtx.PublicKey ||
138-
taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) {
139-
log.Debug("get empty bundle, the prover already failed this task", "height", getTaskParameter.ProverHeight, "task ID", tmpBundleTask.Hash, "prover name", taskCtx.ProverName, "prover public key", taskCtx.PublicKey)
140-
return nil, nil
130+
// we are simply pick the chunk which has been assigned, so don't bother to update attempts or check failed before
131+
if taskCtx.hasAssignedTask != nil {
132+
// Don't dispatch the same failing job to the same prover
133+
proverTasks, getTaskError := bp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeBundle, tmpBundleTask.Hash, 2)
134+
if getTaskError != nil {
135+
log.Error("failed to get prover tasks", "proof type", message.ProofTypeBundle.String(), "task ID", tmpBundleTask.Hash, "error", getTaskError)
136+
return nil, ErrCoordinatorInternalFailure
137+
}
138+
for i := 0; i < len(proverTasks); i++ {
139+
if proverTasks[i].ProverPublicKey == taskCtx.PublicKey ||
140+
taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) {
141+
log.Debug("get empty bundle, the prover already failed this task", "height", getTaskParameter.ProverHeight, "task ID", tmpBundleTask.Hash, "prover name", taskCtx.ProverName, "prover public key", taskCtx.PublicKey)
142+
return nil, nil
143+
}
141144
}
142-
}
143145

144-
rowsAffected, updateAttemptsErr := bp.bundleOrm.UpdateBundleAttempts(ctx.Copy(), tmpBundleTask.Hash, tmpBundleTask.ActiveAttempts, tmpBundleTask.TotalAttempts)
145-
if updateAttemptsErr != nil {
146-
log.Error("failed to update bundle attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
147-
return nil, ErrCoordinatorInternalFailure
148-
}
146+
rowsAffected, updateAttemptsErr := bp.bundleOrm.UpdateBundleAttempts(ctx.Copy(), tmpBundleTask.Hash, tmpBundleTask.ActiveAttempts, tmpBundleTask.TotalAttempts)
147+
if updateAttemptsErr != nil {
148+
log.Error("failed to update bundle attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
149+
return nil, ErrCoordinatorInternalFailure
150+
}
149151

150-
if rowsAffected == 0 {
151-
time.Sleep(100 * time.Millisecond)
152-
continue
152+
if rowsAffected == 0 {
153+
time.Sleep(100 * time.Millisecond)
154+
continue
155+
}
153156
}
154-
155157
bundleTask = tmpBundleTask
156158
break
157159
}
@@ -205,6 +207,11 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
205207
log.Error("insert bundle prover task info fail", "task_id", bundleTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
206208
return nil, ErrCoordinatorInternalFailure
207209
}
210+
} else {
211+
if err = bp.proverTaskOrm.UpdateProverTaskAssignedTime(ctx.Copy(), proverTask.UUID, utils.NowUTC()); err != nil {
212+
log.Error("update assigned bundle prover task fail", "task_id", bundleTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
213+
return nil, ErrCoordinatorInternalFailure
214+
}
208215
}
209216
// notice uuid is set as a side effect of InsertProverTask
210217
taskMsg.UUID = proverTask.UUID.String()

coordinator/internal/logic/provertask/chunk_prover_task.go

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
8080
var getTaskError error
8181
var tmpChunkTask *orm.Chunk
8282
if taskCtx.hasAssignedTask != nil {
83-
log.Debug("retrived assigned task chunk", "taskID", taskCtx.hasAssignedTask.TaskID, "prover", taskCtx.ProverName)
83+
log.Debug("retrieved assigned task chunk", "taskID", taskCtx.hasAssignedTask.TaskID, "prover", taskCtx.ProverName)
8484
tmpChunkTask, getTaskError = cp.chunkOrm.GetChunkByHash(ctx.Copy(), taskCtx.hasAssignedTask.TaskID)
8585
if getTaskError != nil {
8686
log.Error("failed to get chunk has assigned to prover", "taskID", taskCtx.hasAssignedTask.TaskID, "err", getTaskError)
@@ -124,31 +124,33 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
124124
return nil, nil
125125
}
126126

127-
// Don't dispatch the same failing job to the same prover
128-
proverTasks, getFailedTaskError := cp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, 2)
129-
if getFailedTaskError != nil {
130-
log.Error("failed to get prover tasks", "proof type", message.ProofTypeChunk.String(), "task ID", tmpChunkTask.Hash, "error", getFailedTaskError)
131-
return nil, ErrCoordinatorInternalFailure
132-
}
133-
for i := 0; i < len(proverTasks); i++ {
134-
if proverTasks[i].ProverPublicKey == taskCtx.PublicKey ||
135-
taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) {
136-
log.Debug("get empty chunk, the prover already failed this task", "height", getTaskParameter.ProverHeight, "task ID", tmpChunkTask.Hash, "prover name", taskCtx.ProverName, "prover public key", taskCtx.PublicKey)
137-
return nil, nil
127+
// we are simply pick the chunk which has been assigned, so don't bother to update attempts or check failed before
128+
if taskCtx.hasAssignedTask != nil {
129+
// Don't dispatch the same failing job to the same prover
130+
proverTasks, getFailedTaskError := cp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, 2)
131+
if getFailedTaskError != nil {
132+
log.Error("failed to get prover tasks", "proof type", message.ProofTypeChunk.String(), "task ID", tmpChunkTask.Hash, "error", getFailedTaskError)
133+
return nil, ErrCoordinatorInternalFailure
134+
}
135+
for i := 0; i < len(proverTasks); i++ {
136+
if proverTasks[i].ProverPublicKey == taskCtx.PublicKey ||
137+
taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) {
138+
log.Debug("get empty chunk, the prover already failed this task", "height", getTaskParameter.ProverHeight, "task ID", tmpChunkTask.Hash, "prover name", taskCtx.ProverName, "prover public key", taskCtx.PublicKey)
139+
return nil, nil
140+
}
138141
}
139-
}
140142

141-
rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx.Copy(), tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts)
142-
if updateAttemptsErr != nil {
143-
log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
144-
return nil, ErrCoordinatorInternalFailure
145-
}
143+
rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx.Copy(), tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts)
144+
if updateAttemptsErr != nil {
145+
log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
146+
return nil, ErrCoordinatorInternalFailure
147+
}
146148

147-
if rowsAffected == 0 {
148-
time.Sleep(100 * time.Millisecond)
149-
continue
149+
if rowsAffected == 0 {
150+
time.Sleep(100 * time.Millisecond)
151+
continue
152+
}
150153
}
151-
152154
chunkTask = tmpChunkTask
153155
break
154156
}
@@ -200,6 +202,11 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
200202
log.Error("insert chunk prover task fail", "task_id", chunkTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
201203
return nil, ErrCoordinatorInternalFailure
202204
}
205+
} else {
206+
if err = cp.proverTaskOrm.UpdateProverTaskAssignedTime(ctx.Copy(), proverTask.UUID, utils.NowUTC()); err != nil {
207+
log.Error("update assigned chunk prover task fail", "task_id", chunkTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
208+
return nil, ErrCoordinatorInternalFailure
209+
}
203210
}
204211
// notice uuid is set as a side effect of InsertProverTask
205212
taskMsg.UUID = proverTask.UUID.String()

coordinator/internal/orm/prover_task.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,24 @@ func (o *ProverTask) UpdateProverTaskProvingStatusAndFailureType(ctx context.Con
269269
return nil
270270
}
271271

272+
// UpdateProverTaskProvingStatusAndFailureType updates the proving_status of a specific ProverTask record.
273+
func (o *ProverTask) UpdateProverTaskAssignedTime(ctx context.Context, uuid uuid.UUID, t time.Time, dbTX ...*gorm.DB) error {
274+
db := o.db
275+
if len(dbTX) > 0 && dbTX[0] != nil {
276+
db = dbTX[0]
277+
}
278+
db = db.WithContext(ctx)
279+
db = db.Model(&ProverTask{})
280+
db = db.Where("uuid = ?", uuid)
281+
282+
updates := make(map[string]interface{})
283+
updates["assigned_at"] = t
284+
if err := db.Updates(updates).Error; err != nil {
285+
return fmt.Errorf("ProverTask.UpdateProverTaskAssignedTime error: %w, uuid:%s, status: %v", err, uuid, t)
286+
}
287+
return nil
288+
}
289+
272290
// UpdateProverTaskFailureType update the prover task failure type
273291
func (o *ProverTask) UpdateProverTaskFailureType(ctx context.Context, uuid uuid.UUID, failureType types.ProverTaskFailureType, dbTX ...*gorm.DB) error {
274292
db := o.db

zkvm-prover/.work/.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@
33
*.sol
44
cache
55
db
6-
*.json
6+
*.json
7+
?

0 commit comments

Comments
 (0)