Skip to content

Commit dd2d0a3

Browse files
NicolasRampolditaturosatientropidelicuri-99
authored
feat: add retry to get batch of s3 (#958)
Co-authored-by: Tatu <[email protected]> Co-authored-by: Mariano Nicolini <[email protected]> Co-authored-by: Urix <[email protected]>
1 parent 5fb540f commit dd2d0a3

File tree

3 files changed

+49
-14
lines changed

3 files changed

+49
-14
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ batcher_send_sp1_burst:
244244
--proving_system SP1 \
245245
--proof ../../scripts/test_files/sp1/sp1_fibonacci.proof \
246246
--vm_program ../../scripts/test_files/sp1/sp1_fibonacci.elf \
247-
--repetitions 15 \
247+
--repetitions $(BURST_SIZE) \
248248
--proof_generator_addr 0x66f9664f97F2b50F62D13eA064982f936dE76657 \
249249
--rpc_url $(RPC_URL) \
250250
--payment_service_addr $(BATCHER_PAYMENTS_CONTRACT_ADDRESS)
@@ -271,7 +271,7 @@ batcher_send_risc0_burst:
271271
--proof ../../scripts/test_files/risc_zero/fibonacci_proof_generator/risc_zero_fibonacci.proof \
272272
--vm_program ../../scripts/test_files/risc_zero/fibonacci_proof_generator/fibonacci_id.bin \
273273
--public_input ../../scripts/test_files/risc_zero/fibonacci_proof_generator/risc_zero_fibonacci.pub \
274-
--repetitions 15 \
274+
--repetitions $(BURST_SIZE) \
275275
--proof_generator_addr 0x66f9664f97F2b50F62D13eA064982f936dE76657 \
276276
--rpc_url $(RPC_URL) \
277277
--payment_service_addr $(BATCHER_PAYMENTS_CONTRACT_ADDRESS)

operator/pkg/operator.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ type Operator struct {
5555
}
5656

5757
const (
58-
BatchDownloadTimeout = 1 * time.Minute
58+
BatchDownloadTimeout = 1 * time.Minute
59+
BatchDownloadMaxRetries = 3
60+
BatchDownloadRetryDelay = 5 * time.Second
5961
)
6062

6163
func NewOperatorFromConfig(configuration config.OperatorConfig) (*Operator, error) {
@@ -183,7 +185,7 @@ func (o *Operator) handleNewBatchLog(newBatchLog *servicemanager.ContractAligned
183185
hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
184186
hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
185187
)
186-
188+
187189
o.aggRpcClient.SendSignedTaskResponseToAggregator(&signedTaskResponse)
188190
}
189191

@@ -199,7 +201,7 @@ func (o *Operator) ProcessNewBatchLog(newBatchLog *servicemanager.ContractAligne
199201
ctx, cancel := context.WithTimeout(context.Background(), BatchDownloadTimeout)
200202
defer cancel()
201203

202-
verificationDataBatch, err := o.getBatchFromS3(ctx, newBatchLog.BatchDataPointer, newBatchLog.BatchMerkleRoot)
204+
verificationDataBatch, err := o.getBatchFromDataService(ctx, newBatchLog.BatchDataPointer, newBatchLog.BatchMerkleRoot, BatchDownloadMaxRetries, BatchDownloadRetryDelay)
203205
if err != nil {
204206
o.Logger.Errorf("Could not get proofs from S3 bucket: %v", err)
205207
return err

operator/pkg/s3.go

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,58 @@ import (
55
"fmt"
66
"io"
77
"net/http"
8+
"time"
89

910
"github.com/ugorji/go/codec"
1011

1112
"github.com/yetanotherco/aligned_layer/operator/merkle_tree"
1213
)
1314

14-
func (o *Operator) getBatchFromS3(ctx context.Context, batchURL string, expectedMerkleRoot [32]byte) ([]VerificationData, error) {
15-
o.Logger.Infof("Getting batch from S3..., batchURL: %s", batchURL)
15+
func (o *Operator) getBatchFromDataService(ctx context.Context, batchURL string, expectedMerkleRoot [32]byte, maxRetries int, retryDelay time.Duration) ([]VerificationData, error) {
16+
o.Logger.Infof("Getting batch from data service, batchURL: %s", batchURL)
17+
18+
var resp *http.Response
19+
var err error
20+
var req *http.Request
21+
22+
for attempt := 0; attempt < maxRetries; attempt++ {
23+
if attempt > 0 {
24+
o.Logger.Infof("Waiting for %s before retrying data fetch (attempt %d of %d)", retryDelay, attempt+1, maxRetries)
25+
select {
26+
case <-time.After(retryDelay):
27+
// Wait before retrying
28+
case <-ctx.Done():
29+
return nil, ctx.Err()
30+
}
31+
retryDelay *= 2 // Exponential backoff. Ex: 5s, 10s, 20s
32+
}
1633

17-
req, err := http.NewRequestWithContext(ctx, "GET", batchURL, nil)
34+
req, err = http.NewRequestWithContext(ctx, "GET", batchURL, nil)
35+
if err != nil {
36+
return nil, err
37+
}
1838

19-
if err != nil {
20-
return nil, err
39+
resp, err = http.DefaultClient.Do(req)
40+
if err == nil && resp.StatusCode == http.StatusOK {
41+
break // Successful request, exit retry loop
42+
}
43+
44+
if resp != nil {
45+
err := resp.Body.Close()
46+
if err != nil {
47+
return nil, err
48+
}
49+
}
50+
51+
o.Logger.Warnf("Error fetching batch from data service - (attempt %d): %v", attempt+1, err)
2152
}
2253

23-
resp, err := http.DefaultClient.Do(req)
2454
if err != nil {
2555
return nil, err
2656
}
57+
58+
// At this point, the HTTP request was successfull.
59+
2760
defer func(Body io.ReadCloser) {
2861
err := Body.Close()
2962
if err != nil {
@@ -33,7 +66,7 @@ func (o *Operator) getBatchFromS3(ctx context.Context, batchURL string, expected
3366

3467
// Check if the response is OK
3568
if resp.StatusCode != http.StatusOK {
36-
return nil, fmt.Errorf("error getting Proof Head from S3: %s", resp.Status)
69+
return nil, fmt.Errorf("error getting batch from data service: %s", resp.Status)
3770
}
3871

3972
contentLength := resp.ContentLength
@@ -58,8 +91,8 @@ func (o *Operator) getBatchFromS3(ctx context.Context, batchURL string, expected
5891

5992
// Checks if downloaded merkle root is the same as the expected one
6093
o.Logger.Infof("Verifying batch merkle tree...")
61-
merkle_root_check := merkle_tree.VerifyMerkleTreeBatch(batchBytes, uint(len(batchBytes)), expectedMerkleRoot)
62-
if !merkle_root_check {
94+
merkleRootCheck := merkle_tree.VerifyMerkleTreeBatch(batchBytes, uint(len(batchBytes)), expectedMerkleRoot)
95+
if !merkleRootCheck {
6396
return nil, fmt.Errorf("merkle root check failed")
6497
}
6598
o.Logger.Infof("Batch merkle tree verified")

0 commit comments

Comments
 (0)