-
Notifications
You must be signed in to change notification settings - Fork 391
fix(aggregator): (WIP) fetch task on task response if not cached #1351
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: staging
Are you sure you want to change the base?
Changes from 4 commits
a3e9fb8
b714153
4ee91c9
eb3fd52
6cd8fa9
ad522cd
27bf9c1
9c964bb
c8cb8e7
7759996
9072d8b
99c6c88
cfe283d
4773b0a
009375f
9d56887
86bb230
4b3eef0
a44b1a7
eac734f
9392336
05c7243
2c5a7d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,6 +35,48 @@ func (agg *Aggregator) ServeOperators() error { | |
| return err | ||
| } | ||
|
|
||
| // Waits for the arrival of task associated with signedTaskResponse and returns true on success or false on failure | ||
| // If the task is not present in the internal map, it will try to fetch it from logs and retry. | ||
| // The number of retries is specified by `waitForEventRetries`, and the waiting time between each by `waitForEventSleepSeconds` | ||
| func (agg *Aggregator) waitForTask(signedTaskResponse *types.SignedTaskResponse) bool { | ||
| for i := 0; i < waitForEventRetries; i++ { | ||
| // Lock | ||
| agg.taskMutex.Lock() | ||
| agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response") | ||
| _, ok := agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash] | ||
| // Unlock | ||
| agg.logger.Info("- Unlocked Resources: Task not found in the internal map") | ||
MauroToscano marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| agg.taskMutex.Unlock() | ||
| if ok { | ||
| return true | ||
| } | ||
|
|
||
| // Task was not found in internal map, let's try to fetch it from logs | ||
| agg.logger.Info("Trying to fetch missed task from logs...") | ||
| batch, err := agg.avsReader.GetPendingBatchFromMerkleRoot(signedTaskResponse.BatchMerkleRoot) | ||
|
|
||
| if err == nil && batch != nil { | ||
| agg.logger.Info("Found missed task in logs with merkle root 0x%e", batch.BatchMerkleRoot) | ||
| // Adding new task will fail only if it already exists | ||
| agg.AddNewTask(batch.BatchMerkleRoot, batch.SenderAddress, batch.TaskCreatedBlock) | ||
| return true | ||
| } | ||
|
|
||
| if err != nil { | ||
| agg.logger.Warn("Error fetching task from logs: %v", err) | ||
| } | ||
|
|
||
| if batch == nil { | ||
| agg.logger.Info("Task not found in logs") | ||
| } | ||
|
|
||
| // Task was not found, wait and retry | ||
| time.Sleep(waitForEventSleepSeconds) | ||
| } | ||
|
|
||
| return false | ||
| } | ||
|
|
||
| // Aggregator Methods | ||
| // This is the list of methods that the Aggregator exposes to the Operator | ||
| // The Operator can call these methods to interact with the Aggregator | ||
|
|
@@ -49,27 +91,25 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t | |
| "SenderAddress", "0x"+hex.EncodeToString(signedTaskResponse.SenderAddress[:]), | ||
| "BatchIdentifierHash", "0x"+hex.EncodeToString(signedTaskResponse.BatchIdentifierHash[:]), | ||
| "operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:])) | ||
| taskIndex := uint32(0) | ||
| ok := false | ||
|
|
||
| for i := 0; i < waitForEventRetries; i++ { | ||
| agg.taskMutex.Lock() | ||
| agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response") | ||
| taskIndex, ok = agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash] | ||
| if !ok { | ||
| agg.taskMutex.Unlock() | ||
| agg.logger.Info("- Unlocked Resources: Task not found in the internal map") | ||
| time.Sleep(waitForEventSleepSeconds) | ||
| } else { | ||
| break | ||
| } | ||
| if !agg.waitForTask(signedTaskResponse) { | ||
|
||
| agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum") | ||
| *reply = 1 | ||
| return nil | ||
| } | ||
|
|
||
| agg.taskMutex.Lock() | ||
| agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response") | ||
| taskIndex, ok := agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash] | ||
| if !ok { | ||
| agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum") | ||
| agg.logger.Errorf("Unexpected error fetching for task with merkle root 0x%x", signedTaskResponse.BatchMerkleRoot) | ||
| *reply = 1 | ||
| return nil | ||
| } | ||
| // Unlock | ||
| agg.logger.Info("- Unlocked Resources: Task not found in the internal map") | ||
| agg.taskMutex.Unlock() | ||
|
|
||
| agg.telemetry.LogOperatorResponse(signedTaskResponse.BatchMerkleRoot, signedTaskResponse.OperatorId) | ||
|
|
||
| // Don't wait infinitely if it can't answer | ||
|
|
@@ -110,7 +150,6 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t | |
| } | ||
|
|
||
| agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Task response processing finished") | ||
| agg.taskMutex.Unlock() | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This log is repeated on
ProcessOperatorSignedTaskResponseV2.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I can change it if that's more clear
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done