Skip to content

Commit ef6d5b2

Browse files
authored
fix(aggregator): add retries for check if task exists (#397)
1 parent 43b6404 commit ef6d5b2

File tree

1 file changed

+15
-8
lines changed

1 file changed

+15
-8
lines changed

aggregator/internal/pkg/server.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@ package pkg
33
import (
44
"context"
55
"encoding/hex"
6-
"fmt"
76
"net/http"
87
"net/rpc"
98
"time"
109

1110
"github.com/yetanotherco/aligned_layer/core/types"
1211
)
1312

13+
const waitForEventRetries = 50
14+
const waitForEventSleepSeconds = 4 * time.Second
15+
1416
func (agg *Aggregator) ServeOperators() error {
1517
// Registers a new RPC server
1618
err := rpc.Register(agg)
@@ -49,13 +51,18 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponse(signedTaskResponse *typ
4951
"merkleRoot", hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
5052
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))
5153

52-
agg.taskMutex.Lock()
53-
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
54-
taskIndex, ok := agg.batchesIdxByRoot[signedTaskResponse.BatchMerkleRoot]
55-
if !ok {
56-
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources")
57-
agg.taskMutex.Unlock()
58-
return fmt.Errorf("task with batch merkle root %d does not exist", signedTaskResponse.BatchMerkleRoot)
54+
taskIndex := uint32(0)
55+
for i := 0; i < waitForEventRetries; i++ {
56+
agg.taskMutex.Lock()
57+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
58+
ok := false
59+
taskIndex, ok = agg.batchesIdxByRoot[signedTaskResponse.BatchMerkleRoot]
60+
if !ok {
61+
agg.taskMutex.Unlock()
62+
time.Sleep(waitForEventSleepSeconds)
63+
} else {
64+
break
65+
}
5966
}
6067

6168
// Don't wait infinitely if it can't answer

0 commit comments

Comments
 (0)