Skip to content

Commit c10de69

Browse files
committed
fix
1 parent 9a4e474 commit c10de69

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

redis/redis-checkpoint.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,9 @@ func (checkpointer *RedisCheckpoint) CheckpointSequence(shard *par.ShardStatus)
185185
claimRequest := ""
186186

187187
if checkpointer.kclConfig.EnableLeaseStealing {
188-
if checkpoint, err := checkpointer.getItem(shard.ID); err != nil && checkpoint.Checkpoint != "" {
189-
claimRequest = checkpoint.Checkpoint
188+
if checkpoint, err := checkpointer.getItem(shard.ID); err != nil && checkpoint.ClaimRequest != "" && checkpoint.ClaimRequest != shard.ID {
189+
claimRequest = checkpoint.ClaimRequest
190+
checkpointer.kclConfig.Logger.Warnf("CheckpointSequence new claimRequest for %s", claimRequest)
190191
}
191192
}
192193

@@ -274,7 +275,8 @@ func (checkpointer *RedisCheckpoint) RemoveLeaseOwner(shardID string) error {
274275
} else if cp.AssignedTo != checkpointer.kclConfig.WorkerID {
275276
return fmt.Errorf("RemoveLeaseOwner invalid AssignedTo")
276277
} else {
277-
return checkpointer.removeItem(shardID)
278+
cp.AssignedTo = ""
279+
return checkpointer.putItem(cp)
278280
}
279281
}
280282

0 commit comments

Comments
 (0)