diff --git a/op-avail/avail/avail.go b/op-avail/avail/avail.go new file mode 100644 index 0000000000000..76e0a675fd011 --- /dev/null +++ b/op-avail/avail/avail.go @@ -0,0 +1,90 @@ +package avail + +import ( + "fmt" + + gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4" + + "github.com/ethereum/go-ethereum/log" + + config "github.com/ethereum-optimism/optimism/op-avail/internal/config" + service "github.com/ethereum-optimism/optimism/op-avail/internal/services" + types "github.com/ethereum-optimism/optimism/op-avail/internal/types" +) + +// AvailMessageHeaderFlag indicates that this data is a Blob Pointer +// which will be used to retrieve data from Avail +const AvailMessageHeaderFlag byte = 0x0a + +func IsAvailMessageHeaderByte(header byte) bool { + return (AvailMessageHeaderFlag & header) > 0 +} + +type AvailDA struct { + cfg config.DAConfig + api *gsrpc.SubstrateAPI +} + +func NewAvailDA(path string) (*AvailDA, error) { + + // Load config + var cfg config.DAConfig + err := cfg.GetConfig(path) + if err != nil { + log.Error("Unable to create config variable for op-avail") + panic(fmt.Sprintf("cannot get config:%v", err)) + } + + //Creating new substrate api + api, err := gsrpc.NewSubstrateAPI(cfg.ApiURL) + if err != nil { + return nil, err + } + + log.Info("Connected to Avail ✅") + return &AvailDA{ + cfg: cfg, + api: api, + }, nil +} + +func (a *AvailDA) SubmitTxDataAndGetRef(TxData []byte) ([]byte, error) { + log.Info("Working on batch submission for avail") + + //Checking for the size of TxData + if len(TxData) >= 512000 { + return []byte{}, fmt.Errorf("size of TxData is more than 512KB, it is higher than a single data submit transaction supports on avail") + } + + // Submitting data to Avail + avail_Blk_Ref, err := service.SubmitDataAndWatch(a.api, a.cfg, TxData) + if err != nil { + return []byte{}, fmt.Errorf("cannot submit data:%v", err) + } + + log.Info("Avail Block Reference:", "Ref", avail_Blk_Ref) + + ref_bytes_Data, err := avail_Blk_Ref.MarshalToBinary() + if err != nil { + return []byte{}, fmt.Errorf("cannot get the binary form of avail block reference:%v", err) + } + + return ref_bytes_Data, nil +} + +func (a *AvailDA) GetTxDataByDARef(RefData []byte) ([]byte, error) { + //Getting Avail block reference from callData + avail_blk_ref := types.AvailBlockRef{} + err := avail_blk_ref.UnmarshalFromBinary(RefData) + if err != nil { + return []byte{}, fmt.Errorf("failed to unmarshal the ethereum trxn data to avail block refrence, error:%v", err) + } + log.Info("Avail Block Reference:", "Ref", avail_blk_ref) + + txData, err := service.GetBlockExtrinsicData(a.api, a.cfg, avail_blk_ref) + if err != nil { + return []byte{}, fmt.Errorf("failed to get block extrinsic data, error:%v", err) + } + + return txData, nil +} diff --git a/op-avail/avail/getTxDataByDARef.go b/op-avail/avail/getTxDataByDARef.go deleted file mode 100644 index 7edd74576f676..0000000000000 --- a/op-avail/avail/getTxDataByDARef.go +++ /dev/null @@ -1,26 +0,0 @@ -package avail - -import ( - "fmt" - - service "github.com/ethereum-optimism/optimism/op-avail/internal/services" - types "github.com/ethereum-optimism/optimism/op-avail/internal/types" - "github.com/ethereum/go-ethereum/log" -) - -func GetTxDataByDARef(RefData []byte, l log.Logger) ([]byte, error) { - //Getting Avail block reference from callData - avail_blk_ref := types.AvailBlockRef{} - err := avail_blk_ref.UnmarshalFromBinary(RefData) - if err != nil { - return []byte{}, fmt.Errorf("Failed to unmarshal the ethereum trxn data to avail block refrence, error:%v", err) - } - l.Info("Avail Block Reference:", "Ref", avail_blk_ref) - - txData, err := service.GetBlockExtrinsicData(avail_blk_ref, l) - if err != nil { - return []byte{}, fmt.Errorf("Failed to get block extrinsic data, error:%v", err) - } - - return txData, nil -} diff --git a/op-avail/avail/submitTxDataAndGetRef.go b/op-avail/avail/submitTxDataAndGetRef.go deleted file mode 100644 index 915d47fbfc003..0000000000000 --- a/op-avail/avail/submitTxDataAndGetRef.go +++ /dev/null @@ -1,32 +0,0 @@ -package avail - -import ( - "fmt" - - service "github.com/ethereum-optimism/optimism/op-avail/internal/services" - "github.com/ethereum/go-ethereum/log" -) - -func SubmitTxDataAndGetRef(TxData []byte, l log.Logger) ([]byte, error) { - l.Info("Working on batch submission for avail") - - //Checking for the size of TxData - if len(TxData) >= 512000 { - return []byte{}, fmt.Errorf("size of TxData is more than 512KB, it is higher than a single data submit transaction supports on avail") - } - - // Submitting data to Avail - avail_Blk_Ref, err := service.SubmitDataAndWatch(TxData, l) - if err != nil { - return []byte{}, fmt.Errorf("cannot submit data:%v", err) - } - - l.Info("Avail Block Reference:", "Ref", avail_Blk_Ref) - - ref_bytes_Data, err := avail_Blk_Ref.MarshalToBinary() - if err != nil { - return []byte{}, fmt.Errorf("cannot get the binary form of avail block reference:%v", err) - } - - return ref_bytes_Data, nil -} diff --git a/op-avail/internal/config/config.go b/op-avail/internal/config/config.go index 7f6f080440886..48cd10c38ee03 100644 --- a/op-avail/internal/config/config.go +++ b/op-avail/internal/config/config.go @@ -6,13 +6,13 @@ import ( "os" ) -type Config struct { +type DAConfig struct { Seed string `json:"seed"` ApiURL string `json:"api_url"` AppID int `json:"app_id"` } -func (c *Config) GetConfig(configFileName string) error { +func (c *DAConfig) GetConfig(configFileName string) error { jsonFile, err := os.Open(configFileName) if err != nil { diff --git a/op-avail/internal/services/getBlockExtrinsicData.go b/op-avail/internal/services/getBlockExtrinsicData.go index 3b8b69b324bd8..3c70c85e3075b 100644 --- a/op-avail/internal/services/getBlockExtrinsicData.go +++ b/op-avail/internal/services/getBlockExtrinsicData.go @@ -14,28 +14,13 @@ import ( ) // GetBlock: To fetch the extrinsic Data from block's extrinsic by hash -func GetBlockExtrinsicData(avail_blk_ref types.AvailBlockRef, l log.Logger) ([]byte, error) { - - // Load config - var config config.Config - err := config.GetConfig("../op-avail/config.json") - if err != nil { - l.Error("Unable to create config variable for op-avail") - panic(fmt.Sprintf("cannot get config:%v", err)) - } +func GetBlockExtrinsicData(api *gsrpc.SubstrateAPI, config config.DAConfig, avail_blk_ref types.AvailBlockRef) ([]byte, error) { //Intitializing variables - ApiURL := config.ApiURL Hash := avail_blk_ref.BlockHash Address := avail_blk_ref.Sender Nonce := avail_blk_ref.Nonce - //Creating new substrate api - api, err := gsrpc.NewSubstrateAPI(ApiURL) - if err != nil { - return []byte{}, fmt.Errorf("cannot create api:%w", err) - } - // Converting this string type into gsrpc_types.hash type blk_hash, err := gsrpc_types.NewHashFromHexString(Hash) if err != nil { @@ -53,7 +38,7 @@ func GetBlockExtrinsicData(avail_blk_ref types.AvailBlockRef, l log.Logger) ([]b //Extracting sender address for extrinsic ext_Addr, err := subkey.SS58Address(ext.Signature.Signer.AsID.ToBytes(), 42) if err != nil { - l.Error("unable to get sender address from extrinsic", "err", err) + log.Error("unable to get sender address from extrinsic", "err", err) } if ext_Addr == Address && ext.Signature.Nonce.Int64() == Nonce { args := ext.Method.Args diff --git a/op-avail/internal/services/submitDataAndWatch.go b/op-avail/internal/services/submitDataAndWatch.go index 5ad068ecffdec..85a9cd5249e8e 100644 --- a/op-avail/internal/services/submitDataAndWatch.go +++ b/op-avail/internal/services/submitDataAndWatch.go @@ -15,27 +15,17 @@ import ( ) // submitData creates a transaction and makes a Avail data submission -func SubmitDataAndWatch(data []byte, l log.Logger) (types.AvailBlockRef, error) { +func SubmitDataAndWatch(api *gsrpc.SubstrateAPI, config config.DAConfig, data []byte) (types.AvailBlockRef, error) { - //Load variables - var config config.Config - err := config.GetConfig("../op-avail/config.json") - if err != nil { - l.Error("Unable to create config variable for op-avail") - panic(fmt.Sprintf("cannot get config:%v", err)) - } - - //Intitializing variables - ApiURL := config.ApiURL Seed := config.Seed AppID := config.AppID - //Creating new substrate api - api, err := gsrpc.NewSubstrateAPI(ApiURL) - if err != nil { - fmt.Printf("cannot create api: error:%v", err) - return types.AvailBlockRef{}, err - } + // //Creating new substrate api + // api, err := gsrpc.NewSubstrateAPI(ApiURL) + // if err != nil { + // fmt.Printf("cannot create api: error:%v", err) + // return types.AvailBlockRef{}, err + // } meta, err := api.RPC.State.GetMetadataLatest() if err != nil { @@ -116,7 +106,7 @@ func SubmitDataAndWatch(data []byte, l log.Logger) (types.AvailBlockRef, error) return types.AvailBlockRef{}, err } - l.Info("Tx batch is submitted to Avail", "length", len(data), "address", keyringPair.Address, "appID", appID) + log.Info("Tx batch is submitted to Avail", "length", len(data), "address", keyringPair.Address, "appID", appID) defer sub.Unsubscribe() timeout := time.After(100 * time.Second) diff --git a/op-batcher/batcher/config.go b/op-batcher/batcher/config.go index 3ac53bcf5d7b2..9d814fb7d7123 100644 --- a/op-batcher/batcher/config.go +++ b/op-batcher/batcher/config.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/urfave/cli/v2" + "github.com/ethereum-optimism/optimism/op-avail/avail" "github.com/ethereum-optimism/optimism/op-batcher/compressor" "github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum-optimism/optimism/op-batcher/metrics" @@ -25,7 +26,9 @@ type Config struct { L1Client *ethclient.Client L2Client *ethclient.Client RollupNode *sources.RollupClient - TxManager txmgr.TxManager + AvailDA *avail.AvailDA + + TxManager txmgr.TxManager NetworkTimeout time.Duration PollInterval time.Duration @@ -59,6 +62,9 @@ type CLIConfig struct { // RollupRpc is the HTTP provider URL for the L2 rollup node. RollupRpc string + // AvailConfig provides the path of config file for setting up avail da + AvailConfig string + // MaxChannelDuration is the maximum duration (in #L1-blocks) to keep a // channel open. This allows to more eagerly send batcher transactions // during times of low L2 transaction volume. Note that the effective @@ -123,6 +129,7 @@ func NewConfig(ctx *cli.Context) CLIConfig { RollupRpc: ctx.String(flags.RollupRpcFlag.Name), SubSafetyMargin: ctx.Uint64(flags.SubSafetyMarginFlag.Name), PollInterval: ctx.Duration(flags.PollIntervalFlag.Name), + AvailConfig: ctx.String(flags.AvailDAConfigFlag.Name), /* Optional Flags */ MaxPendingTransactions: ctx.Uint64(flags.MaxPendingTransactionsFlag.Name), diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 3539d5d83aee3..9fe013c0577d5 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -72,6 +72,11 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri return nil, fmt.Errorf("querying rollup config: %w", err) } + availDA, err := avail.NewAvailDA(cfg.AvailConfig) + if err != nil { + return nil, fmt.Errorf("Unable to intialize Avail DA, err:%w", err) + } + txManager, err := txmgr.NewSimpleTxManager("batcher", l, m, cfg.TxMgrConfig) if err != nil { return nil, err @@ -81,6 +86,7 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri L1Client: l1Client, L2Client: l2Client, RollupNode: rollupClient, + AvailDA: availDA, PollInterval: cfg.PollInterval, MaxPendingTransactions: cfg.MaxPendingTransactions, NetworkTimeout: cfg.TxMgrConfig.NetworkTimeout, @@ -398,7 +404,7 @@ func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txDat if l.Rollup.DAEnabled { // Submit transaction data on Data and get reference to submit on ethereum layer - refData, err := avail.SubmitTxDataAndGetRef(data, l.log) + refData, err := l.AvailDA.SubmitTxDataAndGetRef(data) if err != nil { l.log.Error("failed to submit txData on avail", "err", err) return diff --git a/op-batcher/flags/flags.go b/op-batcher/flags/flags.go index 6beb8d01ab9ef..8f264f001dc61 100644 --- a/op-batcher/flags/flags.go +++ b/op-batcher/flags/flags.go @@ -39,6 +39,11 @@ var ( Usage: "HTTP provider URL for Rollup node", EnvVars: prefixEnvVars("ROLLUP_RPC"), } + AvailDAConfigFlag = &cli.StringFlag{ + Name: "avail-da.config", + Usage: "Config for setting up Avail DA", + EnvVars: prefixEnvVars("AVAIL_DA_CONFIG"), + } // Optional flags SubSafetyMarginFlag = &cli.Uint64Flag{ Name: "sub-safety-margin", @@ -85,6 +90,7 @@ var requiredFlags = []cli.Flag{ L1EthRpcFlag, L2EthRpcFlag, RollupRpcFlag, + AvailDAConfigFlag, } var optionalFlags = []cli.Flag{ diff --git a/op-e2e/actions/l2_sequencer.go b/op-e2e/actions/l2_sequencer.go index 3d9deefcd5859..063113fec721e 100644 --- a/op-e2e/actions/l2_sequencer.go +++ b/op-e2e/actions/l2_sequencer.go @@ -39,8 +39,8 @@ type L2Sequencer struct { mockL1OriginSelector *MockL1OriginSelector } -func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer { - ver := NewL2Verifier(t, log, l1, eng, cfg) +func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, availDAFetcher derive.AvailDAFetcher, eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer { + ver := NewL2Verifier(t, log, l1, availDAFetcher, eng, cfg) attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, eng) seqConfDepthL1 := driver.NewConfDepth(seqConfDepth, ver.l1State.L1Head, l1) l1OriginSelector := &MockL1OriginSelector{ diff --git a/op-e2e/actions/l2_verifier.go b/op-e2e/actions/l2_verifier.go index 48b02111dfe32..785525a301fe0 100644 --- a/op-e2e/actions/l2_verifier.go +++ b/op-e2e/actions/l2_verifier.go @@ -55,9 +55,9 @@ type L2API interface { GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*eth.AccountResult, error) } -func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config) *L2Verifier { +func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, availDAFetcher derive.AvailDAFetcher, eng L2API, cfg *rollup.Config) *L2Verifier { metrics := &testutils.TestDerivationMetrics{} - pipeline := derive.NewDerivationPipeline(log, cfg, l1, eng, metrics) + pipeline := derive.NewDerivationPipeline(log, cfg, l1, availDAFetcher, eng, metrics) pipeline.Reset() rollupNode := &L2Verifier{ diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 781daadb0294f..d897d62aff4bd 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -39,6 +39,11 @@ var ( Usage: "Rollup chain parameters", EnvVars: prefixEnvVars("ROLLUP_CONFIG"), } + AvailDAConfig = &cli.StringFlag{ + Name: "avail-da.config", + Usage: "Config for setting up Avail DA", + EnvVars: prefixEnvVars("AVAIL_DA_CONFIG"), + } Network = &cli.StringFlag{ Name: "network", Usage: fmt.Sprintf("Predefined network selection. Available networks: %s", strings.Join(chaincfg.AvailableNetworks(), ", ")), @@ -219,6 +224,7 @@ var ( var requiredFlags = []cli.Flag{ L1NodeAddr, L2EngineAddr, + AvailDAConfig, } var optionalFlags = []cli.Flag{ diff --git a/op-node/node/config.go b/op-node/node/config.go index 0491882c4ab91..b01f419799069 100644 --- a/op-node/node/config.go +++ b/op-node/node/config.go @@ -23,6 +23,8 @@ type Config struct { Rollup rollup.Config + AvailDAConfig string + // P2PSigner will be used for signing off on published content // if the node is sequencing and if the p2p stack is enabled P2PSigner p2p.SignerSetup diff --git a/op-node/node/node.go b/op-node/node/node.go index 7ec140eb1f075..14d5743377356 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum-optimism/optimism/op-avail/avail" "github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/metrics" @@ -32,6 +33,7 @@ type OpNode struct { l1Source *sources.L1Client // L1 Client to fetch data from l2Driver *driver.Driver // L2 Engine to Sync + AvailDA *avail.AvailDA // Avail DA to Sync l2Source *sources.EngineClient // L2 Execution Engine RPC bindings rpcSync *sources.SyncClient // Alt-sync RPC client, optional (may be nil) server *rpcServer // RPC server hosting the rollup-node API @@ -84,6 +86,12 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) if err := n.initRuntimeConfig(ctx, cfg); err != nil { return err } + + //Initialization of Avail DA + if err := n.initAvailDA(ctx, cfg); err != nil { + return err + } + if err := n.initL2(ctx, cfg, snapshotLog); err != nil { return err } @@ -96,6 +104,7 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) if err := n.initP2P(ctx, cfg); err != nil { return err } + // Only expose the server at the end, ensuring all RPC backend components are initialized. if err := n.initRPCServer(ctx, cfg); err != nil { return err @@ -106,6 +115,15 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) return nil } +func (n *OpNode) initAvailDA(ctx context.Context, cfg *Config) error { + availDA, err := avail.NewAvailDA(cfg.AvailDAConfig) + if err != nil { + return err + } + n.AvailDA = availDA + return nil +} + func (n *OpNode) initTracer(ctx context.Context, cfg *Config) error { if cfg.Tracer != nil { n.tracer = cfg.Tracer @@ -199,7 +217,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger return err } - n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence) + n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n.AvailDA, n, n, n.log, snapshotLog, n.metrics, cfg.ConfigPersistence) return nil } diff --git a/op-node/rollup/derive/calldata_source.go b/op-node/rollup/derive/calldata_source.go index c297ac4eba159..5f029aff66f24 100644 --- a/op-node/rollup/derive/calldata_source.go +++ b/op-node/rollup/derive/calldata_source.go @@ -6,15 +6,12 @@ import ( "fmt" "io" + "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" - - avail "github.com/ethereum-optimism/optimism/op-avail/avail" - - "github.com/ethereum-optimism/optimism/op-node/eth" - "github.com/ethereum-optimism/optimism/op-node/rollup" ) type DataIter interface { @@ -29,18 +26,19 @@ type L1TransactionFetcher interface { // batch submitter transactions. // This is not a stage in the pipeline, but a wrapper for another stage in the pipeline type DataSourceFactory struct { - log log.Logger - cfg *rollup.Config - fetcher L1TransactionFetcher + log log.Logger + cfg *rollup.Config + fetcher L1TransactionFetcher + availDAFetcher AvailDAFetcher } -func NewDataSourceFactory(log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher) *DataSourceFactory { - return &DataSourceFactory{log: log, cfg: cfg, fetcher: fetcher} +func NewDataSourceFactory(log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher, availDAFetcher AvailDAFetcher) *DataSourceFactory { + return &DataSourceFactory{log: log, cfg: cfg, fetcher: fetcher, availDAFetcher: availDAFetcher} } // OpenData returns a DataIter. This struct implements the `Next` function. func (ds *DataSourceFactory) OpenData(ctx context.Context, id eth.BlockID, batcherAddr common.Address) DataIter { - return NewDataSource(ctx, ds.log, ds.cfg, ds.fetcher, id, batcherAddr) + return NewDataSource(ctx, ds.log, ds.cfg, ds.fetcher, ds.availDAFetcher, id, batcherAddr) } // DataSource is a fault tolerant approach to fetching data. @@ -51,31 +49,33 @@ type DataSource struct { open bool data []eth.Data // Required to re-attempt fetching - id eth.BlockID - cfg *rollup.Config // TODO: `DataFromEVMTransactions` should probably not take the full config - fetcher L1TransactionFetcher - log log.Logger + id eth.BlockID + cfg *rollup.Config // TODO: `DataFromEVMTransactions` should probably not take the full config + fetcher L1TransactionFetcher + availDAFetcher AvailDAFetcher + log log.Logger batcherAddr common.Address } // NewDataSource creates a new calldata source. It suppresses errors in fetching the L1 block if they occur. // If there is an error, it will attempt to fetch the result on the next call to `Next`. -func NewDataSource(ctx context.Context, log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher, block eth.BlockID, batcherAddr common.Address) DataIter { +func NewDataSource(ctx context.Context, log log.Logger, cfg *rollup.Config, fetcher L1TransactionFetcher, availDAFetcher AvailDAFetcher, block eth.BlockID, batcherAddr common.Address) DataIter { _, txs, err := fetcher.InfoAndTxsByHash(ctx, block.Hash) if err != nil { return &DataSource{ - open: false, - id: block, - cfg: cfg, - fetcher: fetcher, - log: log, - batcherAddr: batcherAddr, + open: false, + id: block, + cfg: cfg, + fetcher: fetcher, + availDAFetcher: availDAFetcher, + log: log, + batcherAddr: batcherAddr, } } else { return &DataSource{ open: true, - data: DataFromEVMTransactions(cfg, batcherAddr, txs, log.New("origin", block)), + data: DataFromEVMTransactions(cfg, availDAFetcher, batcherAddr, txs, log.New("origin", block)), } } } @@ -87,7 +87,7 @@ func (ds *DataSource) Next(ctx context.Context) (eth.Data, error) { if !ds.open { if _, txs, err := ds.fetcher.InfoAndTxsByHash(ctx, ds.id.Hash); err == nil { ds.open = true - ds.data = DataFromEVMTransactions(ds.cfg, ds.batcherAddr, txs, log.New("origin", ds.id)) + ds.data = DataFromEVMTransactions(ds.cfg, ds.availDAFetcher, ds.batcherAddr, txs, log.New("origin", ds.id)) } else if errors.Is(err, ethereum.NotFound) { return nil, NewResetError(fmt.Errorf("failed to open calldata source: %w", err)) } else { @@ -106,7 +106,7 @@ func (ds *DataSource) Next(ctx context.Context) (eth.Data, error) { // DataFromEVMTransactions filters all of the transactions and returns the calldata from transactions // that are sent to the batch inbox address from the batch sender address. // This will return an empty array if no valid transactions are found. -func DataFromEVMTransactions(config *rollup.Config, batcherAddr common.Address, txs types.Transactions, log log.Logger) []eth.Data { +func DataFromEVMTransactions(config *rollup.Config, availDAFetcher AvailDAFetcher, batcherAddr common.Address, txs types.Transactions, log log.Logger) []eth.Data { var out []eth.Data l1Signer := config.L1Signer() for j, tx := range txs { @@ -123,13 +123,7 @@ func DataFromEVMTransactions(config *rollup.Config, batcherAddr common.Address, } if config.DAEnabled { - // Get Transaction data from da reference - txData, err := avail.GetTxDataByDARef(tx.Data(), log) - if err != nil { - log.Error("unable to retrieve the data back from Avail", "index", j, "err", err) - panic("Failed to get TxData from Avail block ref") - } - out = append(out, txData) + out = append(out, DataFromAvailDA(availDAFetcher, tx.Data(), log)) } else { out = append(out, tx.Data()) } @@ -137,3 +131,13 @@ func DataFromEVMTransactions(config *rollup.Config, batcherAddr common.Address, } return out } + +func DataFromAvailDA(availDAFetcher AvailDAFetcher, data []byte, log log.Logger) []byte { + // Get Transaction data from da reference + txData, err := availDAFetcher.GetTxDataByDARef(data) + if err != nil { + log.Error("unable to retrieve the data back from Avail", "err", err) + panic("Failed to get TxData from Avail block ref") + } + return txData +} diff --git a/op-node/rollup/derive/calldata_source_test.go b/op-node/rollup/derive/calldata_source_test.go index b7bfcd5a4ddf0..d4b1ea8e786bc 100644 --- a/op-node/rollup/derive/calldata_source_test.go +++ b/op-node/rollup/derive/calldata_source_test.go @@ -121,7 +121,7 @@ func TestDataFromEVMTransactions(t *testing.T) { } } - out := DataFromEVMTransactions(cfg, batcherAddr, txs, testlog.Logger(t, log.LvlCrit)) + out := DataFromEVMTransactions(cfg, nil, batcherAddr, txs, testlog.Logger(t, log.LvlCrit)) require.ElementsMatch(t, expectedData, out) } diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 4dcd74a8e7783..18675ec49defd 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -26,6 +26,10 @@ type L1Fetcher interface { L1TransactionFetcher } +type AvailDAFetcher interface { + GetTxDataByDARef(RefData []byte) ([]byte, error) +} + // ResettableEngineControl wraps EngineControl with reset-functionality, // which handles reorgs like the derivation pipeline: // by determining the last valid block references to continue from. @@ -75,11 +79,11 @@ type DerivationPipeline struct { } // NewDerivationPipeline creates a derivation pipeline, which should be reset before use. -func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine, metrics Metrics) *DerivationPipeline { +func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, availDAFetcher AvailDAFetcher, engine Engine, metrics Metrics) *DerivationPipeline { // Pull stages l1Traversal := NewL1Traversal(log, cfg, l1Fetcher) - dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) // auxiliary stage for L1Retrieval + dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher, availDAFetcher) // auxiliary stage for L1Retrieval l1Src := NewL1Retrieval(log, dataSrc, l1Traversal) frameQueue := NewFrameQueue(log, l1Src) bank := NewChannelBank(log, cfg, frameQueue, l1Fetcher) diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index 7bee9307bb712..fa8792cecc872 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -46,6 +46,11 @@ type L2Chain interface { L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error) } +type AvailDA interface { + derive.AvailDAFetcher + SubmitTxDataAndGetRef(TxData []byte) ([]byte, error) +} + type DerivationPipeline interface { Reset() Step(ctx context.Context) error @@ -108,13 +113,13 @@ type SequencerStateListener interface { } // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks. -func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics, sequencerStateListener SequencerStateListener) *Driver { +func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, availDA AvailDA, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics, sequencerStateListener SequencerStateListener) *Driver { l1 = NewMeteredL1Fetcher(l1, metrics) l1State := NewL1State(log, metrics) sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1) findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth) verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1) - derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2, metrics) + derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, availDA, l2, metrics) attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2) engine := derivationPipeline meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) diff --git a/op-node/service.go b/op-node/service.go index 0dd363e58f5ef..c6136f73ebd8f 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -88,6 +88,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { URL: ctx.String(flags.HeartbeatURLFlag.Name), }, ConfigPersistence: configPersistence, + AvailDAConfig: ctx.String(flags.AvailDAConfig.Name), } if err := cfg.LoadPersisted(log); err != nil { diff --git a/op-program/client/driver/driver.go b/op-program/client/driver/driver.go index 8372d1996a823..d0b38a9031e5a 100644 --- a/op-program/client/driver/driver.go +++ b/op-program/client/driver/driver.go @@ -34,8 +34,8 @@ type Driver struct { targetBlockNum uint64 } -func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l2Source L2Source, targetBlockNum uint64) *Driver { - pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l2Source, metrics.NoopMetrics) +func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, availDAFetcher derive.AvailDAFetcher, l2Source L2Source, targetBlockNum uint64) *Driver { + pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, availDAFetcher, l2Source, metrics.NoopMetrics) pipeline.Reset() return &Driver{ logger: logger,