Skip to content

Commit 5ed8a2d

Browse files
committed
+ make has assigned task's priority higher
+ fix unexpected assigned task type issue
1 parent b20efc7 commit 5ed8a2d

File tree

3 files changed

+36
-27
lines changed

3 files changed

+36
-27
lines changed

coordinator/internal/logic/provertask/batch_prover_task.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,12 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
8484
for i := 0; i < 5; i++ {
8585
var getTaskError error
8686
var tmpBatchTask *orm.Batch
87-
if getTaskParameter.TaskID != "" {
88-
tmpBatchTask, getTaskError = bp.batchOrm.GetBatchByHash(ctx.Copy(), getTaskParameter.TaskID)
89-
if getTaskError != nil {
90-
log.Error("failed to get expected batch", "taskID", getTaskParameter.TaskID, "err", getTaskError)
91-
return nil, ErrCoordinatorInternalFailure
92-
} else if tmpBatchTask == nil {
93-
return nil, fmt.Errorf("Expected task (%s) is already dropped", getTaskParameter.TaskID)
94-
}
95-
}
9687

9788
if taskCtx.hasAssignedTask != nil {
89+
if taskCtx.hasAssignedTask.TaskType != int16(message.ProofTypeBatch) {
90+
return nil, fmt.Errorf("prover with publicKey %s is already assigned a task. ProverName: %s, ProverVersion: %s", taskCtx.PublicKey, taskCtx.ProverName, taskCtx.ProverVersion)
91+
}
92+
9893
tmpBatchTask, getTaskError = bp.batchOrm.GetBatchByHash(ctx.Copy(), taskCtx.hasAssignedTask.TaskID)
9994
if getTaskError != nil {
10095
log.Error("failed to get batch has assigned to prover", "taskID", taskCtx.hasAssignedTask.TaskID, "err", getTaskError)
@@ -104,6 +99,14 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
10499
return nil, fmt.Errorf("prover with publicKey %s is already assigned a dropped batch. ProverName: %s, ProverVersion: %s",
105100
taskCtx.PublicKey, taskCtx.ProverName, taskCtx.ProverVersion)
106101
}
102+
} else if getTaskParameter.TaskID != "" {
103+
tmpBatchTask, getTaskError = bp.batchOrm.GetBatchByHash(ctx.Copy(), getTaskParameter.TaskID)
104+
if getTaskError != nil {
105+
log.Error("failed to get expected batch", "taskID", getTaskParameter.TaskID, "err", getTaskError)
106+
return nil, ErrCoordinatorInternalFailure
107+
} else if tmpBatchTask == nil {
108+
return nil, fmt.Errorf("Expected task (%s) is already dropped", getTaskParameter.TaskID)
109+
}
107110
}
108111

109112
if tmpBatchTask == nil {

coordinator/internal/logic/provertask/bundle_prover_task.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,17 +82,12 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
8282
for i := 0; i < 5; i++ {
8383
var getTaskError error
8484
var tmpBundleTask *orm.Bundle
85-
if getTaskParameter.TaskID != "" {
86-
tmpBundleTask, getTaskError = bp.bundleOrm.GetBundleByHash(ctx.Copy(), getTaskParameter.TaskID)
87-
if getTaskError != nil {
88-
log.Error("failed to get expected bundle", "taskID", getTaskParameter.TaskID, "err", getTaskError)
89-
return nil, ErrCoordinatorInternalFailure
90-
} else if tmpBundleTask == nil {
91-
return nil, fmt.Errorf("Expected task (%s) is already dropped", getTaskParameter.TaskID)
92-
}
93-
}
9485

9586
if taskCtx.hasAssignedTask != nil {
87+
if taskCtx.hasAssignedTask.TaskType != int16(message.ProofTypeBundle) {
88+
return nil, fmt.Errorf("prover with publicKey %s is already assigned a task. ProverName: %s, ProverVersion: %s", taskCtx.PublicKey, taskCtx.ProverName, taskCtx.ProverVersion)
89+
}
90+
9691
tmpBundleTask, getTaskError = bp.bundleOrm.GetBundleByHash(ctx.Copy(), taskCtx.hasAssignedTask.TaskID)
9792
if getTaskError != nil {
9893
log.Error("failed to get bundle has assigned to prover", "taskID", taskCtx.hasAssignedTask.TaskID, "err", getTaskError)
@@ -102,6 +97,14 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
10297
return nil, fmt.Errorf("prover with publicKey %s is already assigned a dropped bundle. ProverName: %s, ProverVersion: %s",
10398
taskCtx.PublicKey, taskCtx.ProverName, taskCtx.ProverVersion)
10499
}
100+
} else if getTaskParameter.TaskID != "" {
101+
tmpBundleTask, getTaskError = bp.bundleOrm.GetBundleByHash(ctx.Copy(), getTaskParameter.TaskID)
102+
if getTaskError != nil {
103+
log.Error("failed to get expected bundle", "taskID", getTaskParameter.TaskID, "err", getTaskError)
104+
return nil, ErrCoordinatorInternalFailure
105+
} else if tmpBundleTask == nil {
106+
return nil, fmt.Errorf("Expected task (%s) is already dropped", getTaskParameter.TaskID)
107+
}
105108
}
106109

107110
if tmpBundleTask == nil {

coordinator/internal/logic/provertask/chunk_prover_task.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,17 +80,12 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
8080
for i := 0; i < 5; i++ {
8181
var getTaskError error
8282
var tmpChunkTask *orm.Chunk
83-
if getTaskParameter.TaskID != "" {
84-
tmpChunkTask, getTaskError = cp.chunkOrm.GetChunkByHash(ctx.Copy(), getTaskParameter.TaskID)
85-
if getTaskError != nil {
86-
log.Error("failed to get expected chunk", "taskID", getTaskParameter.TaskID, "err", getTaskError)
87-
return nil, ErrCoordinatorInternalFailure
88-
} else if tmpChunkTask == nil {
89-
return nil, fmt.Errorf("Expected task (%s) is already dropped", getTaskParameter.TaskID)
90-
}
91-
}
9283

9384
if taskCtx.hasAssignedTask != nil {
85+
if taskCtx.hasAssignedTask.TaskType != int16(message.ProofTypeChunk) {
86+
return nil, fmt.Errorf("prover with publicKey %s is already assigned a task. ProverName: %s, ProverVersion: %s", taskCtx.PublicKey, taskCtx.ProverName, taskCtx.ProverVersion)
87+
}
88+
9489
log.Debug("retrieved assigned task chunk", "taskID", taskCtx.hasAssignedTask.TaskID, "prover", taskCtx.ProverName)
9590
tmpChunkTask, getTaskError = cp.chunkOrm.GetChunkByHash(ctx.Copy(), taskCtx.hasAssignedTask.TaskID)
9691
if getTaskError != nil {
@@ -101,6 +96,14 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
10196
return nil, fmt.Errorf("prover with publicKey %s is already assigned a dropped chunk. ProverName: %s, ProverVersion: %s",
10297
taskCtx.PublicKey, taskCtx.ProverName, taskCtx.ProverVersion)
10398
}
99+
} else if getTaskParameter.TaskID != "" {
100+
tmpChunkTask, getTaskError = cp.chunkOrm.GetChunkByHash(ctx.Copy(), getTaskParameter.TaskID)
101+
if getTaskError != nil {
102+
log.Error("failed to get expected chunk", "taskID", getTaskParameter.TaskID, "err", getTaskError)
103+
return nil, ErrCoordinatorInternalFailure
104+
} else if tmpChunkTask == nil {
105+
return nil, fmt.Errorf("Expected task (%s) is already dropped", getTaskParameter.TaskID)
106+
}
104107
}
105108

106109
if tmpChunkTask == nil {

0 commit comments

Comments
 (0)