diff --git a/cmd/ulxly/ulxly.go b/cmd/ulxly/ulxly.go
index f33d0b45..6a0d80e8 100644
--- a/cmd/ulxly/ulxly.go
+++ b/cmd/ulxly/ulxly.go
@@ -347,50 +347,50 @@ func balanceTree() error {
 		return err
 	}
 	type BalanceEntry struct {
-		OriginNetwork      uint32         `json:"originNetwork"`
-		OriginTokenAddress common.Address `json:"originTokenAddress"`
-		TotalSupply        string         `json:"totalSupply"`
-	}
-
-	var balanceEntries []BalanceEntry
-	for tokenKey, balance := range balances {
-		if balance.Cmp(big.NewInt(0)) == 0 {
-			continue
-		}
-
-		var token TokenInfo
-		token, err = TokenInfoStringToStruct(tokenKey)
-		if err != nil {
-			return err
-		}
-
-		if token.OriginNetwork.Uint64() == uint64(l2NetworkID) {
-			continue
-		}
+		OriginNetwork      uint32         `json:"originNetwork"`
+		OriginTokenAddress common.Address `json:"originTokenAddress"`
+		TotalSupply        string         `json:"totalSupply"`
+	}
+
+	var balanceEntries []BalanceEntry
+	for tokenKey, balance := range balances {
+		if balance.Cmp(big.NewInt(0)) == 0 {
+			continue
+		}
+
+		var token TokenInfo
+		token, err = TokenInfoStringToStruct(tokenKey)
+		if err != nil {
+			return err
+		}
+
+		if token.OriginNetwork.Uint64() == uint64(l2NetworkID) {
+			continue
+		}
 		balanceEntries = append(balanceEntries, BalanceEntry{
 			OriginNetwork:      uint32(token.OriginNetwork.Uint64()),
 			OriginTokenAddress: token.OriginTokenAddress,
 			TotalSupply:        balance.String(),
 		})
-	}
-
-	// Create the response structure
-	response := struct {
-		Root     string         `json:"root"`
-		Balances []BalanceEntry `json:"balances"`
-	}{
-		Root:     root.String(),
-		Balances: balanceEntries,
-	}
-
-	// Marshal to JSON with proper formatting
-	jsonOutput, err := json.MarshalIndent(response, "", " ")
-	if err != nil {
-		return err
-	}
-
-	fmt.Println(string(jsonOutput))
+	}
+
+	// Create the response structure
+	response := struct {
+		Root     string         `json:"root"`
+		Balances []BalanceEntry `json:"balances"`
+	}{
+		Root:     root.String(),
+		Balances: balanceEntries,
+	}
+
+	// Marshal to JSON with proper formatting
+	jsonOutput, err := json.MarshalIndent(response, "", " ")
+	if err != nil {
+		return err
+	}
+
+	fmt.Println(string(jsonOutput))
 	return nil
 }
diff --git a/p2p/conns.go b/p2p/conns.go
index 631f203c..c0e313f0 100644
--- a/p2p/conns.go
+++ b/p2p/conns.go
@@ -1,6 +1,7 @@
 package p2p
 
 import (
+	"math/big"
 	"sync"
 	"time"
 
@@ -11,6 +12,13 @@ import (
 	"github.com/ethereum/go-ethereum/p2p/enode"
 )
 
+// BlockCache stores the actual block data to avoid duplicate fetches and database queries.
+type BlockCache struct {
+	Header *types.Header
+	Body   *eth.BlockBody
+	TD     *big.Int
+}
+
 // Conns manages a collection of active peer connections for transaction broadcasting.
 // It also maintains a global cache of blocks written to the database.
 type Conns struct {
diff --git a/p2p/database/database.go b/p2p/database/database.go
index cde50c44..b6fc60d0 100644
--- a/p2p/database/database.go
+++ b/p2p/database/database.go
@@ -21,14 +21,17 @@ type Database interface {
 	WriteBlock(context.Context, *enode.Node, *types.Block, *big.Int, time.Time)
 
 	// WriteBlockHeaders will write the block headers if ShouldWriteBlocks
-	// returns true. The isParent parameter indicates if these headers were
-	// fetched as parent blocks.
+	// returns true.
 	WriteBlockHeaders(context.Context, []*types.Header, time.Time, bool)
 
 	// WriteBlockHashes will write the block hashes if ShouldWriteBlockEvents
 	// returns true.
 	WriteBlockHashes(context.Context, *enode.Node, []common.Hash, time.Time)
 
+	// WriteBlockHashFirstSeen writes a partial block entry with just the hash
+	// first seen time if the block doesn't exist yet.
+	WriteBlockHashFirstSeen(context.Context, common.Hash, time.Time)
+
 	// WriteBlockBody will write the block bodies if ShouldWriteBlocks returns
 	// true.
 	WriteBlockBody(context.Context, *eth.BlockBody, common.Hash, time.Time)
diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go
index 6eed8b1a..d1629b9c 100644
--- a/p2p/database/datastore.go
+++ b/p2p/database/datastore.go
@@ -23,7 +23,7 @@ const (
 	TransactionsKind      = "transactions"
 	TransactionEventsKind = "transaction_events"
 	PeersKind             = "peers"
-	MaxAttempts           = 3
+	MaxAttempts           = 5
 )
 
 // Datastore wraps the datastore client, stores the sensorID, and other
@@ -57,33 +57,36 @@ type DatastoreEvent struct {
 // DatastoreHeader stores the data in manner that can be easily written without
 // loss of precision.
 type DatastoreHeader struct {
-	ParentHash    *datastore.Key
-	UncleHash     string `datastore:",noindex"`
-	Coinbase      string `datastore:",noindex"`
-	Root          string `datastore:",noindex"`
-	TxHash        string `datastore:",noindex"`
-	ReceiptHash   string `datastore:",noindex"`
-	Bloom         []byte `datastore:",noindex"`
-	Difficulty    string `datastore:",noindex"`
-	Number        string
-	GasLimit      string `datastore:",noindex"`
-	GasUsed       string
-	Time          time.Time
-	Extra         []byte `datastore:",noindex"`
-	MixDigest     string `datastore:",noindex"`
-	Nonce         string `datastore:",noindex"`
-	BaseFee       string `datastore:",noindex"`
-	TimeFirstSeen time.Time
-	TTL           time.Time
-	IsParent      bool
+	ParentHash      *datastore.Key
+	UncleHash       string `datastore:",noindex"`
+	Coinbase        string `datastore:",noindex"`
+	Root            string `datastore:",noindex"`
+	TxHash          string `datastore:",noindex"`
+	ReceiptHash     string `datastore:",noindex"`
+	Bloom           []byte `datastore:",noindex"`
+	Difficulty      string `datastore:",noindex"`
+	Number          string
+	GasLimit        string `datastore:",noindex"`
+	GasUsed         string
+	Time            time.Time
+	Extra           []byte `datastore:",noindex"`
+	MixDigest       string `datastore:",noindex"`
+	Nonce           string `datastore:",noindex"`
+	BaseFee         string `datastore:",noindex"`
+	TimeFirstSeen   time.Time
+	TTL             time.Time
+	IsParent        bool
+	SensorFirstSeen string
 }
 
 // DatastoreBlock represents a block stored in datastore.
 type DatastoreBlock struct {
 	*DatastoreHeader
-	TotalDifficulty string           `datastore:",noindex"`
-	Transactions    []*datastore.Key `datastore:",noindex"`
-	Uncles          []*datastore.Key `datastore:",noindex"`
+	TotalDifficulty     string           `datastore:",noindex"`
+	Transactions        []*datastore.Key `datastore:",noindex"`
+	Uncles              []*datastore.Key `datastore:",noindex"`
+	TimeFirstSeenHash   time.Time
+	SensorFirstSeenHash string
 }
 
 // DatastoreTransaction represents a transaction stored in datastore. Data is
@@ -225,6 +228,52 @@ func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hash
 	})
 }
 
+// WriteBlockHashFirstSeen writes a partial block entry with just the hash
+// first seen time if the block doesn't exist yet. If it exists, updates the
+// TimeFirstSeenHash if the new time is earlier.
+func (d *Datastore) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) {
+	if d.client == nil || !d.ShouldWriteBlocks() {
+		return
+	}
+
+	d.runAsync(func() {
+		d.writeBlockHashFirstSeen(ctx, hash, tfsh)
+	})
+}
+
+// writeBlockHashFirstSeen performs the actual transaction to write or update the block hash first seen time.
+func (d *Datastore) writeBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) {
+	key := datastore.NameKey(BlocksKind, hash.Hex(), nil)
+
+	_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
+		var block DatastoreBlock
+		err := tx.Get(key, &block)
+
+		// If block doesn't exist, create partial entry with just hash timing
+		if err != nil {
+			block.TimeFirstSeenHash = tfsh
+			block.SensorFirstSeenHash = d.sensorID
+			_, err = tx.Put(key, &block)
+			return err
+		}
+
+		// If timestamp already set and not earlier, no update needed
+		if !block.TimeFirstSeenHash.IsZero() && !tfsh.Before(block.TimeFirstSeenHash) {
+			return nil
+		}
+
+		// Update with earlier timestamp
+		block.TimeFirstSeenHash = tfsh
+		block.SensorFirstSeenHash = d.sensorID
+		_, err = tx.Put(key, &block)
+		return err
+	}, datastore.MaxAttempts(MaxAttempts))
+
+	if err != nil {
+		log.Error().Err(err).Str("hash", hash.Hex()).Msg("Failed to write block hash first seen")
+	}
+}
+
 // WriteTransactions will write the transactions and transaction events to datastore.
 func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs []*types.Transaction, tfs time.Time) {
 	if d.client == nil {
@@ -318,25 +367,43 @@ func (d *Datastore) HasBlock(ctx context.Context, hash common.Hash) bool {
 // values are converted into strings to prevent a loss of precision.
 func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time, isParent bool) *DatastoreHeader {
 	return &DatastoreHeader{
-		ParentHash:    datastore.NameKey(BlocksKind, header.ParentHash.Hex(), nil),
-		UncleHash:     header.UncleHash.Hex(),
-		Coinbase:      header.Coinbase.Hex(),
-		Root:          header.Root.Hex(),
-		TxHash:        header.TxHash.Hex(),
-		ReceiptHash:   header.ReceiptHash.Hex(),
-		Bloom:         header.Bloom.Bytes(),
-		Difficulty:    header.Difficulty.String(),
-		Number:        header.Number.String(),
-		GasLimit:      fmt.Sprint(header.GasLimit),
-		GasUsed:       fmt.Sprint(header.GasUsed),
-		Time:          time.Unix(int64(header.Time), 0),
-		Extra:         header.Extra,
-		MixDigest:     header.MixDigest.String(),
-		Nonce:         fmt.Sprint(header.Nonce.Uint64()),
-		BaseFee:       header.BaseFee.String(),
-		TimeFirstSeen: tfs,
-		TTL:           tfs.Add(d.ttl),
-		IsParent:      isParent,
+		ParentHash:      datastore.NameKey(BlocksKind, header.ParentHash.Hex(), nil),
+		UncleHash:       header.UncleHash.Hex(),
+		Coinbase:        header.Coinbase.Hex(),
+		Root:            header.Root.Hex(),
+		TxHash:          header.TxHash.Hex(),
+		ReceiptHash:     header.ReceiptHash.Hex(),
+		Bloom:           header.Bloom.Bytes(),
+		Difficulty:      header.Difficulty.String(),
+		Number:          header.Number.String(),
+		GasLimit:        fmt.Sprint(header.GasLimit),
+		GasUsed:         fmt.Sprint(header.GasUsed),
+		Time:            time.Unix(int64(header.Time), 0),
+		Extra:           header.Extra,
+		MixDigest:       header.MixDigest.String(),
+		Nonce:           fmt.Sprint(header.Nonce.Uint64()),
+		BaseFee:         header.BaseFee.String(),
+		TimeFirstSeen:   tfs,
+		TTL:             tfs.Add(d.ttl),
+		IsParent:        isParent,
+		SensorFirstSeen: d.sensorID,
+	}
+}
+
+// writeFirstSeen updates timing fields on a header and block, preserving earlier timestamps.
+func (d *Datastore) writeFirstSeen(header *DatastoreHeader, block *DatastoreBlock, tfs time.Time) {
+	// Preserve earlier header timing if it exists
+	if block.DatastoreHeader != nil &&
+		!block.DatastoreHeader.TimeFirstSeen.IsZero() &&
+		block.DatastoreHeader.TimeFirstSeen.Before(tfs) {
+		header.TimeFirstSeen = block.DatastoreHeader.TimeFirstSeen
+		header.SensorFirstSeen = block.DatastoreHeader.SensorFirstSeen
+	}
+
+	// Set hash timing if it doesn't exist or if new timestamp is earlier
+	if block.TimeFirstSeenHash.IsZero() || tfs.Before(block.TimeFirstSeenHash) {
+		block.TimeFirstSeenHash = tfs
+		block.SensorFirstSeenHash = d.sensorID
+	}
 }
@@ -391,9 +458,16 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.
 	shouldWrite := false
 
-	if dsBlock.DatastoreHeader == nil {
+	if dsBlock.DatastoreHeader == nil || tfs.Before(dsBlock.DatastoreHeader.TimeFirstSeen) {
 		shouldWrite = true
-		dsBlock.DatastoreHeader = d.newDatastoreHeader(block.Header(), tfs, false)
+
+		// Create new header with current timing
+		header := d.newDatastoreHeader(block.Header(), tfs, false)
+
+		// Preserve earlier timestamps from any earlier announcement
+		d.writeFirstSeen(header, &dsBlock, tfs)
+
+		dsBlock.DatastoreHeader = header
 	}
 
 	if len(dsBlock.TotalDifficulty) == 0 {
@@ -484,12 +558,21 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header,
 	_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
 		var block DatastoreBlock
-		if err := tx.Get(key, &block); err == nil && block.DatastoreHeader != nil {
+		err := tx.Get(key, &block)
+
+		// If block header already exists and new timestamp is not earlier, don't overwrite
+		if err == nil && block.DatastoreHeader != nil && !tfs.Before(block.DatastoreHeader.TimeFirstSeen) {
 			return nil
 		}
 
-		block.DatastoreHeader = d.newDatastoreHeader(header, tfs, isParent)
-		_, err := tx.Put(key, &block)
+		// Create new header with current timing
+		newHeader := d.newDatastoreHeader(header, tfs, isParent)
+
+		// Preserve earlier timestamps from any earlier announcement or full block
+		d.writeFirstSeen(newHeader, &block, tfs)
+
+		block.DatastoreHeader = newHeader
+		_, err = tx.Put(key, &block)
 		return err
 	}, datastore.MaxAttempts(MaxAttempts))
diff --git a/p2p/database/json.go b/p2p/database/json.go
index 76c510d3..37782509 100644
--- a/p2p/database/json.go
+++ b/p2p/database/json.go
@@ -81,6 +81,14 @@ type JSONBlockEvent struct {
 	Timestamp time.Time `json:"timestamp"`
 }
 
+// JSONBlockHashFirstSeen represents a block hash announcement in JSON format.
+type JSONBlockHashFirstSeen struct {
+	Type          string    `json:"type"`
+	SensorID      string    `json:"sensor_id"`
+	Hash          string    `json:"hash"`
+	TimeFirstSeen time.Time `json:"time_first_seen"`
+}
+
 // JSONTransaction represents a transaction in JSON format.
 type JSONTransaction struct {
 	Type string `json:"type"`
@@ -228,6 +236,21 @@ func (j *JSONDatabase) WriteBlockHashes(ctx context.Context, peer *enode.Node, h
 	}
 }
 
+// WriteBlockHashFirstSeen writes a partial block entry with just the hash
+// first seen time. For JSON output, this writes a separate record type.
+func (j *JSONDatabase) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) {
+	if !j.ShouldWriteBlocks() {
+		return
+	}
+
+	j.Write(JSONBlockHashFirstSeen{
+		Type:          "block_hash_first_seen",
+		SensorID:      j.sensorID,
+		Hash:          hash.Hex(),
+		TimeFirstSeen: tfsh,
+	})
+}
+
 // WriteBlockBody writes the block body as JSON.
 func (j *JSONDatabase) WriteBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash, tfs time.Time) {
 	if !j.ShouldWriteBlocks() {
diff --git a/p2p/database/nodb.go b/p2p/database/nodb.go
index 71fefa13..b6223ba2 100644
--- a/p2p/database/nodb.go
+++ b/p2p/database/nodb.go
@@ -33,6 +33,10 @@ func (n *nodb) WriteBlockHeaders(ctx context.Context, headers []*types.Header, t
 func (n *nodb) WriteBlockHashes(ctx context.Context, peer *enode.Node, hashes []common.Hash, tfs time.Time) {
 }
 
+// WriteBlockHashFirstSeen does nothing.
+func (n *nodb) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) {
+}
+
 // WriteBlockBody does nothing.
 func (n *nodb) WriteBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash, tfs time.Time) {
 }
diff --git a/p2p/protocol.go b/p2p/protocol.go
index c7b82803..ae39a29e 100644
--- a/p2p/protocol.go
+++ b/p2p/protocol.go
@@ -21,13 +21,6 @@ import (
 	"github.com/0xPolygon/polygon-cli/p2p/database"
 )
 
-// BlockCache stores the actual block data to avoid duplicate fetches and database queries.
-type BlockCache struct {
-	Header *types.Header
-	Body   *eth.BlockBody
-	TD     *big.Int
-}
-
 // conn represents an individual connection with a peer.
 type conn struct {
 	sensorID string
@@ -356,6 +349,9 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {
 			continue
 		}
 
+		// Write hash first seen time immediately for new blocks
+		c.db.WriteBlockHashFirstSeen(ctx, hash, tfs)
+
 		// Request only the parts we don't have
 		if err := c.getBlockData(hash, cache, false); err != nil {
 			return err
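
Illustrative sketch (not part of the patch above): each of the new datastore writes applies the same "earliest first-seen wins" rule inside a transaction, overwriting the stored timestamp only when none exists yet or the new observation is strictly earlier. The snippet below isolates that rule as a pure Go function so it can be exercised on its own; hashTiming, mergeFirstSeen, and the sensor IDs are hypothetical names for illustration, not identifiers from the repository.

package main

import (
	"fmt"
	"time"
)

// hashTiming mirrors the two fields the patch adds to DatastoreBlock.
type hashTiming struct {
	TimeFirstSeenHash   time.Time
	SensorFirstSeenHash string
}

// mergeFirstSeen keeps the earliest observation: it only overwrites the stored
// timestamp when none exists yet or when the new one is strictly earlier.
func mergeFirstSeen(stored hashTiming, seen time.Time, sensorID string) hashTiming {
	if stored.TimeFirstSeenHash.IsZero() || seen.Before(stored.TimeFirstSeenHash) {
		stored.TimeFirstSeenHash = seen
		stored.SensorFirstSeenHash = sensorID
	}
	return stored
}

func main() {
	t0 := time.Now()

	// First announcement creates the partial entry.
	rec := mergeFirstSeen(hashTiming{}, t0, "sensor-a")

	// A later announcement from another sensor does not overwrite it.
	rec = mergeFirstSeen(rec, t0.Add(2*time.Second), "sensor-b")

	// An earlier observation does overwrite it.
	rec = mergeFirstSeen(rec, t0.Add(-time.Second), "sensor-c")

	fmt.Println(rec.SensorFirstSeenHash, rec.TimeFirstSeenHash.Equal(t0.Add(-time.Second)))
	// Prints: sensor-c true
}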