Skip to content

Commit e02e66f

Browse files
committed
Fix headerByte function, add read retries
1 parent d8b684e commit e02e66f

File tree

5 files changed

+188
-62
lines changed

5 files changed

+188
-62
lines changed

cmd/celestiadaserver.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
flag "github.com/spf13/pflag"
1414

15+
"github.com/ethereum/go-ethereum/log"
1516
gethlog "github.com/ethereum/go-ethereum/log"
1617
"github.com/ethereum/go-ethereum/metrics"
1718
"github.com/ethereum/go-ethereum/metrics/exp"
@@ -192,6 +193,7 @@ func startup() error {
192193
celestiaWriter = celestiaDA
193194

194195
if serverConfig.FallbackEnabled {
196+
log.Info("Creating client with DAS Fallback", "fallbackEnabled", serverConfig.FallbackEnabled)
195197
clientConfig := serverConfig.DasClientConfig.RPC
196198
client, err := daclient.NewClient(ctx, func() *rpcclient.ClientConfig { return &clientConfig })
197199
if err != nil {
@@ -203,6 +205,7 @@ func startup() error {
203205
panic(fmt.Sprintf("Failed to create client: %v", err))
204206
}
205207
} else {
208+
log.Info("Creating client without DAS Fallback", "fallbackEnabled", serverConfig.FallbackEnabled)
206209
rpcServer, err = das.StartDASRPCServer(ctx, serverConfig.RPCAddr, serverConfig.RPCPort, serverConfig.RPCServerTimeouts, serverConfig.RPCServerBodyLimit, celestiaReader, celestiaWriter, nil, false)
207210
if err != nil {
208211
return err

daserver/celestia.go

Lines changed: 145 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"errors"
99
"fmt"
1010
"math/big"
11+
"math/rand"
1112
"os"
1213
"strings"
1314
"sync"
@@ -16,11 +17,13 @@ import (
1617
txclient "github.com/celestiaorg/celestia-node/api/client"
1718
node "github.com/celestiaorg/celestia-node/api/rpc/client"
1819
"github.com/celestiaorg/celestia-node/blob"
20+
"github.com/celestiaorg/celestia-node/header"
1921
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
2022
"github.com/celestiaorg/celestia-node/state"
2123
libshare "github.com/celestiaorg/go-square/v3/share"
2224
"github.com/celestiaorg/nitro-das-celestia/celestiagen"
2325
"github.com/celestiaorg/nitro-das-celestia/daserver/types"
26+
"github.com/celestiaorg/rsmt2d"
2427
"github.com/ethereum/go-ethereum/accounts/abi/bind"
2528
"github.com/ethereum/go-ethereum/common"
2629
"github.com/ethereum/go-ethereum/crypto"
@@ -33,27 +36,49 @@ import (
3336
)
3437

3538
type DAConfig struct {
36-
WithWriter bool `koanf:"with-writer"`
37-
GasPrice float64 `koanf:"gas-price" reload:"hot"`
38-
GasMultiplier float64 `koanf:"gas-multiplier" reload:"hot"`
39-
Rpc string `koanf:"rpc" reload:"hot"`
40-
ReadRpc string `koanf:"read-rpc" reload:"hot"`
41-
NamespaceId string `koanf:"namespace-id" `
42-
AuthToken string `koanf:"auth-token" reload:"hot"`
43-
ReadAuthToken string `koanf:"read-auth-token" reload:"hot"`
44-
CoreToken string `koanf:"core-token" reload:"hot"`
45-
CoreURL string `koanf:"core-url" reload:"hot"`
46-
CoreNetwork string `koanf:"core-network" reload:"hot"`
47-
KeyName string `koanf:"key-name" reload:"hot"`
48-
KeyPath string `koanf:"key-path" reload:"hot"`
49-
BackendName string `koanf:"backend-name" reload:"hot"`
50-
NoopWriter bool `koanf:"noop-writer" reload:"hot"`
51-
EnableDATLS bool `koanf:"enable-da-tls" reload:"hot"`
52-
EnableCoreTLS bool `koanf:"enable-core-tls" reload:"hot"`
53-
ValidatorConfig ValidatorConfig `koanf:"validator-config" reload:"hot"`
54-
ReorgOnReadFailure bool `koanf:"dangerous-reorg-on-read-failure"`
55-
CacheCleanupTime time.Duration `koanf:"cache-time"`
56-
ExperimentalTxClient bool `koanf:"experimental-tx-client"`
39+
WithWriter bool `koanf:"with-writer"`
40+
GasPrice float64 `koanf:"gas-price" reload:"hot"`
41+
GasMultiplier float64 `koanf:"gas-multiplier" reload:"hot"`
42+
Rpc string `koanf:"rpc" reload:"hot"`
43+
ReadRpc string `koanf:"read-rpc" reload:"hot"`
44+
NamespaceId string `koanf:"namespace-id" `
45+
AuthToken string `koanf:"auth-token" reload:"hot"`
46+
ReadAuthToken string `koanf:"read-auth-token" reload:"hot"`
47+
CoreToken string `koanf:"core-token" reload:"hot"`
48+
CoreURL string `koanf:"core-url" reload:"hot"`
49+
CoreNetwork string `koanf:"core-network" reload:"hot"`
50+
KeyName string `koanf:"key-name" reload:"hot"`
51+
KeyPath string `koanf:"key-path" reload:"hot"`
52+
BackendName string `koanf:"backend-name" reload:"hot"`
53+
NoopWriter bool `koanf:"noop-writer" reload:"hot"`
54+
EnableDATLS bool `koanf:"enable-da-tls" reload:"hot"`
55+
EnableCoreTLS bool `koanf:"enable-core-tls" reload:"hot"`
56+
ValidatorConfig ValidatorConfig `koanf:"validator-config" reload:"hot"`
57+
ReorgOnReadFailure bool `koanf:"dangerous-reorg-on-read-failure"`
58+
CacheCleanupTime time.Duration `koanf:"cache-time"`
59+
ExperimentalTxClient bool `koanf:"experimental-tx-client"`
60+
RetryConfig RetryBackoffConfig `koanf:"retry-config"`
61+
}
62+
63+
// RetryBackoffConfig groups the settings read by the retry-with-backoff helper
// that CelestiaDA.Read wraps around its Celestia node calls.
type RetryBackoffConfig struct {
	// MaxRetries bounds how many times an operation is attempted; the very
	// first call counts towards this limit.
	MaxRetries int `koanf:"max-retries"`

	// InitialBackoff is the wait inserted after the first failed attempt.
	InitialBackoff time.Duration `koanf:"initial-backoff"`

	// MaxBackoff is the ceiling applied to the wait after it has been scaled
	// by BackoffFactor (jitter is added on top of the capped value).
	MaxBackoff time.Duration `koanf:"max-backoff"`

	// BackoffFactor is the multiplier applied to the wait once each backoff
	// period has elapsed.
	BackoffFactor float64 `koanf:"backoff-factor"`
}
69+
70+
func CelestiaRetryConfigAddOptions(prefix string, f *pflag.FlagSet) {
71+
f.Int(prefix+".max-retries", 5, "maximum number of retry attempts")
72+
f.Duration(prefix+".initial-backoff", 5*time.Second, "initial backoff duration for retries")
73+
f.Duration(prefix+".max-backoff", 60*time.Second, "maximum backoff duration for retries")
74+
f.Float64(prefix+".backoff-factor", 2.0, "exponential backoff multiplier")
75+
}
76+
77+
var DefaultCelestiaRetryConfig = RetryBackoffConfig{
78+
MaxRetries: 5,
79+
InitialBackoff: 10 * time.Second,
80+
MaxBackoff: 120 * time.Second,
81+
BackoffFactor: 2.0,
5782
}
5883

5984
type ValidatorConfig struct {
@@ -132,10 +157,9 @@ func CelestiaDAConfigAddOptions(prefix string, f *pflag.FlagSet) {
132157
f.Int(prefix+".validator-config"+".sleep-time", 3600, "How many seconds to wait before initiating another filtering loop for Blobstream events")
133158
f.Bool(prefix+".dangerous-reorg-on-read-failure", false, "DANGEROUS: reorg if any error during reads from celestia node")
134159
f.Duration(prefix+".cache-time", time.Hour/2, "how often to clean the in memory cache")
160+
CelestiaRetryConfigAddOptions(".retry-config", f)
135161
}
136162

137-
// DefaultKeyringPath constructs the default keyring path using the given
138-
// node type and network.
139163
var DefaultKeyringPath = func(tp string, network string) (string, error) {
140164
home := os.Getenv("CELESTIA_HOME")
141165
if home != "" {
@@ -472,56 +496,123 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
472496
}
473497

474498
func (c *CelestiaDA) Read(ctx context.Context, blobPointer *types.BlobPointer) (*types.ReadResult, error) {
475-
header, err := c.ReadClient.Header.GetByHeight(ctx, blobPointer.BlockHeight)
499+
500+
log.Info("reading blob pointer",
501+
"blockHeight", blobPointer.BlockHeight,
502+
"start", blobPointer.Start,
503+
"sharesLength", blobPointer.SharesLength,
504+
"dataRoot", hex.EncodeToString(blobPointer.DataRoot[:]),
505+
"txCommitment", hex.EncodeToString(blobPointer.TxCommitment[:]),
506+
)
507+
508+
// Add timeout to the context
509+
ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
510+
defer cancel()
511+
512+
// Helper function for retrying with exponential backoff
513+
retryWithBackoff := func(operation func() error) error {
514+
backoff := c.Cfg.RetryConfig.InitialBackoff
515+
for attempt := 0; attempt < c.Cfg.RetryConfig.MaxRetries; attempt++ {
516+
err := operation()
517+
if err == nil {
518+
return nil
519+
}
520+
521+
// Check if context is cancelled
522+
if ctx.Err() != nil {
523+
return fmt.Errorf("context cancelled: %w", ctx.Err())
524+
}
525+
526+
// Last attempt, don't wait
527+
if attempt == c.Cfg.RetryConfig.MaxRetries-1 {
528+
return fmt.Errorf("max retries exceeded: %w", err)
529+
}
530+
531+
log.Warn("operation failed, retrying...", "attempt", attempt+1, "backoff", backoff, "err", err)
532+
533+
// Wait with backoff
534+
select {
535+
case <-time.After(backoff):
536+
// Exponential backoff with jitter
537+
backoff = time.Duration(float64(backoff) * c.Cfg.RetryConfig.BackoffFactor)
538+
if backoff > c.Cfg.RetryConfig.MaxBackoff {
539+
backoff = c.Cfg.RetryConfig.MaxBackoff
540+
}
541+
// Add jitter (±20%)
542+
jitter := time.Duration(rand.Float64()*0.4*float64(backoff)) - time.Duration(0.2*float64(backoff))
543+
backoff += jitter
544+
case <-ctx.Done():
545+
return fmt.Errorf("context cancelled during backoff: %w", ctx.Err())
546+
}
547+
}
548+
return fmt.Errorf("unexpected retry loop exit")
549+
}
550+
551+
// Fetch header with retry
552+
var header *header.ExtendedHeader
553+
err := retryWithBackoff(func() error {
554+
var err error
555+
header, err = c.ReadClient.Header.GetByHeight(ctx, blobPointer.BlockHeight)
556+
if err != nil {
557+
log.Warn("could not fetch header", "height", blobPointer.BlockHeight, "err", err)
558+
return err
559+
}
560+
return nil
561+
})
476562
if err != nil {
477-
log.Error("could not fetch header", "height", blobPointer.BlockHeight, "err", err)
478-
return nil, err
563+
return nil, fmt.Errorf("failed to fetch header after retries: %w", err)
479564
}
480565

566+
// Validate data root
481567
headerDataHash := [32]byte{}
482568
copy(headerDataHash[:], header.DataHash)
483569
if headerDataHash != blobPointer.DataRoot {
484570
return c.returnErrorHelper(fmt.Errorf("data Root mismatch, header.DataHash=%v, blobPointer.DataRoot=%v", header.DataHash, hex.EncodeToString(blobPointer.DataRoot[:])))
485571
}
486572

573+
// Fetch blob with retry
487574
var blobData []byte
488575
var sharesLength int
489-
BlobLoop:
490-
for {
491-
select {
492-
case <-ctx.Done():
493-
return c.returnErrorHelper(fmt.Errorf("context cancelled or deadline exceeded"))
494-
default:
495-
blob, err := c.ReadClient.Blob.Get(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:])
496-
if err != nil {
497-
log.Warn("failed to read blob, retrying...", "err", err)
498-
continue
499-
}
500-
blob.Index()
501-
blob.Length()
502-
blobData = blob.Data()
503-
length, err := blob.Length()
504-
if err != nil || length == 0 {
505-
celestiaFailureCounter.Inc(1)
506-
log.Warn("could not get shares length for blob", "err", err)
507-
if err == nil {
508-
err = fmt.Errorf("blob found, but has shares length zero")
509-
}
510-
return nil, err
511-
}
512-
sharesLength = length
513-
break BlobLoop
576+
err = retryWithBackoff(func() error {
577+
blob, err := c.ReadClient.Blob.Get(ctx, blobPointer.BlockHeight, *c.Namespace, blobPointer.TxCommitment[:])
578+
if err != nil {
579+
return err
514580
}
581+
582+
blob.Index()
583+
blobData = blob.Data()
584+
length, err := blob.Length()
585+
if err != nil {
586+
return fmt.Errorf("could not get shares length: %w", err)
587+
}
588+
if length == 0 {
589+
return fmt.Errorf("blob found, but has shares length zero")
590+
}
591+
sharesLength = length
592+
return nil
593+
})
594+
if err != nil {
595+
celestiaFailureCounter.Inc(1)
596+
return nil, fmt.Errorf("failed to read blob after retries: %w", err)
515597
}
516598

517-
extendedSquare, err := c.ReadClient.Share.GetEDS(ctx, blobPointer.BlockHeight)
599+
// Fetch EDS with retry
600+
var extendedSquare *rsmt2d.ExtendedDataSquare
601+
err = retryWithBackoff(func() error {
602+
var err error
603+
extendedSquare, err = c.ReadClient.Share.GetEDS(ctx, blobPointer.BlockHeight)
604+
if err != nil {
605+
return fmt.Errorf("failed to get EDS, height=%v: %w", blobPointer.BlockHeight, err)
606+
}
607+
return nil
608+
})
518609
if err != nil {
519-
return c.returnErrorHelper(fmt.Errorf("failed to get EDS, height=%v, err=%v", blobPointer.BlockHeight, err))
610+
return c.returnErrorHelper(err)
520611
}
521612

613+
// Validation logic (no changes needed here)
522614
squareSize := uint64(len(header.DAH.RowRoots))
523615
odsSize := squareSize / 2
524-
525616
startRow := blobPointer.Start / odsSize
526617

527618
if blobPointer.Start >= odsSize*odsSize {
@@ -569,7 +660,6 @@ BlobLoop:
569660
StartRow: startRow,
570661
EndRow: endRow,
571662
}, nil
572-
573663
}
574664

575665
func (c *CelestiaDA) GetProof(ctx context.Context, msg []byte) ([]byte, error) {

daserver/rpc_server.go

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package das
22

33
import (
44
"context"
5+
"encoding/hex"
56
"errors"
67
"fmt"
78
"net"
@@ -137,7 +138,13 @@ func (serv *CelestiaDASRPCServer) Store(ctx context.Context, message hexutil.Byt
137138
}
138139

139140
func (serv *CelestiaDASRPCServer) Read(ctx context.Context, blobPointer *types.BlobPointer) (*types.ReadResult, error) {
140-
log.Trace("celestiaDasRpc.CelestiaDASRPCServer.Read", "blob pointer", blobPointer, "this", serv)
141+
log.Info("CelestiaDASRPCServer.Read",
142+
"blockHeight", blobPointer.BlockHeight,
143+
"start", blobPointer.Start,
144+
"sharesLength", blobPointer.SharesLength,
145+
"dataRoot", hex.EncodeToString(blobPointer.DataRoot[:]),
146+
"txCommitment", hex.EncodeToString(blobPointer.TxCommitment[:]),
147+
)
141148
rpcReadRequestGauge.Inc(1)
142149
start := time.Now()
143150
success := false
@@ -190,25 +197,46 @@ func (serv *DaClientServer) RecoverPayloadFromBatch(
190197
preimages daprovider.PreimagesMap,
191198
validateSeqMsg bool,
192199
) (*types.RecoverPayloadFromBatchResult, error) {
200+
log.Info("CelestiaDASRPCServer.RecoverPayloadFromBatch",
201+
"batchNum", batchNum,
202+
"batchBlockHash", batchBlockHash,
203+
"sequencerMsg", sequencerMsg,
204+
)
193205
// check the header byte before sending out the call
194206
headerByte := sequencerMsg[40]
195207
if IsCelestiaMessageHeaderByte(headerByte) {
208+
log.Info("CelestiaDASRPCServer.RecoverPayloadFromBatch", "celestiaHeaderByte", headerByte)
196209
payload, preimages, err := serv.reader.RecoverPayloadFromBatch(ctx, uint64(batchNum), batchBlockHash, sequencerMsg, preimages, validateSeqMsg)
197210
if err != nil {
211+
log.Error("failed to recover payload from Celestia batch",
212+
"batchNum", batchNum,
213+
"batchBlockHash", batchBlockHash,
214+
"sequencerMsg", sequencerMsg,
215+
"validateSeqMsg", validateSeqMsg,
216+
"err", err)
198217
return nil, err
199218
}
219+
log.Info("Recovered Payload from Celestia batch", "len(payload)", len(payload), "len(preimages)", len(preimages))
200220
return &types.RecoverPayloadFromBatchResult{
201221
Payload: payload,
202222
Preimages: preimages,
203223
}, nil
204224
} else if daprovider.IsDASMessageHeaderByte(headerByte) {
225+
log.Info("CelestiaDASRPCServer.RecoverPayloadFromBatch", "dasHeaderByte", headerByte)
205226
if serv.dasClient == nil {
206227
return nil, fmt.Errorf("found DAS Message header Byte, but das client for fallback not enabled on server")
207228
}
208229
payload, preimages, err := serv.dasClient.RecoverPayloadFromBatch(ctx, uint64(batchNum), batchBlockHash, sequencerMsg, preimages, validateSeqMsg)
209230
if err != nil {
231+
log.Error("failed to recover payload from DAS batch",
232+
"batchNum", batchNum,
233+
"batchBlockHash", batchBlockHash,
234+
"sequencerMsg", sequencerMsg,
235+
"validateSeqMsg", validateSeqMsg,
236+
"err", err)
210237
return nil, err
211238
}
239+
log.Info("Recovered Payload from DAS batch", "len(payload)", len(payload), "len(preimages)", len(preimages))
212240
return &types.RecoverPayloadFromBatchResult{
213241
Payload: payload,
214242
Preimages: preimages,
@@ -219,13 +247,18 @@ func (serv *DaClientServer) RecoverPayloadFromBatch(
219247
}
220248

221249
func (serv *DaClientServer) IsValidHeaderByte(ctx context.Context, headerByte byte) (*types.IsValidHeaderByteResult, error) {
250+
log.Info("Checking valid header byte", "headerByte", headerByte)
222251
valid := false
223252
if serv.reader != nil {
224253
valid = serv.reader.IsValidHeaderByte(ctx, headerByte)
225-
} else if serv.fallback && serv.dasClient != nil {
226-
serv.dasClient.IsValidHeaderByte(ctx, headerByte)
227-
valid = true
254+
log.Info("Got response from CelestiaServer", "headerByteValid", valid)
228255
}
256+
257+
if serv.fallback && serv.dasClient != nil && !valid {
258+
valid = serv.dasClient.IsValidHeaderByte(ctx, headerByte)
259+
log.Info("Got response from DasServer", "headerByteValid", valid)
260+
}
261+
log.Info("Header byte validity", "valid", valid)
229262
return &types.IsValidHeaderByteResult{IsValid: valid}, nil
230263
}
231264

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,9 +406,9 @@ replace (
406406
github.com/cometbft/cometbft => github.com/celestiaorg/celestia-core v0.39.4
407407
// Only keep the essential replace directives that don't cause conflicts
408408
github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v0.51.2
409+
github.com/cosmos/ibc-go/v8 => github.com/celestiaorg/ibc-go/v8 v8.7.2
409410
github.com/ethereum/go-ethereum => github.com/Ferret-san/go-ethereum v1.13.4-0.20250619005312-0488d1d7bc61
410411
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
411-
github.com/cosmos/ibc-go/v8 => github.com/celestiaorg/ibc-go/v8 v8.7.2
412412

413413
// Local bridgegen replacement
414414
github.com/offchainlabs/nitro/solgen/go/bridgegen => ./daserver/bridgegen

0 commit comments

Comments
 (0)