Skip to content

Commit 2a72944

Browse files
authored
feat(sensor): add TimeFirstSeenHash and SensorFirstSeen values to Blocks entity (#757)
* feat(sensor): use LRU with TTL for requests cache * docs: make gen-doc * fix: rename requests cache variables * fix: comment * fix: requests cache opts name * feat(sensor): use cache to track seen blocks * docs: make gen-doc * feat: global blocks cache * fix: rename method * fix: conditional sends? * fix: flags * chore: use conns opts * fix: revert request spelling * fix: remove wrong check * fix: logic issues * feat: store entire blocks * chore: runAsync refactor * feat: optimizations * feat(sensor): track if block headers are fetched as parents * feat(sensor): add `TimeFirstSeenHash`, `IsParent`, and `SensorFirstSeen` values to Datastore * chore: move BlockCache * fix: remove comment * chore: refactor * refactor: move time first seen hash outside of block header * fix: empty if * docs: make gen-doc * docs: make gen-doc * chore: rename newHeader to header * chore: refactor to use CacheOptions struct * chore(sensor): move oldest and head block to Conns * chore: remove HeadBlock struct in favor of eth.NewBlockPacket * chore: add ToBlock to rpc block type * fix: only log if value was changed * chore: use ReplaceAll instead of Replace * chore: clean up oldest logic * feat: add head and oldest block to the api * feat: add promtheus metrics * docs: metrics generation * docs: update examples * chore: lint * fix: rename methods to be more idiomatic * fix: nil checks * fix: go fmt * fix: overwrite * fix: remove fields from json * fix: make a struct * fix: remove redundant field * fix: redundant check * fix: redudant check (again) * fix: check if header is earlier * fix: preserve new block time
1 parent 5c2f39a commit 2a72944

File tree

7 files changed

+211
-94
lines changed

7 files changed

+211
-94
lines changed

cmd/ulxly/ulxly.go

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -347,50 +347,50 @@ func balanceTree() error {
347347
return err
348348
}
349349
type BalanceEntry struct {
350-
OriginNetwork uint32 `json:"originNetwork"`
351-
OriginTokenAddress common.Address `json:"originTokenAddress"`
352-
TotalSupply string `json:"totalSupply"`
353-
}
354-
355-
var balanceEntries []BalanceEntry
356-
for tokenKey, balance := range balances {
357-
if balance.Cmp(big.NewInt(0)) == 0 {
358-
continue
359-
}
360-
361-
var token TokenInfo
362-
token, err = TokenInfoStringToStruct(tokenKey)
363-
if err != nil {
364-
return err
365-
}
366-
367-
if token.OriginNetwork.Uint64() == uint64(l2NetworkID) {
368-
continue
369-
}
350+
OriginNetwork uint32 `json:"originNetwork"`
351+
OriginTokenAddress common.Address `json:"originTokenAddress"`
352+
TotalSupply string `json:"totalSupply"`
353+
}
354+
355+
var balanceEntries []BalanceEntry
356+
for tokenKey, balance := range balances {
357+
if balance.Cmp(big.NewInt(0)) == 0 {
358+
continue
359+
}
360+
361+
var token TokenInfo
362+
token, err = TokenInfoStringToStruct(tokenKey)
363+
if err != nil {
364+
return err
365+
}
366+
367+
if token.OriginNetwork.Uint64() == uint64(l2NetworkID) {
368+
continue
369+
}
370370

371371
balanceEntries = append(balanceEntries, BalanceEntry{
372372
OriginNetwork: uint32(token.OriginNetwork.Uint64()),
373373
OriginTokenAddress: token.OriginTokenAddress,
374374
TotalSupply: balance.String(),
375375
})
376-
}
377-
378-
// Create the response structure
379-
response := struct {
380-
Root string `json:"root"`
381-
Balances []BalanceEntry `json:"balances"`
382-
}{
383-
Root: root.String(),
384-
Balances: balanceEntries,
385-
}
386-
387-
// Marshal to JSON with proper formatting
388-
jsonOutput, err := json.MarshalIndent(response, "", " ")
389-
if err != nil {
390-
return err
391-
}
392-
393-
fmt.Println(string(jsonOutput))
376+
}
377+
378+
// Create the response structure
379+
response := struct {
380+
Root string `json:"root"`
381+
Balances []BalanceEntry `json:"balances"`
382+
}{
383+
Root: root.String(),
384+
Balances: balanceEntries,
385+
}
386+
387+
// Marshal to JSON with proper formatting
388+
jsonOutput, err := json.MarshalIndent(response, "", " ")
389+
if err != nil {
390+
return err
391+
}
392+
393+
fmt.Println(string(jsonOutput))
394394
return nil
395395
}
396396

