Skip to content

Commit 7ed3f3e

Browse files
taturosatiMauroToscanoentropidelic
authored
fix (aggregator): improve stability (#321)
Co-authored-by: MauroFab <[email protected]> Co-authored-by: Mariano A. Nicolini <[email protected]>
1 parent a525f12 commit 7ed3f3e

File tree

4 files changed

+109
-99
lines changed

4 files changed

+109
-99
lines changed

aggregator/internal/pkg/aggregator.go

Lines changed: 59 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pkg
22

33
import (
44
"context"
5+
"encoding/hex"
56
"sync"
67
"time"
78

@@ -28,10 +29,7 @@ const QUORUM_NUMBER = byte(0)
2829
const QUORUM_THRESHOLD = byte(67)
2930

3031
// Aggregator stores TaskResponse for a task here
31-
type TaskResponsesWithStatus struct {
32-
taskResponses []types.SignedTaskResponse
33-
submittedToEthereum bool
34-
}
32+
type TaskResponses = []types.SignedTaskResponse
3533

3634
type Aggregator struct {
3735
AggregatorConfig *config.AggregatorConfig
@@ -46,26 +44,23 @@ type Aggregator struct {
4644
// Since our ID is not an idx, we build this cache
4745
// Note: In case of a reboot, this doesn't need to be loaded,
4846
// and can start from zero
49-
batchesRootByIdx map[uint32][32]byte
50-
batchesRootByIdxMutex *sync.Mutex
47+
batchesRootByIdx map[uint32][32]byte
5148

5249
// This is the counterpart,
5350
// to use when we have the batch but not the index
5451
// Note: In case of a reboot, this doesn't need to be loaded,
5552
// and can start from zero
56-
batchesIdxByRoot map[[32]byte]uint32
57-
batchesIdxByRootMutex *sync.Mutex
53+
batchesIdxByRoot map[[32]byte]uint32
5854

5955
// This task index is to communicate with the local BLS
6056
// Service.
6157
// Note: In case of a reboot it can start from 0 again
62-
nextBatchIndex uint32
63-
nextBatchIndexMutex *sync.Mutex
58+
nextBatchIndex uint32
59+
60+
// Mutex to protect batchesRootByIdx, batchesIdxByRoot and nextBatchIndex
61+
taskMutex *sync.Mutex
6462

65-
OperatorTaskResponses map[[32]byte]*TaskResponsesWithStatus
66-
// Mutex to protect the taskResponses map
67-
batchesResponseMutex *sync.Mutex
68-
logger logging.Logger
63+
logger logging.Logger
6964

7065
metricsReg *prometheus.Registry
7166
metrics *metrics.Metrics
@@ -92,8 +87,6 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
9287
batchesRootByIdx := make(map[uint32][32]byte)
9388
batchesIdxByRoot := make(map[[32]byte]uint32)
9489

95-
operatorTaskResponses := make(map[[32]byte]*TaskResponsesWithStatus, 0)
96-
9790
chainioConfig := sdkclients.BuildAllConfig{
9891
EthHttpUrl: aggregatorConfig.BaseConfig.EthRpcUrl,
9992
EthWsUrl: aggregatorConfig.BaseConfig.EthWsUrl,
@@ -129,17 +122,11 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
129122
avsWriter: avsWriter,
130123
NewBatchChan: newBatchChan,
131124

132-
batchesRootByIdx: batchesRootByIdx,
133-
batchesRootByIdxMutex: &sync.Mutex{},
134-
135-
batchesIdxByRoot: batchesIdxByRoot,
136-
batchesIdxByRootMutex: &sync.Mutex{},
137-
138-
nextBatchIndex: nextBatchIndex,
139-
nextBatchIndexMutex: &sync.Mutex{},
125+
batchesRootByIdx: batchesRootByIdx,
126+
batchesIdxByRoot: batchesIdxByRoot,
127+
nextBatchIndex: nextBatchIndex,
128+
taskMutex: &sync.Mutex{},
140129

141-
OperatorTaskResponses: operatorTaskResponses,
142-
batchesResponseMutex: &sync.Mutex{},
143130
blsAggregationService: blsAggregationService,
144131
logger: logger,
145132
metricsReg: reg,
@@ -173,13 +160,16 @@ func (agg *Aggregator) Start(ctx context.Context) error {
173160
case err := <-metricsErrChan:
174161
agg.logger.Fatal("Metrics server failed", "err", err)
175162
case blsAggServiceResp := <-agg.blsAggregationService.GetResponseChannel():
176-
agg.logger.Info("Received response from BLS aggregation service", "blsAggServiceResp", blsAggServiceResp)
163+
agg.logger.Info("Received response from BLS aggregation service",
164+
"taskIndex", blsAggServiceResp.TaskIndex)
177165
agg.sendAggregatedResponseToContract(blsAggServiceResp)
178166
agg.metrics.IncAggregatedResponses()
179167
}
180168
}
181169
}
182170

171+
const MaxSentTxRetries = 5
172+
183173
func (agg *Aggregator) sendAggregatedResponseToContract(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
184174
if blsAggServiceResp.Err != nil {
185175
agg.logger.Error("BlsAggregationServiceResponse contains an error", "err", blsAggServiceResp.Err)
@@ -206,70 +196,76 @@ func (agg *Aggregator) sendAggregatedResponseToContract(blsAggServiceResp blsagg
206196
NonSignerStakeIndices: blsAggServiceResp.NonSignerStakeIndices,
207197
}
208198

199+
agg.taskMutex.Lock()
200+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Fetching merkle root")
201+
batchMerkleRoot := agg.batchesRootByIdx[blsAggServiceResp.TaskIndex]
202+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching merkle root")
203+
agg.taskMutex.Unlock()
204+
209205
agg.logger.Info("Threshold reached. Sending aggregated response onchain.",
210206
"taskIndex", blsAggServiceResp.TaskIndex,
211-
)
207+
"merkleRoot", hex.EncodeToString(batchMerkleRoot[:]))
212208

213-
agg.batchesRootByIdxMutex.Lock()
214-
batchMerkleRoot := agg.batchesRootByIdx[blsAggServiceResp.TaskIndex]
215-
agg.batchesRootByIdxMutex.Unlock()
209+
var err error
216210

217-
_, err := agg.avsWriter.SendAggregatedResponse(context.Background(), batchMerkleRoot, nonSignerStakesAndSignature)
218-
if err != nil {
219-
agg.logger.Error("Aggregator failed to respond to task", "err", err)
211+
for i := 0; i < MaxSentTxRetries; i++ {
212+
_, err = agg.avsWriter.SendAggregatedResponse(context.Background(), batchMerkleRoot, nonSignerStakesAndSignature)
213+
if err == nil {
214+
agg.logger.Info("Aggregator successfully responded to task",
215+
"taskIndex", blsAggServiceResp.TaskIndex,
216+
"merkleRoot", hex.EncodeToString(batchMerkleRoot[:]))
217+
218+
return
219+
}
220+
221+
// Sleep for a bit before retrying
222+
time.Sleep(2 * time.Second)
220223
}
224+
225+
agg.logger.Error("Aggregator failed to respond to task, this batch will be lost",
226+
"err", err,
227+
"taskIndex", blsAggServiceResp.TaskIndex,
228+
"merkleRoot", hex.EncodeToString(batchMerkleRoot[:]))
221229
}
222230

231+
223232
func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, taskCreatedBlock uint32) {
224233
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task", "Batch merkle root", batchMerkleRoot)
225234

226-
agg.nextBatchIndexMutex.Lock()
227-
batchIndex := agg.nextBatchIndex
228-
agg.nextBatchIndexMutex.Unlock()
235+
agg.taskMutex.Lock()
236+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Adding new task")
229237

230238
// --- UPDATE BATCH - INDEX CACHES ---
231-
232-
agg.batchesRootByIdxMutex.Lock()
233-
if _, ok := agg.batchesRootByIdx[batchIndex]; ok {
239+
batchIndex := agg.nextBatchIndex
240+
if _, ok := agg.batchesIdxByRoot[batchMerkleRoot]; ok {
234241
agg.logger.Warn("Batch already exists", "batchIndex", batchIndex, "batchRoot", batchMerkleRoot)
235-
agg.batchesRootByIdxMutex.Unlock()
242+
agg.taskMutex.Unlock()
243+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
236244
return
237245
}
238-
agg.batchesRootByIdx[batchIndex] = batchMerkleRoot
239-
agg.batchesRootByIdxMutex.Unlock()
240246

241-
agg.batchesIdxByRootMutex.Lock()
242247
// This shouldn't happen, since both maps are updated together
243-
if _, ok := agg.batchesIdxByRoot[batchMerkleRoot]; ok {
248+
if _, ok := agg.batchesRootByIdx[batchIndex]; ok {
244249
agg.logger.Warn("Batch already exists", "batchIndex", batchIndex, "batchRoot", batchMerkleRoot)
245-
agg.batchesRootByIdxMutex.Unlock()
250+
agg.taskMutex.Unlock()
251+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
246252
return
247253
}
248-
agg.batchesIdxByRoot[batchMerkleRoot] = batchIndex
249-
agg.batchesIdxByRootMutex.Unlock()
250254

251-
// --- UPDATE TASK RESPONSES ---
252-
253-
agg.batchesResponseMutex.Lock()
254-
agg.OperatorTaskResponses[batchMerkleRoot] = &TaskResponsesWithStatus{
255-
taskResponses: make([]types.SignedTaskResponse, 0),
256-
submittedToEthereum: false,
257-
}
258-
agg.batchesResponseMutex.Unlock()
255+
agg.batchesIdxByRoot[batchMerkleRoot] = batchIndex
256+
agg.batchesRootByIdx[batchIndex] = batchMerkleRoot
257+
agg.nextBatchIndex += 1
259258

260259
quorumNums := eigentypes.QuorumNums{eigentypes.QuorumNum(QUORUM_NUMBER)}
261260
quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages{eigentypes.QuorumThresholdPercentage(QUORUM_THRESHOLD)}
262261

263262
err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, 100*time.Second)
264-
265-
// --- INCREASE BATCH INDEX ---
266-
267-
agg.nextBatchIndexMutex.Lock()
268-
agg.nextBatchIndex = agg.nextBatchIndex + 1
269-
agg.nextBatchIndexMutex.Unlock()
270-
271263
// FIXME(marian): When this errors, should we retry initializing new task? Logging fatal for now.
272264
if err != nil {
273265
agg.logger.Fatalf("BLS aggregation service error when initializing new task: %s", err)
274266
}
267+
268+
agg.taskMutex.Unlock()
269+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
270+
agg.logger.Info("New task added", "batchIndex", batchIndex, "batchMerkleRoot", batchMerkleRoot)
275271
}

aggregator/internal/pkg/server.go

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package pkg
22

33
import (
44
"context"
5+
"encoding/hex"
56
"fmt"
67
"net/http"
78
"net/rpc"
9+
"time"
810

911
"github.com/yetanotherco/aligned_layer/core/types"
1012
)
@@ -43,35 +45,57 @@ func (agg *Aggregator) ServeOperators() error {
4345
// - 0: Success
4446
// - 1: Error
4547
func (agg *Aggregator) ProcessOperatorSignedTaskResponse(signedTaskResponse *types.SignedTaskResponse, reply *uint8) error {
46-
47-
agg.AggregatorConfig.BaseConfig.Logger.Info("New task response", "taskResponse", signedTaskResponse)
48-
49-
if _, ok := agg.OperatorTaskResponses[signedTaskResponse.BatchMerkleRoot]; !ok {
48+
agg.AggregatorConfig.BaseConfig.Logger.Info("New task response",
49+
"merkleRoot", hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
50+
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))
51+
52+
agg.taskMutex.Lock()
53+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
54+
taskIndex, ok := agg.batchesIdxByRoot[signedTaskResponse.BatchMerkleRoot]
55+
if !ok {
56+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources")
57+
agg.taskMutex.Unlock()
5058
return fmt.Errorf("task with batch merkle root %d does not exist", signedTaskResponse.BatchMerkleRoot)
5159
}
5260

53-
agg.batchesResponseMutex.Lock()
54-
taskResponses := agg.OperatorTaskResponses[signedTaskResponse.BatchMerkleRoot]
55-
taskResponses.taskResponses = append(
56-
agg.OperatorTaskResponses[signedTaskResponse.BatchMerkleRoot].taskResponses,
57-
*signedTaskResponse)
58-
agg.batchesResponseMutex.Unlock()
59-
60-
agg.batchesIdxByRootMutex.Lock()
61-
taskIndex := agg.batchesIdxByRoot[signedTaskResponse.BatchMerkleRoot]
62-
agg.batchesIdxByRootMutex.Unlock()
63-
64-
err := agg.blsAggregationService.ProcessNewSignature(
65-
context.Background(), taskIndex, signedTaskResponse.BatchMerkleRoot,
66-
&signedTaskResponse.BlsSignature, signedTaskResponse.OperatorId,
67-
)
68-
if err != nil {
69-
agg.logger.Warnf("BLS aggregation service error: %s", err)
70-
*reply = 1
71-
return err
61+
// Don't wait infinitely if it can't answer
62+
// Create a context with a timeout of 5 seconds
63+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
64+
defer cancel() // Ensure the cancel function is called to release resources
65+
66+
// Create a channel to signal when the task is done
67+
done := make(chan struct{})
68+
69+
agg.logger.Info("Starting bls signature process")
70+
go func() {
71+
err := agg.blsAggregationService.ProcessNewSignature(
72+
context.Background(), taskIndex, signedTaskResponse.BatchMerkleRoot,
73+
&signedTaskResponse.BlsSignature, signedTaskResponse.OperatorId,
74+
)
75+
76+
if err != nil {
77+
agg.logger.Warnf("BLS aggregation service error: %s", err)
78+
} else {
79+
agg.logger.Info("BLS process succeeded")
80+
}
81+
82+
close(done)
83+
}()
84+
85+
*reply = 1
86+
// Wait for either the context to be done or the task to complete
87+
select {
88+
case <-ctx.Done():
89+
// The context's deadline was exceeded or it was canceled
90+
agg.logger.Info("Bls process timed out, operator signature will be lost. Batch may not reach quorum")
91+
case <-done:
92+
// The task completed successfully
93+
agg.logger.Info("Bls context finished correctly")
94+
*reply = 0
7295
}
7396

74-
*reply = 0
97+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Task response processing finished")
98+
agg.taskMutex.Unlock()
7599

76100
return nil
77101
}

aggregator/internal/pkg/subscriber.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ import (
99
)
1010

1111
const (
12-
MaxRetries = 20
13-
RetryInterval = 10 * time.Second
12+
MaxRetries = 100
13+
RetryInterval = 1 * time.Second
1414
)
1515

1616
func (agg *Aggregator) SubscribeToNewTasks() error {
@@ -32,7 +32,6 @@ func (agg *Aggregator) subscribeToNewTasks() error {
3232
for {
3333
select {
3434
case err := <-agg.taskSubscriber.Err():
35-
agg.AggregatorConfig.BaseConfig.Logger.Error("Error in subscription", "err", err)
3635
return err
3736
case newBatch := <-agg.NewBatchChan:
3837
agg.AddNewTask(newBatch.BatchMerkleRoot, newBatch.TaskCreatedBlock)

core/chainio/avs_writer.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ func (w *AvsWriter) SendAggregatedResponse(ctx context.Context, batchMerkleRoot
9090
txOpts.NoSend = true // simulate the transaction
9191
tx, err := w.AvsContractBindings.ServiceManager.RespondToTask(&txOpts, batchMerkleRoot, nonSignerStakesAndSignature)
9292
if err != nil {
93-
w.logger.Error("Error submitting SubmitTaskResponse tx while calling respondToTask", "err", err)
9493
return nil, err
9594
}
9695

@@ -99,7 +98,6 @@ func (w *AvsWriter) SendAggregatedResponse(ctx context.Context, batchMerkleRoot
9998
txOpts.GasLimit = tx.Gas() * 110 / 100 // Add 10% to the gas limit
10099
tx, err = w.AvsContractBindings.ServiceManager.RespondToTask(&txOpts, batchMerkleRoot, nonSignerStakesAndSignature)
101100
if err != nil {
102-
w.logger.Error("Error submitting SubmitTaskResponse tx while calling respondToTask", "err", err)
103101
return nil, err
104102
}
105103

@@ -108,13 +106,6 @@ func (w *AvsWriter) SendAggregatedResponse(ctx context.Context, batchMerkleRoot
108106
return nil, err
109107
}
110108

111-
taskRespondedEvent, err := w.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerFilterer.ParseBatchVerified(*receipt.Logs[0])
112-
if err != nil {
113-
return nil, err
114-
}
115-
116-
// FIXME(marian): Dummy log to check integration with the contract
117-
w.logger.Infof("TASK RESPONDED EVENT: %+v", taskRespondedEvent)
118109
return receipt, nil
119110
}
120111

0 commit comments

Comments
 (0)