Skip to content

Commit b20017c

Browse files
Merge pull request #140 from Layr-Labs/kira--rebase-v3.8.0
Merges upstream v3.8.0 (from v3.8.0-rc.16)
2 parents 82548b8 + d4dd4f9 commit b20017c

File tree

9 files changed

+98
-124
lines changed

9 files changed

+98
-124
lines changed

arbnode/inbox_tracker.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,8 +318,7 @@ func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcas
318318
}
319319
feedMessages = append(feedMessages, feedMessage)
320320
}
321-
broadcastServer.BroadcastFeedMessages(feedMessages)
322-
return nil
321+
return broadcastServer.PopulateFeedBacklog(feedMessages)
323322
}
324323

325324
func (t *InboxTracker) legacyGetDelayedMessageAndAccumulator(ctx context.Context, seqNum uint64) (*arbostypes.L1IncomingMessage, common.Hash, error) {

arbnode/node.go

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ import (
99
"errors"
1010
"fmt"
1111
"math/big"
12-
"os"
13-
"path"
14-
"path/filepath"
1512
"strings"
1613

1714
"github.com/spf13/pflag"
@@ -36,12 +33,11 @@ import (
3633
"github.com/offchainlabs/nitro/broadcastclients"
3734
"github.com/offchainlabs/nitro/broadcaster"
3835
"github.com/offchainlabs/nitro/cmd/chaininfo"
39-
"github.com/offchainlabs/nitro/cmd/genericconf"
4036
"github.com/offchainlabs/nitro/daprovider"
4137
"github.com/offchainlabs/nitro/daprovider/daclient"
4238
"github.com/offchainlabs/nitro/daprovider/das"
4339
"github.com/offchainlabs/nitro/daprovider/data_streaming"
44-
dapserver "github.com/offchainlabs/nitro/daprovider/server"
40+
"github.com/offchainlabs/nitro/daprovider/factory"
4541
"github.com/offchainlabs/nitro/eigenda"
4642
"github.com/offchainlabs/nitro/execution"
4743
"github.com/offchainlabs/nitro/execution/gethexec"
@@ -593,6 +589,8 @@ func getDAS(
593589

594590
var err error
595591
var daClient *daclient.Client
592+
var anytrustWriter daprovider.Writer
593+
var anytrustReader daprovider.Reader
596594
var withDAWriter bool
597595
var dasServerCloseFn func()
598596
if config.DAProvider.Enable {
@@ -603,43 +601,49 @@ func getDAS(
603601
// Only allow dawriter if batchposter is enabled
604602
withDAWriter = config.DAProvider.WithWriter && config.BatchPoster.Enable
605603
} else if config.DataAvailability.Enable {
606-
jwtPath := path.Join(filepath.Dir(stack.InstanceDir()), "dasserver-jwtsecret")
607-
if err := genericconf.TryCreatingJWTSecret(jwtPath); err != nil {
608-
return nil, nil, nil, nil, fmt.Errorf("error writing ephemeral jwtsecret of dasserver to file: %w", err)
604+
// Create AnyTrust factory
605+
daFactory, err := factory.NewDAProviderFactory(
606+
factory.ModeAnyTrust,
607+
&config.DataAvailability,
608+
nil, // referencedaCfg
609+
dataSigner,
610+
l1client,
611+
l1Reader,
612+
deployInfo.SequencerInbox,
613+
config.BatchPoster.Enable,
614+
)
615+
if err != nil {
616+
return nil, nil, nil, nil, err
609617
}
610-
log.Info("Generated ephemeral JWT secret for dasserver", "jwtPath", jwtPath)
611-
// JWTSecret is no longer needed, cleanup when returning
612-
defer func() {
613-
if err := os.Remove(jwtPath); err != nil {
614-
log.Error("error deleting generated ephemeral JWT secret of dasserver", "jwtPath", jwtPath)
615-
}
616-
}()
617618

618-
serverConfig := dapserver.DefaultDASServerConfig
619-
serverConfig.Port = 0 // Initializes server at a random available port
620-
serverConfig.DataAvailability = config.DataAvailability
621-
serverConfig.EnableDAWriter = config.BatchPoster.Enable
622-
serverConfig.JWTSecret = jwtPath
623-
withDAWriter = config.BatchPoster.Enable
624-
dasServer, closeFn, err := dapserver.NewServerForDAS(ctx, &serverConfig, dataSigner, l1client, l1Reader, deployInfo.SequencerInbox)
625-
if err != nil {
619+
if err := daFactory.ValidateConfig(); err != nil {
626620
return nil, nil, nil, nil, err
627621
}
628-
rpcClientConfig := rpcclient.DefaultClientConfig
629-
rpcClientConfig.URL = dasServer.Addr
630-
rpcClientConfig.JWTSecret = jwtPath
631622

632-
daClientConfig := config.DAProvider
633-
daClientConfig.RPC = rpcClientConfig
623+
// Create writer if batch poster is enabled
624+
var writerCleanup func()
625+
if config.BatchPoster.Enable {
626+
anytrustWriter, writerCleanup, err = daFactory.CreateWriter(ctx)
627+
if err != nil {
628+
return nil, nil, nil, nil, err
629+
}
630+
withDAWriter = true
631+
}
634632

635-
daClient, err = daclient.NewClient(ctx, &daClientConfig, data_streaming.PayloadCommiter())
633+
// Create reader
634+
var readerCleanup func()
635+
anytrustReader, readerCleanup, err = daFactory.CreateReader(ctx)
636636
if err != nil {
637637
return nil, nil, nil, nil, err
638638
}
639+
640+
// Set up cleanup function
639641
dasServerCloseFn = func() {
640-
_ = dasServer.Shutdown(ctx)
641-
if closeFn != nil {
642-
closeFn()
642+
if writerCleanup != nil {
643+
writerCleanup()
644+
}
645+
if readerCleanup != nil {
646+
readerCleanup()
643647
}
644648
}
645649
} else if l2Config.ArbitrumChainParams.DataAvailabilityCommittee {
@@ -651,7 +655,7 @@ func getDAS(
651655
}
652656

653657
// We support a nil txStreamer for the pruning code
654-
if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && daClient == nil {
658+
if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && daClient == nil && anytrustReader == nil {
655659
return nil, nil, nil, nil, errors.New("data availability service required but unconfigured")
656660
}
657661

@@ -679,15 +683,26 @@ func getDAS(
679683
return nil, nil, nil, nil, fmt.Errorf("failed to register DA client: %w", err)
680684
}
681685
}
686+
if anytrustReader != nil {
687+
headerBytes := []byte{
688+
daprovider.DASMessageHeaderFlag,
689+
daprovider.DASMessageHeaderFlag | daprovider.TreeDASMessageHeaderFlag,
690+
}
691+
if err := dapReaders.RegisterAll(headerBytes, anytrustReader); err != nil {
692+
return nil, nil, nil, nil, fmt.Errorf("failed to register AnyTrust reader: %w", err)
693+
}
694+
}
682695
if blobReader != nil {
683696
if err := dapReaders.SetupBlobReader(daprovider.NewReaderForBlobReader(blobReader)); err != nil {
684697
return nil, nil, nil, nil, fmt.Errorf("failed to register blob reader: %w", err)
685698
}
686699
}
687-
// AnyTrust now always uses the daClient, which is already registered,
688-
// so we don't need to register it separately here.
689700

690701
if withDAWriter {
702+
// Return anytrustWriter if it exists, otherwise daClient
703+
if anytrustWriter != nil {
704+
return anytrustWriter, dasServerCloseFn, dapReaders, eigenDAWriter, nil
705+
}
691706
return daClient, dasServerCloseFn, dapReaders, eigenDAWriter, nil
692707
}
693708
return nil, dasServerCloseFn, dapReaders, eigenDAWriter, nil

broadcaster/broadcaster.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,14 @@ func (b *Broadcaster) BroadcastFeedMessages(messages []*m.BroadcastFeedMessage)
128128
b.server.Broadcast(bm)
129129
}
130130

131+
func (s *Broadcaster) PopulateFeedBacklog(messages []*m.BroadcastFeedMessage) error {
132+
bm := &m.BroadcastMessage{
133+
Version: 1,
134+
Messages: messages,
135+
}
136+
return s.server.PopulateFeedBacklog(bm)
137+
}
138+
131139
func (b *Broadcaster) Confirm(msgIdx arbutil.MessageIndex) {
132140
log.Debug("confirming msgIdx", "msgIdx", msgIdx)
133141
b.server.Broadcast(&m.BroadcastMessage{

cmd/chaininfo/arbitrum_chain_info.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@
254254
"bridge": "0x38f918D0E9F1b721EDaA41302E399fa1B79333a9",
255255
"inbox": "0xaAe29B0366299461418F5324a79Afc425BE5ae21",
256256
"sequencer-inbox": "0x6c97864CE4bEf387dE0b3310A44230f7E3F1be0D",
257-
"rollup": "0xd80810638dbDF9081b72C1B33c65375e807281C8",
257+
"rollup": "0x042B2E6C5E99d4c521bd49beeD5E99651D9B0Cf4",
258258
"validator-utils": "0x1f6860C3cac255fFFa72B7410b1183c3a0D261e0",
259259
"validator-wallet-creator": "0x894fC71fA0A666352824EC954B401573C861D664",
260260
"stake-token": "0xefb383126640fe4a760010c6e59c397d2b6c7141",

daprovider/data_streaming/protocol_test.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package data_streaming
55

66
import (
77
"context"
8-
"errors"
98
"math/rand"
109
"net"
1110
"net/http"
@@ -169,23 +168,6 @@ func TestDataStreaming_ProtocolSucceedsEvenWithDelays(t *testing.T) {
169168
require.Equal(t, message, ([]byte)(result.Message), "protocol resulted in an incorrect message")
170169
}
171170

172-
func TestDataStreaming_ClientRetriesWhenThereAreConnectionProblems(t *testing.T) {
173-
// Server 'goes offline' for a moment just before reading the second chunk
174-
var alreadyWentOffline = false
175-
ctx, streamer := prepareTestEnv(t, func(i uint64) error {
176-
if i == 1 && !alreadyWentOffline {
177-
alreadyWentOffline = true
178-
return errors.New("service unavailable")
179-
}
180-
return nil
181-
182-
})
183-
message, _ := getLongRandomMessage(streamer.chunkSize)
184-
result, err := streamer.StreamData(ctx, message, timeout)
185-
testhelpers.RequireImpl(t, err)
186-
require.Equal(t, message, ([]byte)(result.Message), "protocol resulted in an incorrect message")
187-
}
188-
189171
func TestDataStreaming_ServerDeniesTooOldAndFutureRequests(t *testing.T) {
190172
ctx, streamer := prepareTestEnv(t, nil)
191173
message, _ := getLongRandomMessage(streamer.chunkSize)

daprovider/data_streaming/sender.go

Lines changed: 9 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -19,60 +19,42 @@ import (
1919
)
2020

2121
const (
22-
DefaultHttpBodyLimit = 5 * 1024 * 1024 // Taken from go-ethereum http.defaultBodyLimit
23-
TestHttpBodyLimit = 1024
24-
DefaultBaseRetryDelay = 2 * time.Second
25-
DefaultMaxRetryDelay = 1 * time.Minute
26-
DefaultMaxRetryAttempts = 5
22+
DefaultHttpBodyLimit = 5 * 1024 * 1024 // Taken from go-ethereum http.defaultBodyLimit
23+
TestHttpBodyLimit = 1024
2724
)
2825

2926
// lint:require-exhaustive-initialization
3027
type DataStreamerConfig struct {
3128
MaxStoreChunkBodySize int `koanf:"max-store-chunk-body-size"`
3229
RpcMethods DataStreamingRPCMethods `koanf:"rpc-methods"`
33-
34-
// Retry policy for RPC calls
35-
BaseRetryDelay time.Duration `koanf:"base-retry-delay"`
36-
MaxRetryDelay time.Duration `koanf:"max-retry-delay"`
37-
MaxRetryAttempts int `koanf:"max-retry-attempts"`
3830
}
3931

4032
func DefaultDataStreamerConfig(rpcMethods DataStreamingRPCMethods) DataStreamerConfig {
4133
return DataStreamerConfig{
4234
MaxStoreChunkBodySize: DefaultHttpBodyLimit,
4335
RpcMethods: rpcMethods,
44-
BaseRetryDelay: DefaultBaseRetryDelay,
45-
MaxRetryDelay: DefaultMaxRetryDelay,
46-
MaxRetryAttempts: DefaultMaxRetryAttempts,
4736
}
4837
}
4938

5039
func TestDataStreamerConfig(rpcMethods DataStreamingRPCMethods) DataStreamerConfig {
5140
return DataStreamerConfig{
5241
MaxStoreChunkBodySize: TestHttpBodyLimit,
5342
RpcMethods: rpcMethods,
54-
BaseRetryDelay: 100 * time.Millisecond,
55-
MaxRetryDelay: 100 * time.Millisecond,
56-
MaxRetryAttempts: 3,
5743
}
5844
}
5945

6046
func DataStreamerConfigAddOptions(prefix string, f *pflag.FlagSet, defaultRpcMethods DataStreamingRPCMethods) {
6147
f.Int(prefix+".max-store-chunk-body-size", DefaultHttpBodyLimit, "maximum HTTP body size for chunked store requests")
62-
f.Duration(prefix+".base-retry-delay", DefaultBaseRetryDelay, "base delay for retrying failed RPC calls")
63-
f.Duration(prefix+".max-retry-delay", DefaultMaxRetryDelay, "maximum delay for retrying failed RPC calls")
64-
f.Int(prefix+".max-retry-attempts", DefaultMaxRetryAttempts, "maximum number of attempts for retrying failed RPC calls")
6548
DataStreamingRPCMethodsAddOptions(prefix+".rpc-methods", f, defaultRpcMethods)
6649
}
6750

6851
// DataStreamer allows sending arbitrarily big payloads with JSON RPC. It follows a simple chunk-based protocol.
6952
// lint:require-exhaustive-initialization
7053
type DataStreamer[Result any] struct {
71-
rpcClient *rpcclient.RpcClient
72-
chunkSize uint64
73-
dataSigner *PayloadSigner
74-
rpcMethods DataStreamingRPCMethods
75-
retryDelayPolicy *expDelayPolicy
54+
rpcClient *rpcclient.RpcClient
55+
chunkSize uint64
56+
dataSigner *PayloadSigner
57+
rpcMethods DataStreamingRPCMethods
7658
}
7759

7860
// DataStreamingRPCMethods configuration specifies names of the protocol's RPC methods on the server side.
@@ -104,11 +86,6 @@ func NewDataStreamer[T any](config DataStreamerConfig, dataSigner *PayloadSigner
10486
chunkSize: chunkSize,
10587
dataSigner: dataSigner,
10688
rpcMethods: config.RpcMethods,
107-
retryDelayPolicy: &expDelayPolicy{
108-
baseDelay: config.BaseRetryDelay,
109-
maxDelay: config.MaxRetryDelay,
110-
maxAttempts: config.MaxRetryAttempts,
111-
},
11289
}, nil
11390
}
11491

@@ -144,7 +121,7 @@ func (ds *DataStreamer[Result]) startStream(ctx context.Context, params streamPa
144121
}
145122

146123
var result StartStreamingResult
147-
err = ds.call(
124+
err = ds.rpcClient.CallContext(
148125
ctx,
149126
&result,
150127
ds.rpcMethods.StartStream,
@@ -172,51 +149,18 @@ func (ds *DataStreamer[Result]) sendChunk(ctx context.Context, messageId Message
172149
if err != nil {
173150
return err
174151
}
175-
return ds.call(ctx, nil, ds.rpcMethods.StreamChunk, hexutil.Uint64(messageId), hexutil.Uint64(chunkId), hexutil.Bytes(chunkData), hexutil.Bytes(payloadSignature))
152+
return ds.rpcClient.CallContext(ctx, nil, ds.rpcMethods.StreamChunk, hexutil.Uint64(messageId), hexutil.Uint64(chunkId), hexutil.Bytes(chunkData), hexutil.Bytes(payloadSignature))
176153
}
177154

178155
func (ds *DataStreamer[Result]) finalizeStream(ctx context.Context, messageId MessageId) (result *Result, err error) {
179156
payloadSignature, err := ds.sign(nil, uint64(messageId))
180157
if err != nil {
181158
return nil, err
182159
}
183-
err = ds.call(ctx, &result, ds.rpcMethods.FinalizeStream, hexutil.Uint64(messageId), hexutil.Bytes(payloadSignature))
160+
err = ds.rpcClient.CallContext(ctx, &result, ds.rpcMethods.FinalizeStream, hexutil.Uint64(messageId), hexutil.Bytes(payloadSignature))
184161
return
185162
}
186163

187-
type expDelayPolicy struct {
188-
baseDelay, maxDelay time.Duration
189-
maxAttempts int
190-
}
191-
192-
func (e *expDelayPolicy) NextDelay(attempt int) (time.Duration, bool) {
193-
if attempt >= e.maxAttempts {
194-
return 0, false
195-
}
196-
if attempt <= 0 {
197-
return time.Duration(0), true
198-
}
199-
200-
delay := e.baseDelay * time.Duration(1<<uint(attempt-1)) // nolint:gosec
201-
if delay > e.maxDelay {
202-
delay = e.maxDelay
203-
}
204-
return delay, true
205-
}
206-
207-
func (ds *DataStreamer[Result]) call(ctx context.Context, result interface{}, method string, args ...interface{}) (err error) {
208-
for attempt := 1; ; attempt++ {
209-
if err = ds.rpcClient.CallContext(ctx, result, method, args...); err == nil {
210-
return nil
211-
}
212-
delay, proceed := ds.retryDelayPolicy.NextDelay(attempt)
213-
if !proceed {
214-
return fmt.Errorf("failed after %d attempts: %w", attempt, err)
215-
}
216-
time.Sleep(delay)
217-
}
218-
}
219-
220164
func (ds *DataStreamer[Result]) sign(bytes []byte, extras ...uint64) ([]byte, error) {
221165
return ds.dataSigner.signPayload(bytes, extras...)
222166
}

execution/gethexec/sync_monitor.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,9 +278,6 @@ func (s *SyncMonitor) SetFinalityData(
278278
finalizedFinalityData *arbutil.FinalityData,
279279
validatedFinalityData *arbutil.FinalityData,
280280
) error {
281-
s.exec.createBlocksMutex.Lock()
282-
defer s.exec.createBlocksMutex.Unlock()
283-
284281
finalizedBlockHeader, err := s.getFinalityBlockHeader(
285282
s.config.FinalizedBlockWaitForBlockValidator,
286283
validatedFinalityData,

wsbroadcastserver/clientmanager.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,31 @@ func (cm *ClientManager) Broadcast(bm *m.BroadcastMessage) {
151151
cm.broadcastChan <- bm
152152
}
153153

154+
// populateFeedBacklog adds the given BroadcastMessage to backlog, and also broadcasts it to feed if the ClientManager is started
155+
func (cm *ClientManager) populateFeedBacklog(bm *m.BroadcastMessage) error {
156+
if cm.Started() {
157+
cm.Broadcast(bm)
158+
return nil
159+
}
160+
if len(bm.Messages) == 0 {
161+
return cm.backlog.Append(bm)
162+
}
163+
for i, msg := range bm.Messages {
164+
m := &m.BroadcastMessage{
165+
Version: bm.Version,
166+
Messages: []*m.BroadcastFeedMessage{msg},
167+
}
168+
// This ensures that only one message is added to backlog with the confirmed sequence number
169+
if i == 0 {
170+
m.ConfirmedSequenceNumberMessage = bm.ConfirmedSequenceNumberMessage
171+
}
172+
if err := cm.backlog.Append(m); err != nil {
173+
return err
174+
}
175+
}
176+
return nil
177+
}
178+
154179
func (cm *ClientManager) doBroadcast(bm *m.BroadcastMessage) ([]*ClientConnection, error) {
155180
if err := cm.backlog.Append(bm); err != nil {
156181
return nil, err

0 commit comments

Comments
 (0)