p2p/conns.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package p2p
22

33
import (
4+
"math/big"
45
"sync"
56
"time"
67

@@ -11,6 +12,13 @@ import (
1112
"github.com/ethereum/go-ethereum/p2p/enode"
1213
)
1314

15+
// BlockCache stores the actual block data to avoid duplicate fetches and database queries.
16+
type BlockCache struct {
17+
Header *types.Header
18+
Body *eth.BlockBody
19+
TD *big.Int
20+
}
21+
1422
// Conns manages a collection of active peer connections for transaction broadcasting.
1523
// It also maintains a global cache of blocks written to the database.
1624
type Conns struct {

p2p/database/database.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,17 @@ type Database interface {
2121
WriteBlock(context.Context, *enode.Node, *types.Block, *big.Int, time.Time)
2222

2323
// WriteBlockHeaders will write the block headers if ShouldWriteBlocks
24-
// returns true. The isParent parameter indicates if these headers were
25-
// fetched as parent blocks.
24+
// returns true.
2625
WriteBlockHeaders(context.Context, []*types.Header, time.Time, bool)
2726

2827
// WriteBlockHashes will write the block hashes if ShouldWriteBlockEvents
2928
// returns true.
3029
WriteBlockHashes(context.Context, *enode.Node, []common.Hash, time.Time)
3130

31+
// WriteBlockHashFirstSeen writes a partial block entry with just the hash
32+
// first seen time if the block doesn't exist yet.
33+
WriteBlockHashFirstSeen(context.Context, common.Hash, time.Time)
34+
3235
// WriteBlockBody will write the block bodies if ShouldWriteBlocks returns
3336
// true.
3437
WriteBlockBody(context.Context, *eth.BlockBody, common.Hash, time.Time)

p2p/database/datastore.go

Lines changed: 130 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const (
2323
TransactionsKind = "transactions"
2424
TransactionEventsKind = "transaction_events"
2525
PeersKind = "peers"
26-
MaxAttempts = 3
26+
MaxAttempts = 5
2727
)
2828

2929
// Datastore wraps the datastore client, stores the sensorID, and other
@@ -57,33 +57,36 @@ type DatastoreEvent struct {
5757
// DatastoreHeader stores the data in manner that can be easily written without
5858
// loss of precision.
5959
type DatastoreHeader struct {
60-
ParentHash *datastore.Key
61-
UncleHash string `datastore:",noindex"`
62-
Coinbase string `datastore:",noindex"`
63-
Root string `datastore:",noindex"`
64-
TxHash string `datastore:",noindex"`
65-
ReceiptHash string `datastore:",noindex"`
66-
Bloom []byte `datastore:",noindex"`
67-
Difficulty string `datastore:",noindex"`
68-
Number string
69-
GasLimit string `datastore:",noindex"`
70-
GasUsed string
71-
Time time.Time
72-
Extra []byte `datastore:",noindex"`
73-
MixDigest string `datastore:",noindex"`
74-
Nonce string `datastore:",noindex"`
75-
BaseFee string `datastore:",noindex"`
76-
TimeFirstSeen time.Time
77-
TTL time.Time
78-
IsParent bool
60+
ParentHash *datastore.Key
61+
UncleHash string `datastore:",noindex"`
62+
Coinbase string `datastore:",noindex"`
63+
Root string `datastore:",noindex"`
64+
TxHash string `datastore:",noindex"`
65+
ReceiptHash string `datastore:",noindex"`
66+
Bloom []byte `datastore:",noindex"`
67+
Difficulty string `datastore:",noindex"`
68+
Number string
69+
GasLimit string `datastore:",noindex"`
70+
GasUsed string
71+
Time time.Time
72+
Extra []byte `datastore:",noindex"`
73+
MixDigest string `datastore:",noindex"`
74+
Nonce string `datastore:",noindex"`
75+
BaseFee string `datastore:",noindex"`
76+
TimeFirstSeen time.Time
77+
TTL time.Time
78+
IsParent bool
79+
SensorFirstSeen string
7980
}
8081

8182
// DatastoreBlock represents a block stored in datastore.
8283
type DatastoreBlock struct {
8384
*DatastoreHeader
84-
TotalDifficulty string `datastore:",noindex"`
85-
Transactions []*datastore.Key `datastore:",noindex"`
86-
Uncles []*datastore.Key `datastore:",noindex"`
85+
TotalDifficulty string `datastore:",noindex"`
86+
Transactions []*datastore.Key `datastore:",noindex"`
87+
Uncles []*datastore.Key `datastore:",noindex"`
88+
TimeFirstSeenHash time.Time
89+
SensorFirstSeenHash string
8790
}
8891

8992
// DatastoreTransaction represents a transaction stored in datastore. Data is
@@ -225,6 +228,52 @@ func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hash
225228
})
226229
}
227230

231+
// WriteBlockHashFirstSeen writes a partial block entry with just the hash
232+
// first seen time if the block doesn't exist yet. If it exists, updates the
233+
// TimeFirstSeenHash if the new time is earlier.
234+
func (d *Datastore) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) {
235+
if d.client == nil || !d.ShouldWriteBlocks() {
236+
return
237+
}
238+
239+
d.runAsync(func() {
240+
d.writeBlockHashFirstSeen(ctx, hash, tfsh)
241+
})
242+
}
243+
244+
// writeBlockHashFirstSeen performs the actual transaction to write or update the block hash first seen time.
245+
func (d *Datastore) writeBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) {
246+
key := datastore.NameKey(BlocksKind, hash.Hex(), nil)
247+
248+
_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
249+
var block DatastoreBlock
250+
err := tx.Get(key, &block)
251+
252+
// If block doesn't exist, create partial entry with just hash timing
253+
if err != nil {
254+
block.TimeFirstSeenHash = tfsh
255+
block.SensorFirstSeenHash = d.sensorID
256+
_, err = tx.Put(key, &block)
257+
return err
258+
}
259+
260+
// If timestamp already set and not earlier, no update needed
261+
if !block.TimeFirstSeenHash.IsZero() && !tfsh.Before(block.TimeFirstSeenHash) {
262+
return nil
263+
}
264+
265+
// Update with earlier timestamp
266+
block.TimeFirstSeenHash = tfsh
267+
block.SensorFirstSeenHash = d.sensorID
268+
_, err = tx.Put(key, &block)
269+
return err
270+
}, datastore.MaxAttempts(MaxAttempts))
271+
272+
if err != nil {
273+
log.Error().Err(err).Str("hash", hash.Hex()).Msg("Failed to write block hash first seen")
274+
}
275+
}
276+
228277
// WriteTransactions will write the transactions and transaction events to datastore.
229278
func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs []*types.Transaction, tfs time.Time) {
230279
if d.client == nil {
@@ -318,25 +367,43 @@ func (d *Datastore) HasBlock(ctx context.Context, hash common.Hash) bool {
318367
// values are converted into strings to prevent a loss of precision.
319368
func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time, isParent bool) *DatastoreHeader {
320369
return &DatastoreHeader{
321-
ParentHash: datastore.NameKey(BlocksKind, header.ParentHash.Hex(), nil),
322-
UncleHash: header.UncleHash.Hex(),
323-
Coinbase: header.Coinbase.Hex(),
324-
Root: header.Root.Hex(),
325-
TxHash: header.TxHash.Hex(),
326-
ReceiptHash: header.ReceiptHash.Hex(),
327-
Bloom: header.Bloom.Bytes(),
328-
Difficulty: header.Difficulty.String(),
329-
Number: header.Number.String(),
330-
GasLimit: fmt.Sprint(header.GasLimit),
331-
GasUsed: fmt.Sprint(header.GasUsed),
332-
Time: time.Unix(int64(header.Time), 0),
333-
Extra: header.Extra,
334-
MixDigest: header.MixDigest.String(),
335-
Nonce: fmt.Sprint(header.Nonce.Uint64()),
336-
BaseFee: header.BaseFee.String(),
337-
TimeFirstSeen: tfs,
338-
TTL: tfs.Add(d.ttl),
339-
IsParent: isParent,
370+
ParentHash: datastore.NameKey(BlocksKind, header.ParentHash.Hex(), nil),
371+
UncleHash: header.UncleHash.Hex(),
372+
Coinbase: header.Coinbase.Hex(),
373+
Root: header.Root.Hex(),
374+
TxHash: header.TxHash.Hex(),
375+
ReceiptHash: header.ReceiptHash.Hex(),
376+
Bloom: header.Bloom.Bytes(),
377+
Difficulty: header.Difficulty.String(),
378+
Number: header.Number.String(),
379+
GasLimit: fmt.Sprint(header.GasLimit),
380+
GasUsed: fmt.Sprint(header.GasUsed),
381+
Time: time.Unix(int64(header.Time), 0),
382+
Extra: header.Extra,
383+
MixDigest: header.MixDigest.String(),
384+
Nonce: fmt.Sprint(header.Nonce.Uint64()),
385+
BaseFee: header.BaseFee.String(),
386+
TimeFirstSeen: tfs,
387+
TTL: tfs.Add(d.ttl),
388+
IsParent: isParent,
389+
SensorFirstSeen: d.sensorID,
390+
}
391+
}
392+
393+
// writeFirstSeen updates timing fields on a header and block, preserving earlier timestamps.
394+
func (d *Datastore) writeFirstSeen(header *DatastoreHeader, block *DatastoreBlock, tfs time.Time) {
395+
// Preserve earlier header timing if it exists
396+
if block.DatastoreHeader != nil &&
397+
!block.DatastoreHeader.TimeFirstSeen.IsZero() &&
398+
block.DatastoreHeader.TimeFirstSeen.Before(tfs) {
399+
header.TimeFirstSeen = block.DatastoreHeader.TimeFirstSeen
400+
header.SensorFirstSeen = block.DatastoreHeader.SensorFirstSeen
401+
}
402+
403+
// Set hash timing if it doesn't exist or if new timestamp is earlier
404+
if block.TimeFirstSeenHash.IsZero() || tfs.Before(block.TimeFirstSeenHash) {
405+
block.TimeFirstSeenHash = tfs
406+
block.SensorFirstSeenHash = d.sensorID
340407
}
341408
}
342409

@@ -391,9 +458,16 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.
391458

392459
shouldWrite := false
393460

394-
if dsBlock.DatastoreHeader == nil {
461+
if dsBlock.DatastoreHeader == nil || tfs.Before(dsBlock.DatastoreHeader.TimeFirstSeen) {
395462
shouldWrite = true
396-
dsBlock.DatastoreHeader = d.newDatastoreHeader(block.Header(), tfs, false)
463+
464+
// Create new header with current timing
465+
header := d.newDatastoreHeader(block.Header(), tfs, false)
466+
467+
// Preserve earlier timestamps from any earlier announcement
468+
d.writeFirstSeen(header, &dsBlock, tfs)
469+
470+
dsBlock.DatastoreHeader = header
397471
}
398472

399473
if len(dsBlock.TotalDifficulty) == 0 {
@@ -484,12 +558,21 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header,
484558

485559
_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
486560
var block DatastoreBlock
487-
if err := tx.Get(key, &block); err == nil && block.DatastoreHeader != nil {
561+
err := tx.Get(key, &block)
562+
563+
// If block header already exists and new timestamp is not earlier, don't overwrite
564+
if err == nil && block.DatastoreHeader != nil && !tfs.Before(block.DatastoreHeader.TimeFirstSeen) {
488565
return nil
489566
}
490567

491-
block.DatastoreHeader = d.newDatastoreHeader(header, tfs, isParent)
492-
_, err := tx.Put(key, &block)
568+
// Create new header with current timing
569+
newHeader := d.newDatastoreHeader(header, tfs, isParent)
570+
571+
// Preserve earlier timestamps from any earlier announcement or full block
572+
d.writeFirstSeen(newHeader, &block, tfs)
573+
574+
block.DatastoreHeader = newHeader
575+
_, err = tx.Put(key, &block)
493576
return err
494577
}, datastore.MaxAttempts(MaxAttempts))
495578

p2p/database/json.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,14 @@ type JSONBlockEvent struct {
8181
Timestamp time.Time `json:"timestamp"`
8282
}
8383

84+
// JSONBlockHashFirstSeen represents a block hash announcement in JSON format.
85+
type JSONBlockHashFirstSeen struct {
86+
Type string `json:"type"`
87+
SensorID string `json:"sensor_id"`
88+
Hash string `json:"hash"`
89+
TimeFirstSeen time.Time `json:"time_first_seen"`
90+
}
91+
8492
// JSONTransaction represents a transaction in JSON format.
8593
type JSONTransaction struct {
8694
Type string `json:"type"`
@@ -228,6 +236,21 @@ func (j *JSONDatabase) WriteBlockHashes(ctx context.Context, peer *enode.Node, h
228236
}
229237
}
230238

239+
// WriteBlockHashFirstSeen writes a partial block entry with just the hash
240+
// first seen time. For JSON output, this writes a separate record type.
241+
func (j *JSONDatabase) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) {
242+
if !j.ShouldWriteBlocks() {
243+
return
244+
}
245+
246+
j.Write(JSONBlockHashFirstSeen{
247+
Type: "block_hash_first_seen",
248+
SensorID: j.sensorID,
249+
Hash: hash.Hex(),
250+
TimeFirstSeen: tfsh,
251+
})
252+
}
253+
231254
// WriteBlockBody writes the block body as JSON.
232255
func (j *JSONDatabase) WriteBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash, tfs time.Time) {
233256
if !j.ShouldWriteBlocks() {

0 commit comments

Comments
 (0)