6 changes: 0 additions & 6 deletions arbnode/mel/extraction/batch_lookup.go
@@ -60,12 +60,6 @@ func ParseBatchesFromBlock(
return nil, nil, fmt.Errorf("error fetching tx by hash: %v in ParseBatchesFromBlock: %w ", log.TxHash, err)
}

// Record this log for MEL validation. This is a very cheap operation in native mode
// and is optimized for recording mode as well.
if _, err := logsFetcher.LogsForTxIndex(ctx, parentChainHeader.Hash(), log.TxIndex); err != nil {
return nil, nil, fmt.Errorf("error recording relevant logs: %w", err)
}

batch := &mel.SequencerInboxBatch{
BlockHash: log.BlockHash,
ParentChainBlockNumber: log.BlockNumber,
10 changes: 0 additions & 10 deletions arbnode/mel/extraction/delayed_message_lookup.go
@@ -39,11 +39,6 @@ func parseDelayedMessagesFromBlock(
// On Arbitrum One, this is the bridge contract which emits a MessageDelivered event.
if log.Address == melState.DelayedMessagePostingTargetAddress {
relevantLogs = append(relevantLogs, log)
// Record this log for MEL validation. This is a very cheap operation in native mode
// and is optimized for recording mode as well.
if _, err := logsFetcher.LogsForTxIndex(ctx, parentChainHeader.Hash(), log.TxIndex); err != nil {
return nil, fmt.Errorf("error recording relevant logs: %w", err)
}
}
}
if len(relevantLogs) > 0 {
@@ -83,11 +78,6 @@ func parseDelayedMessagesFromBlock(
return nil, err
}
messageData[common.BigToHash(msgNum)] = msg
// Record this log for MEL validation. This is a very cheap operation in native mode
// and is optimized for recording mode as well.
if _, err := logsFetcher.LogsForTxIndex(ctx, parentChainHeader.Hash(), inboxMsgLog.TxIndex); err != nil {
return nil, fmt.Errorf("error recording relevant logs: %w", err)
}
}
for i, parsedLog := range messageDeliveredEvents {
msgKey := common.BigToHash(parsedLog.MessageIndex)
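With this change and the matching one in batch_lookup.go above, extraction no longer triggers per-log preimage recording; recording happens once when the logs fetcher is built (see the receipt_recorder.go rewrite below), and extraction only reads from it. A hedged sketch of the fetcher contract, reconstructed from the methods implemented in this PR rather than copied from the repository:

import (
    "context"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
)

// Reconstructed sketch of melextraction.LogsFetcher; the real interface may differ.
type LogsFetcher interface {
    // Logs emitted by the transaction at txIndex in the given parent chain block.
    LogsForTxIndex(ctx context.Context, parentChainBlockHash common.Hash, txIndex uint) ([]*types.Log, error)
    // All logs emitted in the given parent chain block.
    LogsForBlockHash(ctx context.Context, parentChainBlockHash common.Hash) ([]*types.Log, error)
}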
2 changes: 1 addition & 1 deletion arbnode/mel/extraction/message_extraction_function.go
@@ -233,10 +233,10 @@ func extractMessagesImpl(
}
for _, msg := range messagesInBatch {
messages = append(messages, msg)
state.MsgCount += 1
if err = state.AccumulateMessage(msg); err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to accumulate message: %w", err)
}
state.MsgCount += 1
}
state.BatchCount += 1
batchMetas = append(batchMetas, &mel.BatchMetadata{
3 changes: 2 additions & 1 deletion arbnode/mel/recording/delayed_msg_database.go
@@ -68,7 +68,8 @@ func (r *DelayedMsgDatabase) ReadDelayedMessage(ctx context.Context, state *mel.
return nil, err
}
hashDelayedHash := crypto.Keccak256(delayed.Hash().Bytes())
r.preimages[common.BytesToHash(hashDelayedHash)] = delayedMsgBytes
r.preimages[common.BytesToHash(hashDelayedHash)] = delayed.Hash().Bytes()
r.preimages[delayed.Hash()] = delayedMsgBytes
return delayed, nil
}

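The change above makes the recorded preimage map self-consistent: each key is now the keccak256 hash of its stored value, giving a two-hop chain from keccak256(delayedHash) to the delayed message hash, and from that hash to the serialized message (previously the first key mapped straight to the message bytes, so the middle hop could not be resolved). A minimal sketch of walking that chain; the helper and the flat map layout are illustrative, not taken from the repository:

import (
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/crypto"
)

// resolveDelayedMessage follows the two entries recorded above: every key is the
// keccak256 hash of its value, so keccak256(delayedHash) yields the delayed message
// hash, which in turn yields the serialized delayed message.
func resolveDelayedMessage(preimages map[common.Hash][]byte, delayedHash common.Hash) ([]byte, bool) {
    hashBytes, ok := preimages[crypto.Keccak256Hash(delayedHash.Bytes())]
    if !ok {
        return nil, false
    }
    msgBytes, ok := preimages[common.BytesToHash(hashBytes)]
    return msgBytes, ok
}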
210 changes: 72 additions & 138 deletions arbnode/mel/recording/receipt_recorder.go
@@ -1,184 +1,118 @@
package melrecording

import (
"bytes"
"context"
"errors"
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/ethereum/go-ethereum/triedb"

melextraction "github.com/offchainlabs/nitro/arbnode/mel/extraction"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/daprovider"
"github.com/offchainlabs/nitro/mel-replay"
)

// ReceiptRecorder records preimages corresponding to the receipts of a parent chain block
// needed during the message extraction. These preimages are needed for MEL validation and
// is used in creation of the validation entries by the MEL validator
type ReceiptRecorder struct {
parentChainReader BlockReader
parentChainBlockHash common.Hash
parentChainBlockNumber uint64
recordPreimages daprovider.PreimageRecorder
receipts []*types.Receipt
logs []*types.Log
relevantLogsTxIndexes map[uint]struct{}
trieDB *triedb.Database
blockReceiptHash common.Hash
// Implements a hasher that captures preimages of hashes as it computes them.
type preimageRecordingHasher struct {
trie *trie.StackTrie
recordPreimages daprovider.PreimageRecorder
}

// NewReceiptRecorder returns ReceiptRecorder that records
// the receipt preimages into the given preimages map
func NewReceiptRecorder(
parentChainReader BlockReader,
parentChainBlockHash common.Hash,
preimages daprovider.PreimagesMap,
) (*ReceiptRecorder, error) {
if preimages == nil {
return nil, errors.New("preimages recording destination cannot be nil")
func newRecordingHasher(recordPreimages daprovider.PreimageRecorder) *preimageRecordingHasher {
h := &preimageRecordingHasher{
recordPreimages: recordPreimages,
}
return &ReceiptRecorder{
parentChainReader: parentChainReader,
parentChainBlockHash: parentChainBlockHash,
recordPreimages: daprovider.RecordPreimagesTo(preimages),
relevantLogsTxIndexes: make(map[uint]struct{}),
}, nil
// OnTrieNode callback captures all trie nodes.
onTrieNode := func(path []byte, hash common.Hash, blob []byte) {
// Deep copy the blob since the callback warns contents may change, so this is required.
recordPreimages(hash, common.CopyBytes(blob), arbutil.Keccak256PreimageType)
}

h.trie = trie.NewStackTrie(onTrieNode)
return h
}

func (h *preimageRecordingHasher) Reset() {
onTrieNode := func(path []byte, hash common.Hash, blob []byte) {
h.recordPreimages(hash, common.CopyBytes(blob), arbutil.Keccak256PreimageType)
}
h.trie = trie.NewStackTrie(onTrieNode)
}

func (h *preimageRecordingHasher) Update(key, value []byte) error {
valueHash := crypto.Keccak256Hash(value)
h.recordPreimages(valueHash, common.CopyBytes(value), arbutil.Keccak256PreimageType)
return h.trie.Update(key, value)
}

func (h *preimageRecordingHasher) Hash() common.Hash {
return h.trie.Hash()
}

// Initialize must be called first to setup the recording trie database and store all the
// block receipts into the triedb. Without this, preimage recording is not possible and
// the other functions will error out if called beforehand
func (rr *ReceiptRecorder) Initialize(ctx context.Context) error {
block, err := rr.parentChainReader.BlockByHash(ctx, rr.parentChainBlockHash)
// recordedLogsFetcher holds the logs of recorded receipt preimages. These preimages are
// needed for MEL validation and is used in creation of the validation entries by the MEL validator
type recordedLogsFetcher struct {
parentChainBlockHash common.Hash
receipts []*types.Receipt
logs []*types.Log
}

// RecordReceipts records preimages of all the receipts in a block and returns a LogsFetcher
// that provides these logs during message extraction
func RecordReceipts(ctx context.Context, parentChainReader BlockReader, parentChainBlockHash common.Hash, preimages daprovider.PreimagesMap) (melextraction.LogsFetcher, error) {
if preimages == nil {
return nil, errors.New("preimages recording destination cannot be nil")
}
block, err := parentChainReader.BlockByHash(ctx, parentChainBlockHash)
if err != nil {
return err
return nil, err
}
tdb := triedb.NewDatabase(rawdb.NewMemoryDatabase(), &triedb.Config{
Preimages: true,
})
receiptsTrie := trie.NewEmpty(tdb)
var receipts []*types.Receipt
txs := block.Body().Transactions
for i, tx := range txs {
receipt, err := rr.parentChainReader.TransactionReceipt(ctx, tx.Hash())
var receipts []*types.Receipt
var logs []*types.Log
for _, tx := range txs {
receipt, err := parentChainReader.TransactionReceipt(ctx, tx.Hash())
if err != nil {
return fmt.Errorf("error fetching receipt for tx: %v, blockHash: %v", tx.Hash(), block.Hash())
return nil, fmt.Errorf("error fetching receipt for tx: %v, blockHash: %v", tx.Hash(), block.Hash())
}
receipts = append(receipts, receipt)
rr.logs = append(rr.logs, receipt.Logs...)
// #nosec G115
indexBytes, err := rlp.EncodeToBytes(uint64(i))
if err != nil {
return fmt.Errorf("failed to encode index %d: %w", i, err)
}
receiptBytes, err := receipt.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to marshal receipt %d: %w", i, err)
}
if err := receiptsTrie.Update(indexBytes, receiptBytes); err != nil {
return fmt.Errorf("failed to update trie at index %d: %w", i, err)
}
logs = append(logs, receipt.Logs...)
}
root, nodes := receiptsTrie.Commit(false)
if root != block.ReceiptHash() {
return fmt.Errorf("computed root %s doesn't match header root %s",
root.Hex(), block.ReceiptHash().Hex())
hasher := newRecordingHasher(daprovider.RecordPreimagesTo(preimages))
receiptsRoot := types.DeriveSha(types.Receipts(receipts), hasher)
if receiptsRoot != block.ReceiptHash() {
return nil, fmt.Errorf("computed root %s doesn't match header root %s", receiptsRoot.Hex(), block.ReceiptHash().Hex())
}
if nodes != nil {
if err := tdb.Update(root, types.EmptyRootHash, 0, trienode.NewWithNodeSet(nodes), nil); err != nil {
return fmt.Errorf("failed to commit trie nodes: %w", err)
}
}
if err := tdb.Commit(root, false); err != nil {
return fmt.Errorf("failed to commit database: %w", err)
}
rr.receipts = receipts
rr.trieDB = tdb
rr.blockReceiptHash = root
rr.parentChainBlockNumber = block.NumberU64()
return &recordedLogsFetcher{
logs: logs,
receipts: receipts,
parentChainBlockHash: parentChainBlockHash,
}, nil
}

// RecordPreimages records preimages corresponding to all the receipts in a block using preimageRecordingHasher
func (rr *recordedLogsFetcher) RecordPreimages(ctx context.Context) error {

return nil
}

func (rr *ReceiptRecorder) LogsForTxIndex(ctx context.Context, parentChainBlockHash common.Hash, txIndex uint) ([]*types.Log, error) {
if rr.trieDB == nil {
return nil, errors.New("TransactionRecorder not initialized")
}
func (rr *recordedLogsFetcher) LogsForTxIndex(ctx context.Context, parentChainBlockHash common.Hash, txIndex uint) ([]*types.Log, error) {
if rr.parentChainBlockHash != parentChainBlockHash {
return nil, fmt.Errorf("parentChainBlockHash mismatch. expected: %v got: %v", rr.parentChainBlockHash, parentChainBlockHash)
}
if _, recorded := rr.relevantLogsTxIndexes[txIndex]; recorded {
return rr.receipts[txIndex].Logs, nil
}
// #nosec G115
if int(txIndex) >= len(rr.receipts) {
return nil, fmt.Errorf("index out of range: %d", txIndex)
}
recordingDB := &TxsAndReceiptsDatabase{
underlying: rr.trieDB,
recorder: rr.recordPreimages, // RecordingDB will record relevant preimages into the given preimagesmap
}
recordingTDB := triedb.NewDatabase(recordingDB, nil)
receiptsTrie, err := trie.New(trie.TrieID(rr.blockReceiptHash), recordingTDB)
if err != nil {
return nil, fmt.Errorf("failed to create trie: %w", err)
}
indexBytes, err := rlp.EncodeToBytes(txIndex)
if err != nil {
return nil, fmt.Errorf("failed to encode index: %w", err)
}
receiptBytes, err := receiptsTrie.Get(indexBytes)
if err != nil {
return nil, fmt.Errorf("failed to get receipt from trie: %w", err)
}
receipt := new(types.Receipt)
if err = receipt.UnmarshalBinary(receiptBytes); err != nil {
return nil, fmt.Errorf("failed to unmarshal receipt: %w", err)
}
// Add the receipt marshaled binary by hash to the preimages map
rr.recordPreimages(crypto.Keccak256Hash(receiptBytes), receiptBytes, arbutil.Keccak256PreimageType)
// Fill in the TxIndex (give as input to this method) into the logs so that Tx recording
// is possible. This field is one of the derived fields of Log hence won't be stored in trie.
//
// We use this same trick in validation as well in order to link a tx with its logs
for _, log := range receipt.Logs {
log.TxIndex = txIndex
log.BlockHash = parentChainBlockHash
log.BlockNumber = rr.parentChainBlockNumber
}
rr.relevantLogsTxIndexes[txIndex] = struct{}{}
return receipt.Logs, nil
return rr.receipts[txIndex].Logs, nil
}

func (rr *ReceiptRecorder) LogsForBlockHash(ctx context.Context, parentChainBlockHash common.Hash) ([]*types.Log, error) {
if rr.trieDB == nil {
return nil, errors.New("TransactionRecorder not initialized")
}
func (rr *recordedLogsFetcher) LogsForBlockHash(ctx context.Context, parentChainBlockHash common.Hash) ([]*types.Log, error) {
if rr.parentChainBlockHash != parentChainBlockHash {
return nil, fmt.Errorf("parentChainBlockHash mismatch. expected: %v got: %v", rr.parentChainBlockHash, parentChainBlockHash)
}
return rr.logs, nil
}

// CollectTxIndicesPreimage adds the array of relevant tx indexes to the preimages map as a value
// to the key represented by parentChainBlockHash.
func (rr *ReceiptRecorder) CollectTxIndicesPreimage() error {
var relevantLogsTxIndexes []uint
for k := range rr.relevantLogsTxIndexes {
relevantLogsTxIndexes = append(relevantLogsTxIndexes, k)
}
var buf bytes.Buffer
if err := rlp.Encode(&buf, relevantLogsTxIndexes); err != nil {
return err
}
relevantTxIndicesKey := melreplay.RelevantTxIndexesKey(rr.parentChainBlockHash)
rr.recordPreimages(relevantTxIndicesKey, buf.Bytes(), arbutil.Keccak256PreimageType)
return nil
}
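The rewrite above drops the in-memory trie database: RecordReceipts now recomputes the receipts root with types.DeriveSha using a StackTrie-backed hasher whose OnTrieNode callback records every trie node, while Update records each encoded receipt value, so all preimages are captured in a single pass and the returned fetcher serves logs purely from memory. A hedged usage sketch of the new entry point; the wrapper function is illustrative, and it assumes daprovider.PreimagesMap is a plain map type that can be allocated with make:

import (
    "context"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"

    melrecording "github.com/offchainlabs/nitro/arbnode/mel/recording"
    "github.com/offchainlabs/nitro/daprovider"
)

// recordAndFetchLogs is an illustrative wrapper, not part of the PR: record the
// receipt preimages of one parent chain block up front, then look up the logs of a
// single transaction by index without touching a trie database.
func recordAndFetchLogs(ctx context.Context, reader melrecording.BlockReader, blockHash common.Hash, txIndex uint) ([]*types.Log, daprovider.PreimagesMap, error) {
    preimages := make(daprovider.PreimagesMap) // assumed allocatable with make
    fetcher, err := melrecording.RecordReceipts(ctx, reader, blockHash, preimages)
    if err != nil {
        return nil, nil, err
    }
    logs, err := fetcher.LogsForTxIndex(ctx, blockHash, txIndex)
    if err != nil {
        return nil, nil, err
    }
    return logs, preimages, nil
}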
2 changes: 1 addition & 1 deletion arbnode/mel/recording/tx_recorder.go
@@ -109,7 +109,7 @@ func (tr *TransactionRecorder) TransactionByLog(ctx context.Context, log *types.
if int(log.TxIndex) >= len(tr.txs) {
return nil, fmt.Errorf("index out of range: %d", log.TxIndex)
}
recordingDB := &TxsAndReceiptsDatabase{
recordingDB := &TxsRecordingDatabase{
underlying: tr.trieDB,
recorder: tr.recordPreimages, // RecordingDB will record relevant preimages into the given preimagesmap
}